Skip to content

Allow expanded physical signal in calc_adc_params #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 132 additions & 81 deletions wfdb/io/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,21 +699,25 @@ def dac(self, expanded=False, return_res=64, inplace=False):

return p_signal

def calc_adc_params(self):
def calc_adc_gain_baseline(self, ch, minvals, maxvals):
"""
Compute appropriate adc_gain and baseline parameters for adc
conversion, given the physical signal and the fmts.
Compute adc_gain and baseline parameters for a given channel.

Parameters
----------
N/A
ch: int
The channel that the adc_gain and baseline are being computed for.
minvals: list
The minimum values for each channel.
maxvals: list
The maximum values for each channel.

Returns
-------
adc_gains : list
List of calculated `adc_gain` values for each channel.
baselines : list
List of calculated `baseline` values for each channel.
adc_gain : float
Calculated `adc_gain` value for a given channel.
baseline : int
Calculated `baseline` value for a given channel.

Notes
-----
Expand All @@ -729,85 +733,132 @@ def calc_adc_params(self):
for calculated float `adc_gain` values.

"""
adc_gains = []
baselines = []
# Get the minimum and maximum (valid) storage values
dmin, dmax = _digi_bounds(self.fmt[ch])
# add 1 because the lowest value is used to store nans
dmin = dmin + 1

pmin = minvals[ch]
pmax = maxvals[ch]

# Figure out digital samples used to store physical samples

# If the entire signal is NAN, gain/baseline won't be used
if pmin == np.nan:
adc_gain = 1
baseline = 1
# If the signal is just one value, store one digital value.
elif pmin == pmax:
if pmin == 0:
adc_gain = 1
baseline = 1
else:
# All digital values are +1 or -1. Keep adc_gain > 0
adc_gain = abs(1 / pmin)
baseline = 0
# Regular varied signal case.
else:
# The equation is: p = (d - b) / g

# Approximately, pmax maps to dmax, and pmin maps to
# dmin. Gradient will be equal to, or close to
# delta(d) / delta(p), since intercept baseline has
# to be an integer.

# Constraint: baseline must be between +/- 2**31
adc_gain = (dmax - dmin) / (pmax - pmin)
baseline = dmin - adc_gain * pmin

# Make adjustments for baseline to be an integer
# This up/down round logic of baseline is to ensure
# there is no overshoot of dmax. Now pmax will map
# to dmax or dmax-1 which is also fine.
if pmin > 0:
baseline = int(np.ceil(baseline))
else:
baseline = int(np.floor(baseline))

# After baseline is set, adjust gain correspondingly.Set
# the gain to map pmin to dmin, and p==0 to baseline.
# In the case where pmin == 0 and dmin == baseline,
# adc_gain is already correct. Avoid dividing by 0.
if dmin != baseline:
adc_gain = (dmin - baseline) / pmin

# Remap signal if baseline exceeds boundaries.
# This may happen if pmax < 0
if baseline > MAX_I32:
# pmin maps to dmin, baseline maps to 2**31 - 1
# pmax will map to a lower value than before
adc_gain = (MAX_I32) - dmin / abs(pmin)
baseline = MAX_I32
# This may happen if pmin > 0
elif baseline < MIN_I32:
# pmax maps to dmax, baseline maps to -2**31 + 1
adc_gain = (dmax - MIN_I32) / pmax
baseline = MIN_I32

return adc_gain, baseline

def calc_adc_params(self):
"""
Compute appropriate adc_gain and baseline parameters for adc
conversion, given the physical signal and the fmts.

Parameters
----------
N/A

if np.where(np.isinf(self.p_signal))[0].size:
raise ValueError("Signal contains inf. Cannot perform adc.")
Returns
-------
adc_gains : list
List of calculated `adc_gain` values for each channel.
baselines : list
List of calculated `baseline` values for each channel

# min and max ignoring nans, unless whole channel is NAN.
# Should suppress warning message.
minvals = np.nanmin(self.p_signal, axis=0)
maxvals = np.nanmax(self.p_signal, axis=0)
"""
adc_gains = []
baselines = []

for ch in range(np.shape(self.p_signal)[1]):
# Get the minimum and maximum (valid) storage values
dmin, dmax = _digi_bounds(self.fmt[ch])
# add 1 because the lowest value is used to store nans
dmin = dmin + 1
if self.p_signal is not None:
if np.where(np.isinf(self.p_signal))[0].size:
raise ValueError("Signal contains inf. Cannot perform adc.")

pmin = minvals[ch]
pmax = maxvals[ch]
# min and max ignoring nans, unless whole channel is NAN.
# Should suppress warning message.
minvals = np.nanmin(self.p_signal, axis=0)
maxvals = np.nanmax(self.p_signal, axis=0)

# Figure out digital samples used to store physical samples
for ch in range(np.shape(self.p_signal)[1]):
adc_gain, baseline = self.calc_adc_gain_baseline(
ch, minvals, maxvals
)
adc_gains.append(adc_gain)
baselines.append(baseline)

elif self.e_p_signal is not None:
minvals = []
maxvals = []
for ch in self.e_p_signal:
minvals.append(np.nanmin(ch))
maxvals.append(np.nanmax(ch))

if any(x == math.inf for x in minvals) or any(
x == math.inf for x in maxvals
):
raise ValueError("Signal contains inf. Cannot perform adc.")

for ch, _ in enumerate(self.e_p_signal):
adc_gain, baseline = self.calc_adc_gain_baseline(
ch, minvals, maxvals
)
adc_gains.append(adc_gain)
baselines.append(baseline)

# If the entire signal is NAN, gain/baseline won't be used
if pmin == np.nan:
adc_gain = 1
baseline = 1
# If the signal is just one value, store one digital value.
elif pmin == pmax:
if pmin == 0:
adc_gain = 1
baseline = 1
else:
# All digital values are +1 or -1. Keep adc_gain > 0
adc_gain = abs(1 / pmin)
baseline = 0
# Regular varied signal case.
else:
# The equation is: p = (d - b) / g

# Approximately, pmax maps to dmax, and pmin maps to
# dmin. Gradient will be equal to, or close to
# delta(d) / delta(p), since intercept baseline has
# to be an integer.

# Constraint: baseline must be between +/- 2**31
adc_gain = (dmax - dmin) / (pmax - pmin)
baseline = dmin - adc_gain * pmin

# Make adjustments for baseline to be an integer
# This up/down round logic of baseline is to ensure
# there is no overshoot of dmax. Now pmax will map
# to dmax or dmax-1 which is also fine.
if pmin > 0:
baseline = int(np.ceil(baseline))
else:
baseline = int(np.floor(baseline))

# After baseline is set, adjust gain correspondingly.Set
# the gain to map pmin to dmin, and p==0 to baseline.
# In the case where pmin == 0 and dmin == baseline,
# adc_gain is already correct. Avoid dividing by 0.
if dmin != baseline:
adc_gain = (dmin - baseline) / pmin

# Remap signal if baseline exceeds boundaries.
# This may happen if pmax < 0
if baseline > MAX_I32:
# pmin maps to dmin, baseline maps to 2**31 - 1
# pmax will map to a lower value than before
adc_gain = (MAX_I32) - dmin / abs(pmin)
baseline = MAX_I32
# This may happen if pmin > 0
elif baseline < MIN_I32:
# pmax maps to dmax, baseline maps to -2**31 + 1
adc_gain = (dmax - MIN_I32) / pmax
baseline = MIN_I32

adc_gains.append(adc_gain)
baselines.append(baseline)
else:
raise Exception(
"Must supply p_signal or e_p_signal to calc_adc_params"
)

return (adc_gains, baselines)

Expand Down
Loading