Skip to content

PyPAM support

PypamSupport

Helper to process audio segments for a given day, resulting in the aggregated hybrid millidecade PSD product.

Source code in pbp/pypam_support.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class PypamSupport:
    """
    Helper to process audio segments for a given day, resulting in the
    aggregated hybrid millidecade PSD product.
    """

    def __init__(
        self,
        log,  #: loguru.Logger
    ) -> None:
        """
        Creates a helper to process audio segments for a given day,
        resulting in the aggregated hybrid millidecade PSD product.

        The created instance should be used only for a day of processing.

        The overall call sequence is:

        `add_missing_segment` can be called before calling `set_parameters`.
        This allows capturing initial segments with no data for the day.

        Call `set_parameters` as soon as you get the first audio segment for a day.
        Such segment is used to determine the sampling frequency.

        Then you can call `add_segment` or `add_missing_segment` as appropriate
        for each subsequent segment until covering the day.

        When all segments have been captured, call `process_captured_segments`
        to get the result.

        Args:
            log: A logger instance.
        """
        self.log = log

        # to capture reported segments (missing and otherwise):
        self._captured_segments: List[_CapturedSegment] = []
        self._num_actual_segments: int = 0

        # Determined from any actual segment:
        self._fbands: Optional[np.ndarray] = None

        # The following determined when `set_parameters` is called:

        self.fs: Optional[int] = None
        self._nfft: Optional[int] = None
        self._subset_to: Optional[Tuple[int, int]] = None
        self._bands_limits: List[float] = []
        self._bands_c: List[float] = []

    def set_parameters(
        self, fs: int, nfft: int = 0, subset_to: Optional[Tuple[int, int]] = None
    ):
        """
        Call this as soon as you get the first audio segment for a day,
        in particular, to set the sampling frequency used for subsequent calculations.

        Args:
            fs (int): Sampling frequency.
            nfft (int): Number of samples to use for the FFT. If 0, it will be set to `fs`.
            subset_to (Tuple[int, int]): If not None, the product for a day will get the resulting PSD
                subset to `[lower, upper)`, in terms of central frequency.
        """
        assert fs > 0
        self.fs = fs
        self._nfft = nfft if nfft > 0 else self.fs

        self._subset_to = subset_to
        band = [0, self.fs / 2]  # for now.

        self.log.debug(f"PypamSupport: {subset_to=} {band=}")

        self._bands_limits, self._bands_c = utils.get_hybrid_millidecade_limits(
            band=band, nfft=self._nfft
        )

    @property
    def parameters_set(self) -> bool:
        """
        True if `set_parameters` has already been called.
        """
        return self.fs is not None

    def add_missing_segment(self, dt: datetime):
        """
        Adds a missing segment to the ongoing aggregation.
        This method can be called even before `set_parameters` is called.

        Args:
            dt (datetime): The datetime of the start of the missing segment.
        """
        self._captured_segments.append(_CapturedSegment(dt, 0, None))
        self.log.debug(f"  captured segment: {dt}  (NO DATA)")

    def add_segment(self, dt: datetime, data: np.ndarray):
        """
        Adds an audio segment to the ongoing aggregation.

        `set_parameters` must have been called first.

        Args:
            dt (datetime): The datetime of the start of the segment.
            data (np.ndarray): The audio data.
        """
        assert self.parameters_set
        assert self.fs is not None
        assert self._nfft is not None

        self._fbands, spectrum = _get_spectrum(data, self.fs, self._nfft)
        num_secs = len(data) / self.fs
        self._captured_segments.append(_CapturedSegment(dt, num_secs, spectrum))
        self._num_actual_segments += 1
        self.log.debug(f"  captured segment: {dt}")

    def process_captured_segments(
        self,
        sensitivity_da: Optional[xr.DataArray] = None,
    ) -> Optional[ProcessResult]:
        """
        Gets the resulting hybrid millidecade bands for all captured segments.
        At least one actual segment must have been captured, otherwise None is returned.

        Args:
            sensitivity_da (Optional[xr.DataArray]): The sensitivity data.
                If given, it will be used to calibrate the result.

        Returns:
            Result if at least an actual segment was captured, None otherwise.
        """
        if self._num_actual_segments == 0:
            return None

        # Use any actual segment to determine NaN spectrum for the missing segments:
        actual = next(s for s in self._captured_segments if s.spectrum is not None)
        assert actual is not None, "unexpected: no actual segment found"
        assert actual.spectrum is not None, "unexpected: no actual.spectrum"
        nan_spectrum = np.full(len(actual.spectrum), np.nan)

        # gather resulting variables:
        times: List[np.int64] = []
        effort: List[np.float32] = []
        spectra: List[np.ndarray] = []
        for cs in self._captured_segments:
            times.append(np.int64(cs.dt.timestamp()))
            effort.append(np.float32(cs.num_secs))

            spectrum = nan_spectrum if cs.spectrum is None else cs.spectrum
            self.log.debug(f"  spectrum for: {cs.dt} (effort={cs.num_secs})")
            spectra.append(spectrum)

        self.log.info("Aggregating results ...")
        psd_da = self._get_aggregated_milli_psd(
            times=times,
            spectra=spectra,
            sensitivity_da=sensitivity_da,
        )

        effort_da = xr.DataArray(
            data=effort,
            dims=["time"],
            coords={"time": psd_da.time},
        )
        return ProcessResult(psd_da, effort_da)

    def _get_aggregated_milli_psd(
        self,
        times: List[np.int64],
        spectra: List[np.ndarray],
        sensitivity_da: Optional[xr.DataArray] = None,
    ) -> xr.DataArray:
        # Convert the spectra to a DataArray
        psd_da = xr.DataArray(
            data=spectra,
            coords={"time": times, "frequency": self._fbands},
            dims=["time", "frequency"],
        )

        psd_da = self._spectra_to_bands(psd_da)
        self.log.debug(f"  {psd_da.frequency_bins=}")
        psd_da = self._apply_sensitivity_if_given(psd_da, sensitivity_da)

        # just need single precision:
        psd_da = psd_da.astype(np.float32)
        psd_da["frequency"] = psd_da.frequency_bins.astype(np.float32)
        del psd_da["frequency_bins"]

        milli_psd = psd_da
        milli_psd.name = "psd"

        self.log.info(f"Resulting milli_psd={milli_psd}")

        return milli_psd

    def _apply_sensitivity_if_given(
        self,
        psd_da: xr.DataArray,
        sensitivity_da: Optional[xr.DataArray],
    ) -> xr.DataArray:
        psd_da = cast(xr.DataArray, 10 * np.log10(psd_da))

        # NOTE: per slack discussion today 2023-05-23,
        # apply _addition_ of the given sensitivity
        # (previously, subtraction)
        # 2023-06-12: Back to subtraction (as we're focusing on MARS data at the moment)
        # TODO but this is still one pending aspect to finalize.
        # 2023-08-03: sensitivity_flat_value is handled upstream now.

        if sensitivity_da is not None:
            freq_subset = sensitivity_da.interp(frequency=psd_da.frequency_bins)
            self.log.info(
                f"  Applying sensitivity({len(freq_subset.values)})={freq_subset}"
            )
            psd_da -= freq_subset.values

        return psd_da

    def _spectra_to_bands(self, psd_da: xr.DataArray) -> xr.DataArray:
        assert self.fs is not None
        assert self._nfft is not None

        bands_limits, bands_c = self._bands_limits, self._bands_c
        if self._subset_to is not None:
            bands_limits, bands_c = self._adjust_limits(
                bands_limits, bands_c, self._subset_to
            )

        def print_array(name: str, arr: List[float]):
            self.log.info(f"{name} ({len(arr)}) = {brief_list(arr)}")

        print_array("       bands_c", bands_c)
        print_array("  bands_limits", bands_limits)

        psd_da = utils.spectra_ds_to_bands(
            psd_da,
            bands_limits,
            bands_c,
            fft_bin_width=self.fs / self._nfft,
            db=False,
        )

        psd_da = psd_da.drop_vars(["lower_frequency", "upper_frequency"])
        return psd_da

    def _adjust_limits(
        self, bands_limits: List[float], bands_c: List[float], subset_to: Tuple[int, int]
    ) -> Tuple[List[float], List[float]]:
        start_hz, end_hz = subset_to
        self.log.info(f"Subsetting to [{start_hz:,}, {end_hz:,})Hz")

        start_index = 0
        while start_index < len(bands_c) and bands_c[start_index] < start_hz:
            start_index += 1
        end_index = start_index
        while end_index < len(bands_c) and bands_c[end_index] < end_hz:
            end_index += 1
        bands_c = bands_c[start_index:end_index]
        new_bands_c_len = len(bands_c)
        bands_limits = bands_limits[start_index : start_index + new_bands_c_len + 1]

        return bands_limits, bands_c

parameters_set: bool property

True if set_parameters has already been called.

__init__(log)

Creates a helper to process audio segments for a given day, resulting in the aggregated hybrid millidecade PSD product.

The created instance should be used only for a day of processing.

The overall call sequence is:

add_missing_segment can be called before calling set_parameters. This allows capturing initial segments with no data for the day.

Call set_parameters as soon as you get the first audio segment for a day. Such segment is used to determine the sampling frequency.

Then you can call add_segment or add_missing_segment as appropriate for each subsequent segment until covering the day.

When all segments have been captured, call process_captured_segments to get the result.

Parameters:

Name Type Description Default
log

A logger instance.

required
Source code in pbp/pypam_support.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __init__(
    self,
    log,  #: loguru.Logger
) -> None:
    """
    Creates a helper to process audio segments for a given day,
    resulting in the aggregated hybrid millidecade PSD product.

    The created instance should be used only for a day of processing.

    The overall call sequence is:

    `add_missing_segment` can be called before calling `set_parameters`.
    This allows capturing initial segments with no data for the day.

    Call `set_parameters` as soon as you get the first audio segment for a day.
    Such segment is used to determine the sampling frequency.

    Then you can call `add_segment` or `add_missing_segment` as appropriate
    for each subsequent segment until covering the day.

    When all segments have been captured, call `process_captured_segments`
    to get the result.

    Args:
        log: A logger instance.
    """
    self.log = log

    # to capture reported segments (missing and otherwise):
    self._captured_segments: List[_CapturedSegment] = []
    self._num_actual_segments: int = 0

    # Determined from any actual segment:
    self._fbands: Optional[np.ndarray] = None

    # The following determined when `set_parameters` is called:

    self.fs: Optional[int] = None
    self._nfft: Optional[int] = None
    self._subset_to: Optional[Tuple[int, int]] = None
    self._bands_limits: List[float] = []
    self._bands_c: List[float] = []

set_parameters(fs, nfft=0, subset_to=None)

Call this as soon as you get the first audio segment for a day, in particular, to set the sampling frequency used for subsequent calculations.

Parameters:

Name Type Description Default
fs int

Sampling frequency.

required
nfft int

Number of samples to use for the FFT. If 0, it will be set to fs.

0
subset_to Tuple[int, int]

If not None, the product for a day will get the resulting PSD subset to [lower, upper), in terms of central frequency.

None
Source code in pbp/pypam_support.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def set_parameters(
    self, fs: int, nfft: int = 0, subset_to: Optional[Tuple[int, int]] = None
):
    """
    Call this as soon as you get the first audio segment for a day,
    in particular, to set the sampling frequency used for subsequent calculations.

    Args:
        fs (int): Sampling frequency.
        nfft (int): Number of samples to use for the FFT. If 0, it will be set to `fs`.
        subset_to (Tuple[int, int]): If not None, the product for a day will get the resulting PSD
            subset to `[lower, upper)`, in terms of central frequency.
    """
    assert fs > 0
    self.fs = fs
    self._nfft = nfft if nfft > 0 else self.fs

    self._subset_to = subset_to
    band = [0, self.fs / 2]  # for now.

    self.log.debug(f"PypamSupport: {subset_to=} {band=}")

    self._bands_limits, self._bands_c = utils.get_hybrid_millidecade_limits(
        band=band, nfft=self._nfft
    )

add_missing_segment(dt)

Adds a missing segment to the ongoing aggregation. This method can be called even before set_parameters is called.

Parameters:

Name Type Description Default
dt datetime

The datetime of the start of the missing segment.

required
Source code in pbp/pypam_support.py
119
120
121
122
123
124
125
126
127
128
def add_missing_segment(self, dt: datetime):
    """
    Adds a missing segment to the ongoing aggregation.
    This method can be called even before `set_parameters` is called.

    Args:
        dt (datetime): The datetime of the start of the missing segment.
    """
    self._captured_segments.append(_CapturedSegment(dt, 0, None))
    self.log.debug(f"  captured segment: {dt}  (NO DATA)")

add_segment(dt, data)

Adds an audio segment to the ongoing aggregation.

set_parameters must have been called first.

Parameters:

Name Type Description Default
dt datetime

The datetime of the start of the segment.

required
data ndarray

The audio data.

required
Source code in pbp/pypam_support.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def add_segment(self, dt: datetime, data: np.ndarray):
    """
    Adds an audio segment to the ongoing aggregation.

    `set_parameters` must have been called first.

    Args:
        dt (datetime): The datetime of the start of the segment.
        data (np.ndarray): The audio data.
    """
    assert self.parameters_set
    assert self.fs is not None
    assert self._nfft is not None

    self._fbands, spectrum = _get_spectrum(data, self.fs, self._nfft)
    num_secs = len(data) / self.fs
    self._captured_segments.append(_CapturedSegment(dt, num_secs, spectrum))
    self._num_actual_segments += 1
    self.log.debug(f"  captured segment: {dt}")

process_captured_segments(sensitivity_da=None)

Gets the resulting hybrid millidecade bands for all captured segments. At least one actual segment must have been captured, otherwise None is returned.

Parameters:

Name Type Description Default
sensitivity_da Optional[DataArray]

The sensitivity data. If given, it will be used to calibrate the result.

None

Returns:

Type Description
Optional[ProcessResult]

Result if at least an actual segment was captured, None otherwise.

Source code in pbp/pypam_support.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def process_captured_segments(
    self,
    sensitivity_da: Optional[xr.DataArray] = None,
) -> Optional[ProcessResult]:
    """
    Gets the resulting hybrid millidecade bands for all captured segments.
    At least one actual segment must have been captured, otherwise None is returned.

    Args:
        sensitivity_da (Optional[xr.DataArray]): The sensitivity data.
            If given, it will be used to calibrate the result.

    Returns:
        Result if at least an actual segment was captured, None otherwise.
    """
    if self._num_actual_segments == 0:
        return None

    # Use any actual segment to determine NaN spectrum for the missing segments:
    actual = next(s for s in self._captured_segments if s.spectrum is not None)
    assert actual is not None, "unexpected: no actual segment found"
    assert actual.spectrum is not None, "unexpected: no actual.spectrum"
    nan_spectrum = np.full(len(actual.spectrum), np.nan)

    # gather resulting variables:
    times: List[np.int64] = []
    effort: List[np.float32] = []
    spectra: List[np.ndarray] = []
    for cs in self._captured_segments:
        times.append(np.int64(cs.dt.timestamp()))
        effort.append(np.float32(cs.num_secs))

        spectrum = nan_spectrum if cs.spectrum is None else cs.spectrum
        self.log.debug(f"  spectrum for: {cs.dt} (effort={cs.num_secs})")
        spectra.append(spectrum)

    self.log.info("Aggregating results ...")
    psd_da = self._get_aggregated_milli_psd(
        times=times,
        spectra=spectra,
        sensitivity_da=sensitivity_da,
    )

    effort_da = xr.DataArray(
        data=effort,
        dims=["time"],
        coords={"time": psd_da.time},
    )
    return ProcessResult(psd_da, effort_da)

ProcessResult

The result of processing a day of audio segments when at least one actual segment has been captured.

Attributes:

Name Type Description
psd_da DataArray

The resulting hybrid millidecade PSD product.

effort_da DataArray

The effort in seconds for each segment.

Source code in pbp/pypam_support.py
14
15
16
17
18
19
20
21
22
23
24
25
26
@dataclass
class ProcessResult:
    """
    The result of processing a day of audio segments when at least one actual segment
    has been captured.

    Attributes:
        psd_da: The resulting hybrid millidecade PSD product.
        effort_da: The effort in seconds for each segment.
    """

    psd_da: xr.DataArray
    effort_da: xr.DataArray