Skip to content

geometry

Tasks for working with and optimizing geometry.

Classes:

Name Description
OptimizeAgBhGeometryExhaustive

Task to optimize the beam center and detector distance based on a powder pattern from an Ag Behenate run. Performs an exhaustive grid search.

OptimizeAgBhGeometryExhaustive

Bases: Task

Task to perform geometry optimization.

Source code in lute/tasks/geometry.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
class OptimizeAgBhGeometryExhaustive(Task):
    """Task to perform geometry optimization."""

    def __init__(self, *, params: TaskParameters, use_mpi: bool = True) -> None:
        super().__init__(params=params, use_mpi=use_mpi)
        hv.extension("bokeh")
        pn.extension()
        self._mpi_comm: MPI.Intracomm = MPI.COMM_WORLD
        self._mpi_rank: int = self._mpi_comm.Get_rank()
        self._mpi_size: int = self._mpi_comm.Get_size()

    def _check_if_path_and_type(self, string: str) -> Tuple[bool, Optional[str]]:
        """Check if a string is a valid path and determine the filetype.

        Args:
            string (str): The string that may be a file path.

        Returns:
            is_valid_path (bool): If it is a valid path.

            powder_type (Optional[str]): If is_valid_path, the file type.
        """
        is_valid_path: bool = False
        powder_type: Optional[str] = None
        if os.path.exists(string):
            is_valid_path = True
        else:
            return is_valid_path, powder_type
        try:
            with h5py.File(string):
                powder_type = "smd"
                is_valid_path = True

            return is_valid_path, powder_type
        except:
            ...

        try:
            np.load(string)
            powder_type = "numpy"
            is_valid_path = True
            return is_valid_path, powder_type
        except ValueError:
            ...

        return is_valid_path, powder_type

    def _extract_powder(self, powder_path: str) -> Optional[np.ndarray[np.float64]]:
        """Extract a powder image.

        May take a smalldata file or numpy array.

        Args:
            powder_path (str): Path to the object containing the powder image.

        Returns:
            powder (Optional[np.ndarray[np.float64]]): The extracted powder image.
                Returns None if no powder could be extracted and no specific error
                was encountered.
        """
        powder: Optional[np.ndarray[np.float64]] = None
        if isinstance(powder_path, str):
            is_valid: bool
            dtype: Optional[str]
            is_valid, dtype = self._check_if_path_and_type(powder_path)
            if is_valid and dtype == "numpy":
                powder = np.load(powder_path)
            elif is_valid and dtype == "smd":
                h5: h5py.File
                with h5py.File(powder_path) as h5:
                    unassembled: np.ndarray[np.float64] = h5[
                        f"Sums/{self._task_parameters.detname}_calib"
                    ][()]
                    if unassembled.shape == 2:
                        # E.g. Rayonix
                        powder = unassembled
                    else:
                        ix: np.ndarray[np.uint64] = h5[
                            f"UserDataCfg/{self._task_parameters.detname}/ix"
                        ][()]
                        iy: np.ndarray[np.uint64] = h5[
                            f"UserDataCfg/{self._task_parameters.detname}/iy"
                        ][()]

                        ix -= np.min(ix)
                        iy -= np.min(iy)

                        if (
                            unassembled.flatten().shape != ix.flatten().shape
                            or unassembled.flatten().shape != iy.flatten().shape
                        ):
                            raise RuntimeError(
                                "Shapes of detector image and pixel coordinates do not match!"
                            )

                        out_shape: Tuple[int, int] = (
                            int(np.max(ix) + 1),
                            int(np.max(iy) + 1),
                        )

                        powder = np.asarray(
                            sparse.coo_matrix(
                                (unassembled.flatten(), (ix.flatten(), iy.flatten())),
                                shape=out_shape,
                            ).todense()
                        )

        return powder

    def _extract_mask(
        self, mask_path: Optional[str]
    ) -> Optional[np.ndarray[np.float64]]:
        """Extract a mask.

        May take a smalldata file or numpy array.

        Args:
            mask_path (str): Path to the object containing the mask.

        Returns:
            mask (Optional[np.ndarray[np.float64]]): The extracted mask.
                Returns None if no powder could be extracted and no specific error
                was encountered.
        """
        mask: Optional[np.ndarray[bool]] = None
        is_valid: bool
        dtype: Optional[str]
        if isinstance(self._task_parameters.mask, str):
            is_valid, dtype = self._check_if_path_and_type(self._task_parameters.mask)
            if is_valid and dtype == "numpy":
                mask = np.load(self._task_parameters.mask)
            elif is_valid and dtype == "smd":
                h5: h5py.File
                with h5py.File(powder_path) as h5:
                    unassembled: np.ndarray[np.float64] = h5[
                        f"UserDataCfg/{self._task_parameters.detname}/mask"
                    ][()]
                    if unassembled.shape == 2:
                        # E.g. Rayonix
                        mask = unassembled
                    else:
                        ix: np.ndarray[np.uint64] = h5[
                            f"UserDataCfg/{self._task_parameters.detname}/ix"
                        ][()]
                        iy: np.ndarray[np.uint64] = h5[
                            f"UserDataCfg/{self._task_parameters.detname}/iy"
                        ][()]

                        ix -= np.min(ix)
                        iy -= np.min(iy)

                        if (
                            unassembled.flatten().shape != ix.flatten().shape
                            or unassembled.flatten().shape != iy.flatten().shape
                        ):
                            raise RuntimeError(
                                "Shapes of detector image and pixel coordinates do not match!"
                            )

                        out_shape: Tuple[int, int] = (
                            int(np.max(ix) + 1),
                            int(np.max(iy) + 1),
                        )

                        mask = np.asarray(
                            sparse.coo_matrix(
                                (unassembled.flatten(), (ix.flatten(), iy.flatten())),
                                shape=out_shape,
                            ).todense()
                        )
        return mask

    def _get_pixel_size_and_wavelength(
        self, ds: psana.DataSource, det: psana.Detector
    ) -> Tuple[float, float]:
        """Extract pixel size in mm and wavelength in Angstroms.

        Args:
            ds (psana.DataSource): psana DataSource object.

            det (psana.Detector): psana Detector object for which the pixel size
                will be extracted.

        Returns:
            pixel_size (float): Pixel size of det in mm.

            wavelength (float): X-ray wavelength in Angstroms.
        """
        pixel_size: float
        if self._task_parameters.detname.lower() == "rayonix":
            pixel_size = ds.env().configStore().get(psana.Rayonix.ConfigV2).pixelWidth()
        else:
            pixel_size = det.pixel_size(ds.env())
        pixel_size /= 1e3

        wavelength: float = ds.env().epicsStore().value("SIOC:SYS0:ML00:AO192") * 10
        return pixel_size, wavelength

    def _estimate_distance(self) -> float:
        """Estimate a starting distance.

        Will estimate the distance based on current geometry if no guess is
        provided, otherwise this method returns the provided guess.

        Returns:
            distance (float): Estimated distance, or user-provided guess if one
                was present in the TaskParameters.
        """
        if self._task_parameters.distance_guess is not None:
            return self._task_parameters.distance_guess
        exp: str = self._task_parameters.lute_config.experiment
        run: Union[int, str] = self._task_parameters.lute_config.run
        ds: psana.DataSource = psana.DataSource(f"exp={exp}:run={run}")
        det: psana.Detector = psana.Detector(self._task_parameters.detname, ds.env())
        return -1 * np.mean(det.coords_z(run)) / 1e3

    def _initial_image_center(
        self, powder: np.ndarray[np.float64]
    ) -> np.ndarray[np.float64]:
        """Estimate a beam center.

        Will estimate the center as the powder image center if no guess is
        provided, otherwise this method returns the provided guess.

        Args:
            powder (np.ndarray[np.float64]): Powder image.

        Returns:
            center (np.ndarray[np.float64]): Estimated center, or user-provided
                guess if one was present in the TaskParameters.
        """
        if self._task_parameters.center_guess is not None:
            return self._task_parameters.center_guess
        return np.array(powder.shape) / 2.0

    def _center_guesses(
        self, powder: np.ndarray[np.float64]
    ) -> List[Tuple[float, float]]:
        """Return starting beam center points based on dx/dy parameters.

        This method returns a list of starting values for the beam center to be
        used as initial guesses for the optimization routine. The output of this
        method is tuned by the dx/dy parameters provided in the TaskParameters
        model.

        Args:
            powder (np.ndarray[np.float64]): Powder image.

        Returns:
            guesses (List[Tuple[float,float]]): Starting guesses for the beam
                center optimization routine.
        """
        initial_center: np.ndarray[np.float64] = self._initial_image_center(powder)
        dx: Tuple[int, int, int] = self._task_parameters.dx
        dy: Tuple[int, int, int] = self._task_parameters.dy
        x_offsets: np.ndarray[np.float64] = np.linspace(dx[0], dx[1], dx[2])
        y_offsets: np.ndarray[np.float64] = np.linspace(dy[0], dy[1], dy[2])
        center_offsets: List[Tuple[float, float]] = list(
            itertools.product(x_offsets, y_offsets)
        )
        new_centers: List[Tuple[float, float]] = [
            (initial_center[0] + offset[0], initial_center[1] + offset[1])
            for offset in center_offsets
        ]
        return new_centers

    def _radial_profile(
        self,
        powder: np.ndarray[np.float64],
        mask: Optional[np.ndarray[np.float64]],
        center: Tuple[float, float],
        threshold: float = 10.0,
        filter_profile: bool = False,
        filter_order: int = 2,
        filter_threshold: float = 0.25,
    ) -> np.ndarray[np.float64]:
        """Compute the radial intensity profile of an image.

        Args:
            powder (np.ndarray[np.float64]): 2-D assembled powder image.

            mask (Optional[np.ndarray[np.float64]]): Corresponding binary mask
                for the powder image.

            center (Tuple[float, float]): Beam center in the image, in pixels.

            threshold (float): Default: 10. Below this intensity set the
                intensity of the radial profile to 0.

            filter_profile (bool): Default: False. If True apply a lowpass
                Butterworth filter to the profile.

            filter_order (int): Default: 2. If applying a filter, the order of
                the filter.

            filter_threshold (float): Default: 0.25. Critical frequency for the
                Butterworth filter, if applying it.

        Returns:
            radial_profile (np.ndarray[np.float64]): 1-D array of peak intensities
                for an azimuthally integrated powder image.
        """
        y: np.ndarray[np.int64]
        x: np.ndarray[np.int64]
        y, x = np.indices(powder.shape)
        radius_map: np.ndarray[np.float64] = (
            (x - center[0]) ** 2 + (y - center[1]) ** 2
        ) ** 0.5

        if mask is not None:
            radius_map = np.where(mask == 1, radius_map, 0)

        radius_map_int: np.ndarray[np.int64] = radius_map.astype(np.int64)
        tbin: np.ndarray[np.float64] = np.bincount(
            radius_map_int.ravel(), powder.ravel()
        )
        nr: np.ndarray[np.int64] = np.bincount(radius_map_int.ravel())
        radial_profile: np.ndarray[np.float64] = np.divide(
            tbin, nr, out=np.zeros(nr.shape[0]), where=nr != 0
        )
        if filter_profile:
            from scipy.signal import sosfiltfilt, butter

            sos = butter(filter_order, filter_threshold, output="sos")
            radial_profile = sosfiltfilt(sos, radial_profile)

        radial_profile[radial_profile < threshold] = 0
        return radial_profile

    def _calc_and_score_ideal_rings(
        self, q_peaks: np.ndarray[np.float64]
    ) -> Tuple[np.ndarray[np.float64], float]:
        """Score inter-peak distances in q-space based on known behenate pattern.

        Relies on the equidistance of peaks in q-space for silver behenate powder.

        Args:
            q_peaks (np.ndarray[np.float64]): Positions of powder peaks in q-space
                (inverse Angstrom). 1 dimensional.

        Returns:
            rings (np.ndarray[np.float64]): Predicted positions of peaks based on
                the best fit q-spacing. 1 dimensional.

            final_score (float): Final score calculated from residuals associated
                with each q-spacin.
        """
        # Q-spacings between behenate peaks
        delta_qs: np.ndarray[np.float64] = np.arange(0.01, 0.03, 0.00005)
        order_max: int = 13
        qround: np.ndarray[np.int64] = np.round(q_peaks, 5)
        scores: List[float] = []
        dq: float
        for dq in delta_qs:
            order: np.ndarray[np.float64] = qround / dq
            remainder: np.ndarray[np.float64] = np.minimum(
                qround % dq, np.abs(dq - (qround % dq))
            )
            score: np.ndarray[np.float64] = (
                np.mean(remainder[np.where(order < order_max)]) / dq
            )  # %mod helps prevent half periods from scoring well
            scores.append(score)
        deltaq_current = delta_qs[np.argmin(scores)]
        rings: np.ndarray[np.float64] = np.arange(
            deltaq_current, deltaq_current * (order_max + 1), deltaq_current
        )

        final_score: float = (np.mean(scores) - np.min(scores)) / np.std(scores)
        if np.isnan(np.min(scores)):
            final_score = 0.0
        return rings, final_score

    def _opt_distance(
        self,
        radial_profile: np.ndarray[np.float64],
        distance_guess: float,
        wavelength: float,
        pixel_size: float,
    ) -> Tuple[np.ndarray[np.int64], np.ndarray[np.float64], float, float]:
        """Optimize the detector distance.

        Args:
            radial_profile (np.ndarray[np.float64]): 1-D array of peak intensities
                for an azimuthally integrated powder image.

            distance_guess (float): Starting guess for the detector distance.

            wavelength (float): X-ray wavelength in angstroms.

            pixel_size (float): Size of detector pixels in mm.

        Returns:
            peak_indices (np.ndarray[np.int64]): 1-D array of peak indices.

            selected_peaks (np.ndarray[np.float64]): Array of selected peaks.

            new_distance (float): New, optimized, detector distance.

            final_score (float): Final score calculated from residuals associated
                with each q-spacing.
        """
        peak_indices: np.ndarray[np.int64]
        peak_indices, _ = find_peaks(radial_profile, prominence=1, distance=10)
        theta: np.ndarray[np.float64] = np.arctan(
            np.arange(radial_profile.shape[0]) * pixel_size / distance_guess
        )
        q_profile: np.ndarray[np.float64] = 2.0 * np.sin(theta / 2.0) / wavelength

        rings: np.ndarray[np.float64]
        final_score: float
        rings, final_score = self._calc_and_score_ideal_rings(q_profile[peak_indices])
        peaks_predicted: np.ndarray[np.float64] = (
            2 * distance_guess * np.arcsin(rings * wavelength / 2.0) / pixel_size
        )

        # FOR BEHENATE ONLY!!! Need other q0s
        q0: float = 0.1076
        new_distance: float = (
            peaks_predicted[0]
            * pixel_size
            / np.tan(2.0 * np.arcsin(wavelength * (q0 / (2 * np.pi)) / 2.0))
        )
        logger.info(
            f"Detector distance inferred from powder rings: {np.round(new_distance,2)}"
        )
        return peak_indices, radial_profile[peak_indices], new_distance, final_score

    def _opt_center(
        self,
        powder: np.ndarray[np.float64],
        indices: np.ndarray[np.int64],
        center_guess: Tuple[float, float],
    ) -> lmfit.minimizer.MinimizerResult:
        """Optimize the beam center position on the detector.

        Args:
            powder (np.ndarray[np.float64]): 2-D assembled powder image.

            indices (np.ndarray[np.float64]): Indices of peaks in a radial profile
                of the azimuthally integrated powder image.

            center_guess (Tuple[float, float]): Starting guess for the beam center
                in pixels.

        Returns:
            res (lmfit.minimizer.MinimizerResult): Result from the minimization.
                res.params["cx"] and res.params["cy"] contain the new beam center.
        """
        # Perform fitting - mean center and normalize image first
        powder_norm: np.ndarray[np.float64] = (powder - np.mean(powder)) / np.std(
            powder
        )
        params: lmfit.Parameters = lmfit.Parameters()
        params.add("cx", value=center_guess[0])
        params.add("cy", value=center_guess[1])
        for i in range(len(indices)):
            params.add(f"r{i:d}", value=indices[i])
        res: lmfit.minimizer.MinimizerResult = lmfit.minimize(
            geometry_optimize_residual,
            params,
            method="leastsq",
            nan_policy="omit",
            args=(powder_norm,),
        )
        try:
            lmfit.report_fit(res)
        except TypeError as err:
            # This shouldn't happen but don't fail if it does
            logger.error(f"Unable to report fit! {err}")
        return res

    def _opt_geom(
        self,
        powder: np.ndarray[np.float64],
        mask: np.ndarray[np.uint64],
        params_guess: Tuple[int, Tuple[float, float], float],
        n_iterations: int,
        wavelength: float,
        pixel_size: float,
    ) -> Tuple[List[float], List[float], List[Tuple[float, float]]]:
        """Optimize the detector distance and beam center.

        Args:
            powder (np.ndarray[np.float64]): 2-D assembled powder image.

            mask (np.ndarray[np.float64]): Corresponding binary mask for the
                powder image.

            params_guess (Tuple[int, Tuple[float, float], float]): Initial guesses.
                In format: (n_peaks, (center_x, center_y), distance).

            n_iterations (int): Number of iterations to perform.

            wavelength (float): X-ray wavelength in angstroms.

            pixel_size (float): Size of detector pixels in mm.

        Returns:
            peak_indices (np.ndarray[np.int64]): 1-D array of peak indices.

            selected_peaks (np.ndarray[np.float64]): Array of selected peaks.

            final_scores (float): Final scores calculated from residuals associated
                with each q-spacing for each iteration of optimization.

            calc_distances (List[float]): Optimized distances associated with each
                score.

            calc_centers (List[Tuple[float, float]]): Optimized centers associated
                with each score.
        """
        n_peaks: int = self._task_parameters.n_peaks
        radial_profile: np.ndarray[np.float64] = self._radial_profile(
            powder, mask, params_guess[1]
        )
        indices: np.ndarray[np.int64]
        peaks: np.ndarray[np.float64]
        distance: float
        final_score: float
        indices, peaks, distance, final_score = self._opt_distance(
            radial_profile, params_guess[2], wavelength, pixel_size
        )

        final_scores: List[float] = [final_score]
        calc_distances: List[float] = [distance]
        calc_centers: List[Tuple[float, float]] = [params_guess[1]]

        center_guess: Tuple[float, float] = params_guess[1]
        for iter in range(n_iterations):
            # Select the highest intensity peaks from the first 8 in q
            selected_indices: np.ndarray[np.int64] = indices[
                np.argsort(peaks[:8])[::-1][:n_peaks]
            ]
            res: lmfit.minimizer.MinimizerResult = self._opt_center(
                powder, selected_indices, center_guess
            )
            center_guess = (res.params["cx"].value, res.params["cy"].value)
            logger.info(f"New center is: ({center_guess[0]}, {center_guess[1]})")
            radial_profile = self._radial_profile(powder, mask, center_guess)
            indices, peaks, distance, final_score = self._opt_distance(
                radial_profile, distance, wavelength, pixel_size
            )
            final_scores.append(final_score)
            calc_distances.append(distance)
            calc_centers.append(center_guess)
        return final_scores, calc_distances, calc_centers

    def _run(self) -> None:
        """Perform geometry optimization of detector distance and beam center.

        Requires a powder image from data acquired of Ag Behenate.
        """
        exp: str = self._task_parameters.lute_config.experiment
        run: Union[int, str] = self._task_parameters.lute_config.run
        ds: psana.DataSource = psana.DataSource(f"exp={exp}:run={run}")
        det: psana.Detector = psana.Detector(self._task_parameters.detname, ds.env())
        pixel_size_mm: float
        wavelength_angs: float
        pixel_size_mm, wavelength_angs = self._get_pixel_size_and_wavelength(ds, det)

        powder: np.ndarray[np.float64] = self._extract_powder(
            self._task_parameters.powder
        )

        mask: Optional[np.ndarray[np.float64]] = self._extract_mask(
            self._task_parameters.mask
        )
        powder[powder > self._task_parameters.threshold] = 0

        starting_centers: List[Tuple[float, float]] = self._center_guesses(powder)
        starting_distance: float = self._estimate_distance()
        starting_scan_params: List[Tuple[int, Tuple[float, float], float]] = list(
            itertools.product(
                (self._task_parameters.n_peaks,),
                starting_centers,
                (starting_distance,),
            )
        )
        if self._mpi_rank == 0:
            quotient: int
            remainder: int
            quotient, remainder = divmod(len(starting_scan_params), self._mpi_size)
            parameters_per_rank = np.array(
                [
                    quotient + 1 if rank < remainder else quotient
                    for rank in range(self._mpi_size)
                ]
            )
            start_indices_per_rank: np.ndarray[np.int64] = np.zeros(
                self._mpi_size, dtype=np.int64
            )
            start_indices_per_rank[1:] = np.cumsum(parameters_per_rank[:-1])
        else:
            parameters_per_rank = np.zeros(self._mpi_size, dtype=np.int64)
            start_indices_per_rank = np.zeros(self._mpi_size, dtype=np.int64)

        self._mpi_comm.Bcast(parameters_per_rank, root=0)
        self._mpi_comm.Bcast(start_indices_per_rank, root=0)

        # Basic RANK-LOCAL variables - Each rank uses a subset of params
        ################################################################
        n_params: int = parameters_per_rank[self._mpi_rank]
        start_idx: int = start_indices_per_rank[self._mpi_rank]
        stop_idx: int = start_idx + n_params

        final_scores_by_rank: List[float] = []
        final_distances_by_rank: List[float] = []
        final_centers_by_rank: List[Tuple[float, float]] = []
        for params in starting_scan_params[start_idx:stop_idx]:
            scores: List[float]
            distances: List[float]
            centers: List[Tuple[float, float]]
            scores, distances, centers = self._opt_geom(
                powder,
                mask,
                params,
                self._task_parameters.n_iterations,
                wavelength_angs,
                pixel_size_mm,
            )
            final_scores_by_rank.extend(scores)
            final_distances_by_rank.extend(distances)
            final_centers_by_rank.extend(centers)

        self._mpi_comm.Barrier()

        # Gather all results
        final_scores: Union[List[float], List[List[float]]] = self._mpi_comm.gather(
            final_scores_by_rank, root=0
        )
        final_distances: Union[List[float], List[List[float]]] = self._mpi_comm.gather(
            final_distances_by_rank, root=0
        )
        final_centers: Union[
            List[Tuple[float, float], List[List[Tuple[float, float]]]]
        ] = self._mpi_comm.gather(final_centers_by_rank, root=0)

        # Flatten nested lists
        def flatten(nested_lists: List[List[Any]]) -> List[Any]:
            flattened: List[Any] = []
            for item in nested_lists:
                flattened.extend(item)
            return flattened

        if self._mpi_rank == 0:
            final_scores = flatten(final_scores)
            final_distances = flatten(final_distances)
            final_centers = flatten(final_centers)
            best_idx: int = np.argmax(final_scores)
            best_distance: float = final_distances[best_idx]
            best_center: Tuple[float, float] = final_centers[best_idx]
            # Calculate resolution at edge
            theta: np.ndarray[np.float64] = np.arctan(
                np.array((powder.shape[0] / 2)) * pixel_size_mm / best_distance
            )
            q: np.ndarray[np.float64] = 2.0 * np.sin(theta / 2.0) / wavelength_angs
            edge_resolution: float = 1.0 / q

            self._result.summary = []
            self._result.summary.append(
                {
                    "Detector distance (mm)": best_distance,
                    "Detector center (pixels)": best_center,
                    "Detector edge resolution (A)": edge_resolution,
                }
            )

            run: int = int(self._task_parameters.lute_config.run)

            plots: pn.Tabs = self.plot_powder_and_summaries(
                powder, best_center, best_distance, wavelength_angs, pixel_size_mm, mask
            )

            self._result.summary.append(
                ElogSummaryPlots(f"Geometry_Fit/r{run:04d}", plots)
            )

            deploy_geometry(
                out_dir=self._task_parameters.geom_out_dir,
                exp=self._task_parameters.lute_config.experiment,
                run=run,
                ds=ds,
                det=det,
                pixel_size_mm=pixel_size_mm,
                center=best_center,
                distance=best_distance,
            )

    def _post_run(self) -> None:
        super()._post_run()
        self._result.task_status = TaskStatus.COMPLETED

    def plot_powder_and_summaries(
        self,
        powder: np.ndarray[np.float64],
        center: Tuple[float, float],
        distance: float,
        wavelength: float,
        pixel_size: float,
        mask: Optional[np.ndarray[np.float64]] = None,
    ) -> pn.Tabs:
        radial_profile: np.ndarray[np.float64] = self._radial_profile(
            powder, mask, center
        )
        peak_indices, _ = find_peaks(radial_profile, prominence=1, distance=10)
        theta: np.ndarray[np.float64] = np.arctan(
            np.arange(radial_profile.shape[0]) * pixel_size / distance
        )
        q_profile: np.ndarray[np.float64] = 2.0 * np.sin(theta / 2.0) / wavelength

        rings: np.ndarray[np.float64]
        final_score: float
        rings, final_score = self._calc_and_score_ideal_rings(q_profile[peak_indices])
        peaks_predicted: np.ndarray[np.float64] = (
            2 * distance * np.arcsin(rings * wavelength / 2.0) / pixel_size
        )
        powder_grid: pn.GridSpec = self._plot_powder_and_rings(
            powder, center, peak_indices, peaks_predicted
        )
        profile_grid: pn.GridSpec = self._plot_radial_profile(
            radial_profile, q_profile, peak_indices
        )

        tabs: pn.Tabs = pn.Tabs(powder_grid)
        tabs.append(profile_grid)
        return tabs

    def _plot_radial_profile(
        self,
        radial_profile: np.ndarray[np.float64],
        qprofile: np.ndarray[np.float64],
        peaks_observed: np.ndarray[np.int64],
    ) -> pn.GridSpec:
        xdim: hv.core.dimension.Dimension = hv.Dimension(("Q", "Q"))
        ydim: hv.core.dimension.Dimesnion = hv.Dimension(("I", "I"))
        profiles: hv.Overlay
        profiles = hv.Curve((qprofile, radial_profile)).opts(
            line_width=1, axiswise=True
        )
        profiles *= hv.Points(
            (qprofile[peaks_observed], radial_profile[peaks_observed]),
            kdims=[xdim, ydim],
        ).opts(color="red", size=2)
        grid: pn.GridSpec = pn.GridSpec(name="Radial Profile")
        grid[:2, :2] = profiles

        return grid

    def _plot_powder_and_rings(
        self,
        powder: np.ndarray[np.float64],
        center: Tuple[float, float],
        peaks_observed: np.ndarray[np.float64],
        peaks_predicted: np.ndarray[np.float64],
    ) -> pn.GridSpec:
        dim: hv.core.dimension.Dimension = hv.Dimension(
            ("image", "Powder"),
            range=(
                np.nanpercentile(powder, 1),
                np.nanpercentile(powder, 99),
            ),
        )
        # (left, bottom, right, top)
        bounds: Tuple[int, int, int, int] = (
            -0.5,
            -0.5,
            powder.shape[0] - 0.5,
            powder.shape[1] - 0.5,
        )
        img: hv.Image = hv.Image(
            powder, bounds=bounds, vdims=[dim], name=dim.label
        ).options(colorbar=True, cmap="viridis")

        for peak in peaks_observed:
            img *= hv.Ellipse(center[0], center[1], peak).opts(
                line_dash="dashed", color="red", line_width=1
            )

        for peak in peaks_predicted:
            img *= hv.Ellipse(center[0], center[1], peak).opts(
                line_dash="dashed", color="black", line_width=1
            )

        grid: pn.GridSpec = pn.GridSpec(
            name=f"{self._task_parameters.detname} Powder and Observed Rings",
        )

        grid[0, 0] = pn.Row(img)
        return grid

_calc_and_score_ideal_rings(q_peaks)

Score inter-peak distances in q-space based on known behenate pattern.

Relies on the equidistance of peaks in q-space for silver behenate powder.

Parameters:

Name Type Description Default
q_peaks ndarray[float64]

Positions of powder peaks in q-space (inverse Angstrom). 1 dimensional.

required

Returns:

Name Type Description
rings ndarray[float64]

Predicted positions of peaks based on the best fit q-spacing. 1 dimensional.

final_score float

Final score calculated from residuals associated with each q-spacin.

Source code in lute/tasks/geometry.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
def _calc_and_score_ideal_rings(
    self, q_peaks: np.ndarray[np.float64]
) -> Tuple[np.ndarray[np.float64], float]:
    """Score inter-peak distances in q-space based on known behenate pattern.

    Relies on the equidistance of peaks in q-space for silver behenate powder.

    Args:
        q_peaks (np.ndarray[np.float64]): Positions of powder peaks in q-space
            (inverse Angstrom). 1 dimensional.

    Returns:
        rings (np.ndarray[np.float64]): Predicted positions of peaks based on
            the best fit q-spacing. 1 dimensional.

        final_score (float): Final score calculated from residuals associated
            with each q-spacin.
    """
    # Q-spacings between behenate peaks
    delta_qs: np.ndarray[np.float64] = np.arange(0.01, 0.03, 0.00005)
    order_max: int = 13
    qround: np.ndarray[np.int64] = np.round(q_peaks, 5)
    scores: List[float] = []
    dq: float
    for dq in delta_qs:
        order: np.ndarray[np.float64] = qround / dq
        remainder: np.ndarray[np.float64] = np.minimum(
            qround % dq, np.abs(dq - (qround % dq))
        )
        score: np.ndarray[np.float64] = (
            np.mean(remainder[np.where(order < order_max)]) / dq
        )  # %mod helps prevent half periods from scoring well
        scores.append(score)
    deltaq_current = delta_qs[np.argmin(scores)]
    rings: np.ndarray[np.float64] = np.arange(
        deltaq_current, deltaq_current * (order_max + 1), deltaq_current
    )

    final_score: float = (np.mean(scores) - np.min(scores)) / np.std(scores)
    if np.isnan(np.min(scores)):
        final_score = 0.0
    return rings, final_score

_center_guesses(powder)

Return starting beam center points based on dx/dy parameters.

This method returns a list of starting values for the beam center to be used as initial guesses for the optimization routine. The output of this method is tuned by the dx/dy parameters provided in the TaskParameters model.

Parameters:

Name Type Description Default
powder ndarray[float64]

Powder image.

required

Returns:

Name Type Description
guesses List[Tuple[float, float]]

Starting guesses for the beam center optimization routine.

Source code in lute/tasks/geometry.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def _center_guesses(
    self, powder: np.ndarray[np.float64]
) -> List[Tuple[float, float]]:
    """Return starting beam center points based on dx/dy parameters.

    This method returns a list of starting values for the beam center to be
    used as initial guesses for the optimization routine. The output of this
    method is tuned by the dx/dy parameters provided in the TaskParameters
    model.

    Args:
        powder (np.ndarray[np.float64]): Powder image.

    Returns:
        guesses (List[Tuple[float,float]]): Starting guesses for the beam
            center optimization routine.
    """
    initial_center: np.ndarray[np.float64] = self._initial_image_center(powder)
    dx: Tuple[int, int, int] = self._task_parameters.dx
    dy: Tuple[int, int, int] = self._task_parameters.dy
    x_offsets: np.ndarray[np.float64] = np.linspace(dx[0], dx[1], dx[2])
    y_offsets: np.ndarray[np.float64] = np.linspace(dy[0], dy[1], dy[2])
    center_offsets: List[Tuple[float, float]] = list(
        itertools.product(x_offsets, y_offsets)
    )
    new_centers: List[Tuple[float, float]] = [
        (initial_center[0] + offset[0], initial_center[1] + offset[1])
        for offset in center_offsets
    ]
    return new_centers

_check_if_path_and_type(string)

Check if a string is a valid path and determine the filetype.

Parameters:

Name Type Description Default
string str

The string that may be a file path.

required

Returns:

Name Type Description
is_valid_path bool

If it is a valid path.

powder_type Optional[str]

If is_valid_path, the file type.

Source code in lute/tasks/geometry.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def _check_if_path_and_type(self, string: str) -> Tuple[bool, Optional[str]]:
    """Check if a string is a valid path and determine the filetype.

    Args:
        string (str): The string that may be a file path.

    Returns:
        is_valid_path (bool): If it is a valid path.

        powder_type (Optional[str]): If is_valid_path, the file type.
    """
    is_valid_path: bool = False
    powder_type: Optional[str] = None
    if os.path.exists(string):
        is_valid_path = True
    else:
        return is_valid_path, powder_type
    try:
        with h5py.File(string):
            powder_type = "smd"
            is_valid_path = True

        return is_valid_path, powder_type
    except:
        ...

    try:
        np.load(string)
        powder_type = "numpy"
        is_valid_path = True
        return is_valid_path, powder_type
    except ValueError:
        ...

    return is_valid_path, powder_type

_estimate_distance()

Estimate a starting distance.

Will estimate the distance based on current geometry if no guess is provided, otherwise this method returns the provided guess.

Returns:

Name Type Description
distance float

Estimated distance, or user-provided guess if one was present in the TaskParameters.

Source code in lute/tasks/geometry.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def _estimate_distance(self) -> float:
    """Estimate a starting distance.

    Will estimate the distance based on current geometry if no guess is
    provided, otherwise this method returns the provided guess.

    Returns:
        distance (float): Estimated distance, or user-provided guess if one
            was present in the TaskParameters.
    """
    if self._task_parameters.distance_guess is not None:
        return self._task_parameters.distance_guess
    exp: str = self._task_parameters.lute_config.experiment
    run: Union[int, str] = self._task_parameters.lute_config.run
    ds: psana.DataSource = psana.DataSource(f"exp={exp}:run={run}")
    det: psana.Detector = psana.Detector(self._task_parameters.detname, ds.env())
    return -1 * np.mean(det.coords_z(run)) / 1e3

_extract_mask(mask_path)

Extract a mask.

May take a smalldata file or numpy array.

Parameters:

Name Type Description Default
mask_path str

Path to the object containing the mask.

required

Returns:

Name Type Description
mask Optional[ndarray[float64]]

The extracted mask. Returns None if no powder could be extracted and no specific error was encountered.

Source code in lute/tasks/geometry.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def _extract_mask(
    self, mask_path: Optional[str]
) -> Optional[np.ndarray[np.float64]]:
    """Extract a mask.

    May take a smalldata file or numpy array.

    Args:
        mask_path (str): Path to the object containing the mask.

    Returns:
        mask (Optional[np.ndarray[np.float64]]): The extracted mask.
            Returns None if no powder could be extracted and no specific error
            was encountered.
    """
    mask: Optional[np.ndarray[bool]] = None
    is_valid: bool
    dtype: Optional[str]
    if isinstance(self._task_parameters.mask, str):
        is_valid, dtype = self._check_if_path_and_type(self._task_parameters.mask)
        if is_valid and dtype == "numpy":
            mask = np.load(self._task_parameters.mask)
        elif is_valid and dtype == "smd":
            h5: h5py.File
            with h5py.File(powder_path) as h5:
                unassembled: np.ndarray[np.float64] = h5[
                    f"UserDataCfg/{self._task_parameters.detname}/mask"
                ][()]
                if unassembled.shape == 2:
                    # E.g. Rayonix
                    mask = unassembled
                else:
                    ix: np.ndarray[np.uint64] = h5[
                        f"UserDataCfg/{self._task_parameters.detname}/ix"
                    ][()]
                    iy: np.ndarray[np.uint64] = h5[
                        f"UserDataCfg/{self._task_parameters.detname}/iy"
                    ][()]

                    ix -= np.min(ix)
                    iy -= np.min(iy)

                    if (
                        unassembled.flatten().shape != ix.flatten().shape
                        or unassembled.flatten().shape != iy.flatten().shape
                    ):
                        raise RuntimeError(
                            "Shapes of detector image and pixel coordinates do not match!"
                        )

                    out_shape: Tuple[int, int] = (
                        int(np.max(ix) + 1),
                        int(np.max(iy) + 1),
                    )

                    mask = np.asarray(
                        sparse.coo_matrix(
                            (unassembled.flatten(), (ix.flatten(), iy.flatten())),
                            shape=out_shape,
                        ).todense()
                    )
    return mask

_extract_powder(powder_path)

Extract a powder image.

May take a smalldata file or numpy array.

Parameters:

Name Type Description Default
powder_path str

Path to the object containing the powder image.

required

Returns:

Name Type Description
powder Optional[ndarray[float64]]

The extracted powder image. Returns None if no powder could be extracted and no specific error was encountered.

Source code in lute/tasks/geometry.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def _extract_powder(self, powder_path: str) -> Optional[np.ndarray[np.float64]]:
    """Extract a powder image.

    May take a smalldata file or numpy array.

    Args:
        powder_path (str): Path to the object containing the powder image.

    Returns:
        powder (Optional[np.ndarray[np.float64]]): The extracted powder image.
            Returns None if no powder could be extracted and no specific error
            was encountered.
    """
    powder: Optional[np.ndarray[np.float64]] = None
    if isinstance(powder_path, str):
        is_valid: bool
        dtype: Optional[str]
        is_valid, dtype = self._check_if_path_and_type(powder_path)
        if is_valid and dtype == "numpy":
            powder = np.load(powder_path)
        elif is_valid and dtype == "smd":
            h5: h5py.File
            with h5py.File(powder_path) as h5:
                unassembled: np.ndarray[np.float64] = h5[
                    f"Sums/{self._task_parameters.detname}_calib"
                ][()]
                if unassembled.shape == 2:
                    # E.g. Rayonix
                    powder = unassembled
                else:
                    ix: np.ndarray[np.uint64] = h5[
                        f"UserDataCfg/{self._task_parameters.detname}/ix"
                    ][()]
                    iy: np.ndarray[np.uint64] = h5[
                        f"UserDataCfg/{self._task_parameters.detname}/iy"
                    ][()]

                    ix -= np.min(ix)
                    iy -= np.min(iy)

                    if (
                        unassembled.flatten().shape != ix.flatten().shape
                        or unassembled.flatten().shape != iy.flatten().shape
                    ):
                        raise RuntimeError(
                            "Shapes of detector image and pixel coordinates do not match!"
                        )

                    out_shape: Tuple[int, int] = (
                        int(np.max(ix) + 1),
                        int(np.max(iy) + 1),
                    )

                    powder = np.asarray(
                        sparse.coo_matrix(
                            (unassembled.flatten(), (ix.flatten(), iy.flatten())),
                            shape=out_shape,
                        ).todense()
                    )

    return powder

_get_pixel_size_and_wavelength(ds, det)

Extract pixel size in mm and wavelength in Angstroms.

Parameters:

Name Type Description Default
ds DataSource

psana DataSource object.

required
det Detector

psana Detector object for which the pixel size will be extracted.

required

Returns:

Name Type Description
pixel_size float

Pixel size of det in mm.

wavelength float

X-ray wavelength in Angstroms.

Source code in lute/tasks/geometry.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def _get_pixel_size_and_wavelength(
    self, ds: psana.DataSource, det: psana.Detector
) -> Tuple[float, float]:
    """Extract pixel size in mm and wavelength in Angstroms.

    Args:
        ds (psana.DataSource): psana DataSource object.

        det (psana.Detector): psana Detector object for which the pixel size
            will be extracted.

    Returns:
        pixel_size (float): Pixel size of det in mm.

        wavelength (float): X-ray wavelength in Angstroms.
    """
    pixel_size: float
    if self._task_parameters.detname.lower() == "rayonix":
        pixel_size = ds.env().configStore().get(psana.Rayonix.ConfigV2).pixelWidth()
    else:
        pixel_size = det.pixel_size(ds.env())
    pixel_size /= 1e3

    wavelength: float = ds.env().epicsStore().value("SIOC:SYS0:ML00:AO192") * 10
    return pixel_size, wavelength

_initial_image_center(powder)

Estimate a beam center.

Will estimate the center as the powder image center if no guess is provided, otherwise this method returns the provided guess.

Parameters:

Name Type Description Default
powder ndarray[float64]

Powder image.

required

Returns:

Name Type Description
center ndarray[float64]

Estimated center, or user-provided guess if one was present in the TaskParameters.

Source code in lute/tasks/geometry.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def _initial_image_center(
    self, powder: np.ndarray[np.float64]
) -> np.ndarray[np.float64]:
    """Estimate a beam center.

    Will estimate the center as the powder image center if no guess is
    provided, otherwise this method returns the provided guess.

    Args:
        powder (np.ndarray[np.float64]): Powder image.

    Returns:
        center (np.ndarray[np.float64]): Estimated center, or user-provided
            guess if one was present in the TaskParameters.
    """
    if self._task_parameters.center_guess is not None:
        return self._task_parameters.center_guess
    return np.array(powder.shape) / 2.0

_opt_center(powder, indices, center_guess)

Optimize the beam center position on the detector.

Parameters:

Name Type Description Default
powder ndarray[float64]

2-D assembled powder image.

required
indices ndarray[float64]

Indices of peaks in a radial profile of the azimuthally integrated powder image.

required
center_guess Tuple[float, float]

Starting guess for the beam center in pixels.

required

Returns:

Name Type Description
res MinimizerResult

Result from the minimization. res.params["cx"] and res.params["cy"] contain the new beam center.

Source code in lute/tasks/geometry.py
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
def _opt_center(
    self,
    powder: np.ndarray[np.float64],
    indices: np.ndarray[np.int64],
    center_guess: Tuple[float, float],
) -> lmfit.minimizer.MinimizerResult:
    """Optimize the beam center position on the detector.

    Args:
        powder (np.ndarray[np.float64]): 2-D assembled powder image.

        indices (np.ndarray[np.float64]): Indices of peaks in a radial profile
            of the azimuthally integrated powder image.

        center_guess (Tuple[float, float]): Starting guess for the beam center
            in pixels.

    Returns:
        res (lmfit.minimizer.MinimizerResult): Result from the minimization.
            res.params["cx"] and res.params["cy"] contain the new beam center.
    """
    # Perform fitting - mean center and normalize image first
    powder_norm: np.ndarray[np.float64] = (powder - np.mean(powder)) / np.std(
        powder
    )
    params: lmfit.Parameters = lmfit.Parameters()
    params.add("cx", value=center_guess[0])
    params.add("cy", value=center_guess[1])
    for i in range(len(indices)):
        params.add(f"r{i:d}", value=indices[i])
    res: lmfit.minimizer.MinimizerResult = lmfit.minimize(
        geometry_optimize_residual,
        params,
        method="leastsq",
        nan_policy="omit",
        args=(powder_norm,),
    )
    try:
        lmfit.report_fit(res)
    except TypeError as err:
        # This shouldn't happen but don't fail if it does
        logger.error(f"Unable to report fit! {err}")
    return res

_opt_distance(radial_profile, distance_guess, wavelength, pixel_size)

Optimize the detector distance.

Parameters:

Name Type Description Default
radial_profile ndarray[float64]

1-D array of peak intensities for an azimuthally integrated powder image.

required
distance_guess float

Starting guess for the detector distance.

required
wavelength float

X-ray wavelength in angstroms.

required
pixel_size float

Size of detector pixels in mm.

required

Returns:

Name Type Description
peak_indices ndarray[int64]

1-D array of peak indices.

selected_peaks ndarray[float64]

Array of selected peaks.

new_distance float

New, optimized, detector distance.

final_score float

Final score calculated from residuals associated with each q-spacing.

Source code in lute/tasks/geometry.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def _opt_distance(
    self,
    radial_profile: np.ndarray[np.float64],
    distance_guess: float,
    wavelength: float,
    pixel_size: float,
) -> Tuple[np.ndarray[np.int64], np.ndarray[np.float64], float, float]:
    """Optimize the detector distance.

    Args:
        radial_profile (np.ndarray[np.float64]): 1-D array of peak intensities
            for an azimuthally integrated powder image.

        distance_guess (float): Starting guess for the detector distance.

        wavelength (float): X-ray wavelength in angstroms.

        pixel_size (float): Size of detector pixels in mm.

    Returns:
        peak_indices (np.ndarray[np.int64]): 1-D array of peak indices.

        selected_peaks (np.ndarray[np.float64]): Array of selected peaks.

        new_distance (float): New, optimized, detector distance.

        final_score (float): Final score calculated from residuals associated
            with each q-spacing.
    """
    peak_indices: np.ndarray[np.int64]
    peak_indices, _ = find_peaks(radial_profile, prominence=1, distance=10)
    theta: np.ndarray[np.float64] = np.arctan(
        np.arange(radial_profile.shape[0]) * pixel_size / distance_guess
    )
    q_profile: np.ndarray[np.float64] = 2.0 * np.sin(theta / 2.0) / wavelength

    rings: np.ndarray[np.float64]
    final_score: float
    rings, final_score = self._calc_and_score_ideal_rings(q_profile[peak_indices])
    peaks_predicted: np.ndarray[np.float64] = (
        2 * distance_guess * np.arcsin(rings * wavelength / 2.0) / pixel_size
    )

    # FOR BEHENATE ONLY!!! Need other q0s
    q0: float = 0.1076
    new_distance: float = (
        peaks_predicted[0]
        * pixel_size
        / np.tan(2.0 * np.arcsin(wavelength * (q0 / (2 * np.pi)) / 2.0))
    )
    logger.info(
        f"Detector distance inferred from powder rings: {np.round(new_distance,2)}"
    )
    return peak_indices, radial_profile[peak_indices], new_distance, final_score

_opt_geom(powder, mask, params_guess, n_iterations, wavelength, pixel_size)

Optimize the detector distance and beam center.

Parameters:

Name Type Description Default
powder ndarray[float64]

2-D assembled powder image.

required
mask ndarray[float64]

Corresponding binary mask for the powder image.

required
params_guess Tuple[int, Tuple[float, float], float]

Initial guesses. In format: (n_peaks, (center_x, center_y), distance).

required
n_iterations int

Number of iterations to perform.

required
wavelength float

X-ray wavelength in angstroms.

required
pixel_size float

Size of detector pixels in mm.

required

Returns:

Name Type Description
peak_indices ndarray[int64]

1-D array of peak indices.

selected_peaks ndarray[float64]

Array of selected peaks.

final_scores float

Final scores calculated from residuals associated with each q-spacing for each iteration of optimization.

calc_distances List[float]

Optimized distances associated with each score.

calc_centers List[Tuple[float, float]]

Optimized centers associated with each score.

Source code in lute/tasks/geometry.py
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
def _opt_geom(
    self,
    powder: np.ndarray[np.float64],
    mask: np.ndarray[np.uint64],
    params_guess: Tuple[int, Tuple[float, float], float],
    n_iterations: int,
    wavelength: float,
    pixel_size: float,
) -> Tuple[List[float], List[float], List[Tuple[float, float]]]:
    """Optimize the detector distance and beam center.

    Args:
        powder (np.ndarray[np.float64]): 2-D assembled powder image.

        mask (np.ndarray[np.float64]): Corresponding binary mask for the
            powder image.

        params_guess (Tuple[int, Tuple[float, float], float]): Initial guesses.
            In format: (n_peaks, (center_x, center_y), distance).

        n_iterations (int): Number of iterations to perform.

        wavelength (float): X-ray wavelength in angstroms.

        pixel_size (float): Size of detector pixels in mm.

    Returns:
        peak_indices (np.ndarray[np.int64]): 1-D array of peak indices.

        selected_peaks (np.ndarray[np.float64]): Array of selected peaks.

        final_scores (float): Final scores calculated from residuals associated
            with each q-spacing for each iteration of optimization.

        calc_distances (List[float]): Optimized distances associated with each
            score.

        calc_centers (List[Tuple[float, float]]): Optimized centers associated
            with each score.
    """
    n_peaks: int = self._task_parameters.n_peaks
    radial_profile: np.ndarray[np.float64] = self._radial_profile(
        powder, mask, params_guess[1]
    )
    indices: np.ndarray[np.int64]
    peaks: np.ndarray[np.float64]
    distance: float
    final_score: float
    indices, peaks, distance, final_score = self._opt_distance(
        radial_profile, params_guess[2], wavelength, pixel_size
    )

    final_scores: List[float] = [final_score]
    calc_distances: List[float] = [distance]
    calc_centers: List[Tuple[float, float]] = [params_guess[1]]

    center_guess: Tuple[float, float] = params_guess[1]
    for iter in range(n_iterations):
        # Select the highest intensity peaks from the first 8 in q
        selected_indices: np.ndarray[np.int64] = indices[
            np.argsort(peaks[:8])[::-1][:n_peaks]
        ]
        res: lmfit.minimizer.MinimizerResult = self._opt_center(
            powder, selected_indices, center_guess
        )
        center_guess = (res.params["cx"].value, res.params["cy"].value)
        logger.info(f"New center is: ({center_guess[0]}, {center_guess[1]})")
        radial_profile = self._radial_profile(powder, mask, center_guess)
        indices, peaks, distance, final_score = self._opt_distance(
            radial_profile, distance, wavelength, pixel_size
        )
        final_scores.append(final_score)
        calc_distances.append(distance)
        calc_centers.append(center_guess)
    return final_scores, calc_distances, calc_centers

_radial_profile(powder, mask, center, threshold=10.0, filter_profile=False, filter_order=2, filter_threshold=0.25)

Compute the radial intensity profile of an image.

Parameters:

Name Type Description Default
powder ndarray[float64]

2-D assembled powder image.

required
mask Optional[ndarray[float64]]

Corresponding binary mask for the powder image.

required
center Tuple[float, float]

Beam center in the image, in pixels.

required
threshold float

Default: 10. Below this intensity set the intensity of the radial profile to 0.

10.0
filter_profile bool

Default: False. If True apply a lowpass Butterworth filter to the profile.

False
filter_order int

Default: 2. If applying a filter, the order of the filter.

2
filter_threshold float

Default: 0.25. Critical frequency for the Butterworth filter, if applying it.

0.25

Returns:

Name Type Description
radial_profile ndarray[float64]

1-D array of peak intensities for an azimuthally integrated powder image.

Source code in lute/tasks/geometry.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def _radial_profile(
    self,
    powder: np.ndarray[np.float64],
    mask: Optional[np.ndarray[np.float64]],
    center: Tuple[float, float],
    threshold: float = 10.0,
    filter_profile: bool = False,
    filter_order: int = 2,
    filter_threshold: float = 0.25,
) -> np.ndarray[np.float64]:
    """Compute the radial intensity profile of an image.

    Args:
        powder (np.ndarray[np.float64]): 2-D assembled powder image.

        mask (Optional[np.ndarray[np.float64]]): Corresponding binary mask
            for the powder image.

        center (Tuple[float, float]): Beam center in the image, in pixels.

        threshold (float): Default: 10. Below this intensity set the
            intensity of the radial profile to 0.

        filter_profile (bool): Default: False. If True apply a lowpass
            Butterworth filter to the profile.

        filter_order (int): Default: 2. If applying a filter, the order of
            the filter.

        filter_threshold (float): Default: 0.25. Critical frequency for the
            Butterworth filter, if applying it.

    Returns:
        radial_profile (np.ndarray[np.float64]): 1-D array of peak intensities
            for an azimuthally integrated powder image.
    """
    y: np.ndarray[np.int64]
    x: np.ndarray[np.int64]
    y, x = np.indices(powder.shape)
    radius_map: np.ndarray[np.float64] = (
        (x - center[0]) ** 2 + (y - center[1]) ** 2
    ) ** 0.5

    if mask is not None:
        radius_map = np.where(mask == 1, radius_map, 0)

    radius_map_int: np.ndarray[np.int64] = radius_map.astype(np.int64)
    tbin: np.ndarray[np.float64] = np.bincount(
        radius_map_int.ravel(), powder.ravel()
    )
    nr: np.ndarray[np.int64] = np.bincount(radius_map_int.ravel())
    radial_profile: np.ndarray[np.float64] = np.divide(
        tbin, nr, out=np.zeros(nr.shape[0]), where=nr != 0
    )
    if filter_profile:
        from scipy.signal import sosfiltfilt, butter

        sos = butter(filter_order, filter_threshold, output="sos")
        radial_profile = sosfiltfilt(sos, radial_profile)

    radial_profile[radial_profile < threshold] = 0
    return radial_profile

_run()

Perform geometry optimization of detector distance and beam center.

Requires a powder image from data acquired of Ag Behenate.

Source code in lute/tasks/geometry.py
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
def _run(self) -> None:
    """Perform geometry optimization of detector distance and beam center.

    Requires a powder image from data acquired of Ag Behenate.
    """
    exp: str = self._task_parameters.lute_config.experiment
    run: Union[int, str] = self._task_parameters.lute_config.run
    ds: psana.DataSource = psana.DataSource(f"exp={exp}:run={run}")
    det: psana.Detector = psana.Detector(self._task_parameters.detname, ds.env())
    pixel_size_mm: float
    wavelength_angs: float
    pixel_size_mm, wavelength_angs = self._get_pixel_size_and_wavelength(ds, det)

    powder: np.ndarray[np.float64] = self._extract_powder(
        self._task_parameters.powder
    )

    mask: Optional[np.ndarray[np.float64]] = self._extract_mask(
        self._task_parameters.mask
    )
    powder[powder > self._task_parameters.threshold] = 0

    starting_centers: List[Tuple[float, float]] = self._center_guesses(powder)
    starting_distance: float = self._estimate_distance()
    starting_scan_params: List[Tuple[int, Tuple[float, float], float]] = list(
        itertools.product(
            (self._task_parameters.n_peaks,),
            starting_centers,
            (starting_distance,),
        )
    )
    if self._mpi_rank == 0:
        quotient: int
        remainder: int
        quotient, remainder = divmod(len(starting_scan_params), self._mpi_size)
        parameters_per_rank = np.array(
            [
                quotient + 1 if rank < remainder else quotient
                for rank in range(self._mpi_size)
            ]
        )
        start_indices_per_rank: np.ndarray[np.int64] = np.zeros(
            self._mpi_size, dtype=np.int64
        )
        start_indices_per_rank[1:] = np.cumsum(parameters_per_rank[:-1])
    else:
        parameters_per_rank = np.zeros(self._mpi_size, dtype=np.int64)
        start_indices_per_rank = np.zeros(self._mpi_size, dtype=np.int64)

    self._mpi_comm.Bcast(parameters_per_rank, root=0)
    self._mpi_comm.Bcast(start_indices_per_rank, root=0)

    # Basic RANK-LOCAL variables - Each rank uses a subset of params
    ################################################################
    n_params: int = parameters_per_rank[self._mpi_rank]
    start_idx: int = start_indices_per_rank[self._mpi_rank]
    stop_idx: int = start_idx + n_params

    final_scores_by_rank: List[float] = []
    final_distances_by_rank: List[float] = []
    final_centers_by_rank: List[Tuple[float, float]] = []
    for params in starting_scan_params[start_idx:stop_idx]:
        scores: List[float]
        distances: List[float]
        centers: List[Tuple[float, float]]
        scores, distances, centers = self._opt_geom(
            powder,
            mask,
            params,
            self._task_parameters.n_iterations,
            wavelength_angs,
            pixel_size_mm,
        )
        final_scores_by_rank.extend(scores)
        final_distances_by_rank.extend(distances)
        final_centers_by_rank.extend(centers)

    self._mpi_comm.Barrier()

    # Gather all results
    final_scores: Union[List[float], List[List[float]]] = self._mpi_comm.gather(
        final_scores_by_rank, root=0
    )
    final_distances: Union[List[float], List[List[float]]] = self._mpi_comm.gather(
        final_distances_by_rank, root=0
    )
    final_centers: Union[
        List[Tuple[float, float], List[List[Tuple[float, float]]]]
    ] = self._mpi_comm.gather(final_centers_by_rank, root=0)

    # Flatten nested lists
    def flatten(nested_lists: List[List[Any]]) -> List[Any]:
        flattened: List[Any] = []
        for item in nested_lists:
            flattened.extend(item)
        return flattened

    if self._mpi_rank == 0:
        final_scores = flatten(final_scores)
        final_distances = flatten(final_distances)
        final_centers = flatten(final_centers)
        best_idx: int = np.argmax(final_scores)
        best_distance: float = final_distances[best_idx]
        best_center: Tuple[float, float] = final_centers[best_idx]
        # Calculate resolution at edge
        theta: np.ndarray[np.float64] = np.arctan(
            np.array((powder.shape[0] / 2)) * pixel_size_mm / best_distance
        )
        q: np.ndarray[np.float64] = 2.0 * np.sin(theta / 2.0) / wavelength_angs
        edge_resolution: float = 1.0 / q

        self._result.summary = []
        self._result.summary.append(
            {
                "Detector distance (mm)": best_distance,
                "Detector center (pixels)": best_center,
                "Detector edge resolution (A)": edge_resolution,
            }
        )

        run: int = int(self._task_parameters.lute_config.run)

        plots: pn.Tabs = self.plot_powder_and_summaries(
            powder, best_center, best_distance, wavelength_angs, pixel_size_mm, mask
        )

        self._result.summary.append(
            ElogSummaryPlots(f"Geometry_Fit/r{run:04d}", plots)
        )

        deploy_geometry(
            out_dir=self._task_parameters.geom_out_dir,
            exp=self._task_parameters.lute_config.experiment,
            run=run,
            ds=ds,
            det=det,
            pixel_size_mm=pixel_size_mm,
            center=best_center,
            distance=best_distance,
        )