Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add smoothing spline baseline #299

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions lumicks/pylake/piezo_tracking/baseline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import matplotlib.pyplot as plt
import numpy as np
from lumicks.pylake.channel import Slice, Continuous
from csaps import csaps
from sklearn.model_selection import RepeatedKFold
from sklearn.metrics import mean_squared_error


def unique_sorted(trap_position, force):
"""Sort and remove duplicates trap_position data to prepare for fit smoothing spline.
Parameters
----------
trap_position : lumicks.pylake.Slice
Trap mirror position
force : lumicks.pylake.Slice
Force data
"""

x = trap_position.data
u, c = np.unique(x, return_counts=True)
m = np.isin(x, [u[c < 2]])
ind = np.argsort(x[m])

return x[m][ind], force.data[m][ind]
Comment on lines +19 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could consider simplifying this to:

def unique_sorted(trap_position, force):
    """Sort and remove duplicates trap_position data to prepare for fit smoothing spline.

    Parameters
    ----------
    trap_position : array_like
        Trap mirror position
    force : array_like
        Force data
    """

    sorted_position, idx, count = np.unique(trap_position, return_index=True, return_counts=True)
    return sorted_position[count < 2], force[idx][count < 2]

Saves you a search and a sort (note that the result from np.unique is already sorted).

Given that you return the raw numpy arrays rather than slices, I would also consider just taking raw numpy arrays as input and extracting the data where its called (rather than passing a slice).



def optimize_smoothing_factor(
trap_position,
force,
smoothing_factors,
n_repeats,
plot_smoothingfactor_mse,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that the function has smoothing_factor in the name, I reckon plot_mse would be sufficient.

):
"""Find optimal smoothing factor by choosing smoothing factor with lowest mse on test data
Parameters
----------
trap_position : lumicks.pylake.Slice
Trap mirror position data
force : lumicks.pylake.Slice
Force data
smoothing_factors : np.array float
Array of smoothing factors used for optimization fit, 0 <= smoothing_factor <= 1
n_repeats: int
number of times to repeat cross validation
plot_smoothingfactor_mse: bool
plot mse on test data vs smoothing factors used for optimization
"""

mse_test_vals = np.zeros(len(smoothing_factors))
x_sorted, y_sorted = unique_sorted(trap_position, force)
Comment on lines +49 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I find a bit surprising is that you choose to pass trap position and force to this function, despite already having x_sorted and y_sorted. Is there a specific reason you prefer this?

for i, smooth in enumerate(smoothing_factors):
mse_test_array = np.zeros(n_repeats * 2)

rkf = RepeatedKFold(n_splits=2, n_repeats=n_repeats)
for k, (train_index, test_index) in enumerate(rkf.split(x_sorted)):
x_train, x_test = x_sorted[train_index], x_sorted[test_index]
y_train, y_test = y_sorted[train_index], y_sorted[test_index]

smoothing_result_train = csaps(x_train, y_train, smooth=smooth)
f_test = smoothing_result_train(x_test)
mse_test_array[k] = mean_squared_error(y_test, f_test)

mse_test_vals[i] = np.mean(mse_test_array)
if plot_smoothingfactor_mse:
plot_mse_smoothing_factors(smoothing_factors, mse_test_vals)

return smoothing_factors[np.argmin(mse_test_vals)]


def plot_mse_smoothing_factors(smoothing_factors, mse_test_vals):
plt.figure()
plt.plot(
np.log(1 - smoothing_factors),
mse_test_vals,
label=f"optimal s= {smoothing_factors[np.argmin(mse_test_vals)]:0.6f}",
)
plt.ylabel("mse test")
plt.xticks(np.log(1 - smoothing_factors), smoothing_factors)
plt.xlabel("smoothing factor")
plt.legend()
plt.show()


class ForceBaseLine:
def __init__(self, model, trap_data, force):
"""Force baseline

Parameters
----------
model : callable
Model which returns the baseline at specified points.
trap_data : lumicks.pylake.Slice
Trap mirror position data
force : lumicks.pylake.Slice
Force data
"""
self._model = model
self._trap_data = trap_data
self._force = force

def valid_range(self):
return (np.min(self._trap_data.data), np.max(self._trap_data.data))

def correct_data(self, force, trap_position):
if not np.array_equal(force.timestamps, trap_position.timestamps):
raise RuntimeError("Provided force and trap position timestamps should match")

return Slice(
Continuous(
force.data - self._model(trap_position.data),
force._src.start,
force._src.dt,
),
labels={
"title": force.labels.get("title", "Baseline Corrected Force"),
"y": "Baseline Corrected Force (pN)",
},
calibration=force._calibration,
)

def plot(self):
plt.scatter(self._trap_data.data, self._force.data, s=2)
plt.plot(self._trap_data.data, self._model(self._trap_data.data), "k")
plt.xlabel("Mirror position")
plt.ylabel(self._force.labels["y"])
plt.title("Force baseline")

def plot_residual(self):
plt.scatter(self._trap_data.data, self._force.data - self._model(self._trap_data.data), s=2)
plt.xlabel("Mirror position")
plt.ylabel(f"Residual {self._force.labels['y']}")
plt.title("Fit residual")

@classmethod
def polynomial_baseline(cls, trap_position, force, degree=7, downsampling_factor=None):
"""Generate a polynomial baseline from data

Parameters
----------
trap_position : lumicks.pylake.Slice
Trap mirror position data
force : lumicks.pylake.Slice
Force data
degree : int
Polynomial degree
downsampling_factor : int
Factor by which to downsample before baseline determination
"""
if not np.array_equal(force.timestamps, trap_position.timestamps):
raise RuntimeError("Provided force and trap position timestamps should match")

if downsampling_factor:
trap_position, force = (
ch.downsampled_by(downsampling_factor) for ch in (trap_position, force)
)

model = np.poly1d(np.polyfit(trap_position.data, force.data, deg=degree))
return cls(model, trap_position, force)\


@classmethod
def smoothingspline_baseline(
cls,
trap_position,
force,
smoothing_factor=None,
downsampling_factor=None,
smoothing_factors=np.array([0.9, 0.99, 0.999, 0.9999, 0.99999, 0.999999]),
n_repeats=10,
plot_smoothingfactor_mse=False,
):
"""Generate a smoothing spline baseline from data.
Items of xdata in smoothing spline must satisfy: x1 < x2 < ... < xN,
therefore the trap_position data is sorted and duplicates are removed
Parameters
----------
trap_position : lumicks.pylake.Slice
Trap mirror position data
force : lumicks.pylake.Slice
Force data
smoothing_factor : float
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether it would make sense to merge smoothing_factor and smoothing_factors.

If the input is just a value or has only one element, you use it as fixed value and otherwise you perform the optimization. You can use np.atleast_1d to upconvert a value to a numpy array so that you can always use the len operation on it. The default can then just be a default list that generally works.

Smoothing factor for smoothing spline, 0 <= smoothing_factor <= 1
downsampling_factor : int
Factor by which to downsample before baseline determination
smoothing_factors : np.array float
Array of smoothing factors used for optimization fit, 0 <= smoothing_factor <= 1
n_repeats: int
number of times to repeat cross validation
plot_smoothingfactor_mse: bool
plot mse on test data vs smoothing factors used for optimization
"""
if not np.array_equal(force.timestamps, trap_position.timestamps):
raise RuntimeError(
"Provided force and trap position timestamps should match"
)

if downsampling_factor:
trap_position, force = (
ch.downsampled_by(downsampling_factor) for ch in (trap_position, force)
)

x_sorted, y_sorted = unique_sorted(trap_position, force)

if smoothing_factor:
model = csaps(x_sorted, y_sorted, smooth=smoothing_factor)
else:
smoothing_factor = optimize_smoothing_factor(
trap_position,
force,
smoothing_factors=smoothing_factors,
n_repeats=n_repeats,
plot_smoothingfactor_mse=plot_smoothingfactor_mse,
)
model = csaps(x_sorted, y_sorted, smooth=smoothing_factor)

return cls(model, trap_position, force)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue with calling the constructor like this is that you don't actually put the data that you used for fitting in now (you fitted to x_sorted and y_sorted but store the original raw data).

166 changes: 166 additions & 0 deletions lumicks/pylake/piezo_tracking/piezo_tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import warnings
import numpy as np
import matplotlib.pyplot as plt
from lumicks.pylake.channel import Slice


class DistanceCalibration:
def __init__(self, trap_position, camera_distance, degree=1):
"""Map the trap position to the camera tracking distance using a linear fit.

Parameters
----------
trap_position : lumicks.pylake.Slice
Trap position.
camera_distance : lumicks.pylake.Slice
Camera distance as determined by Bluelake.
NOTE: The distance data should already have the bead diameter subtracted by Bluelake.
degree : int
Polynomial degree.
"""
trap_position, camera_distance = trap_position.downsampled_like(camera_distance)
mask = camera_distance.data != 0
missed_frames = np.sum(1 - mask)
if missed_frames > 0:
warnings.warn(
RuntimeWarning(
"There were frames with missing video tracking: "
f"{missed_frames} data point(s) were omitted."
)
)
self.position, self.distance = trap_position.data[mask], camera_distance.data[mask]
coeffs = np.polyfit(self.position, self.distance, degree)
self._model = np.poly1d(coeffs)

def __call__(self, trap_position):
return Slice(
trap_position._src._with_data(self._model(trap_position.data)),
labels={"title": "Piezo distance", "y": "Distance [um]"},
)

def valid_range(self):
return (np.min(self.position), np.max(self.position))

def __str__(self):
powers = np.flip(np.arange(self._model.order + 1))
return "".join(
f"{' + ' if coeff > 0 else ' - '}"
f"{abs(coeff):.4f}"
f"{'' if power == 0 else ' x' if power == 1 else f' x^{power}'}"
for power, coeff in zip(powers, self._model.coeffs)
).strip()

def __repr__(self):
return f"DistanceCalibration({str(self)})"

def plot(self):
"""Plot the calibration fit"""
plt.scatter(self.position, self.distance, s=2, label="data")
plt.plot(self.position, self._model(self.position), "k", label=f"${str(self)}$")
plt.xlabel("Mirror position")
plt.ylabel("Camera Distance [um]")
plt.tight_layout()
plt.legend()

def plot_residual(self):
"""Plot the residual of the calibration fit"""
plt.scatter(self.position, self._model(self.position) - self.distance, s=2)
plt.ylabel("Residual [um]")
plt.xlabel("Mirror position")
plt.tight_layout()
plt.legend()

@classmethod
def from_file(cls, calibration_file, degree=1):
"""Use a reference measurement to calibrate trap mirror position to bead-bead distance.

Parameters
----------
calibration_file : pylake.File
degree : int
Polynomial order.
"""
return cls(calibration_file["Trap position"]["1X"], calibration_file.distance1, degree)


class PiezoTrackingCalibration:
def __init__(
self,
trap_calibration,
baseline_force1,
baseline_force2,
signs=(1, -1),
):
"""Set up piezo tracking calibration

trap_calibration : pylake.DistanceCalibration
Calibration from trap position to trap to trap distance.
baseline_force1 : pylake.ForceBaseline
Baseline for force1
baseline_force2 : pylake.ForceBaseline
Baseline for force2
signs : tuple(float, float)
Sign convention for forces (e.g. (1, -1) indicates that force2 is negative).
"""
if len(signs) != 2:
raise ValueError(
"Argument `signs` should be a tuple of two floats reflecting the sign for each "
"channel."
)
for sign in signs:
if abs(sign) != 1:
raise ValueError("Each sign should be either -1 or 1.")

self.trap_calibration = trap_calibration
self.baseline_force1 = baseline_force1
self.baseline_force2 = baseline_force2
self._signs = signs

def valid_range(self):
"""Returns the mirror position range in which the piezo tracking is valid"""
calibration_items = (self.trap_calibration, self.baseline_force1, self.baseline_force2)
return np.min(np.stack([r.valid_range() for r in calibration_items]), axis=0)

def piezo_track(self, trap_position, force1, force2, trim=True, downsampling_factor=None):
"""Obtain piezo distance and baseline corrected forces

Parameters
----------
trap_position : pylake.channel.Slice
Trap position.
force1 : pylake.channel.Slice
First force channel to use for piezo tracking.
force2 : pylake.channel.Slice
Second force channel to use for piezo tracking.
trim : bool
Trim regions outside the calibration range.
downsampling_factor : Optional[int]
Downsampling factor.
"""
if downsampling_factor:
trap_position, force1, force2 = (
x.downsampled_by(downsampling_factor) for x in (trap_position, force1, force2)
)

trap_trap_dist = self.trap_calibration(trap_position)
bead_displacements = 1e-3 * sum(
sign * force / force.calibration[0]["kappa (pN/nm)"]
for force, sign in zip((force1, force2), self._signs)
)

piezo_distance = trap_trap_dist - bead_displacements
corrected_force1 = self.baseline_force1.correct_data(force1, trap_position)
corrected_force2 = self.baseline_force2.correct_data(force2, trap_position)

if trim:
valid_range = self.valid_range()
valid_mask = np.logical_and(
valid_range[0] <= trap_position.data, trap_position.data <= valid_range[1]
)
piezo_distance, corrected_force1, corrected_force2 = (
piezo_distance[valid_mask],
corrected_force1[valid_mask],
corrected_force2[valid_mask],
)

return piezo_distance, corrected_force1, corrected_force2
Loading