Source code for Protomix.baseline_correction

import numpy as np
import pandas as pd

from scipy import sparse
from scipy.sparse.linalg import spsolve

[docs] def baseline_correction(spectra_df: pd.DataFrame, method="als", lambda_=100, porder=1, maxIter=100, lam=1e4, ratio=0.05): """ Apply different baseline correction algorithms to each row of a DataFrame. :param spectra_df: DataFrame where each row is a spectrum to be baseline corrected. :type spectra_df: pd.DataFrame :param method: Method for baseline correction. Options are "als", "arpls", "airpls". :type method: str :param lambda_: Regularization parameter for ALS and AIRPLS. :type lambda_: float :param porder: Asymmetry parameter for ALS. :type porder: float :param maxIter: Maximum number of iterations for ALS and AIRPLS. :type maxIter: int :param lam: Lambda parameter for ARPLS. :type lam: float :param ratio: Ratio parameter for ARPLS. :type ratio: float :return: DataFrame with baseline corrected spectra. :rtype: pd.DataFrame """ if method == "als": corrected_spectra_df = spectra_df.apply(lambda row: als(row, lambda_, porder, maxIter), axis=1) elif method == "arpls": corrected_spectra_df = spectra_df.apply(lambda row: arpls(row, lam, ratio), axis=1) elif method == "airpls": corrected_spectra_df = spectra_df.apply(lambda row: airpls(row, lambda_, porder, maxIter), axis=1) else: raise ValueError("Invalid method. Supported methods: 'als', 'arpls', 'airpls'") return pd.DataFrame(corrected_spectra_df)
def als(y, lambda_, p, maxIter): m = len(y) D = sparse.eye(m, format='csc') D = D[1:] - D[:-1] # numpy.diff( ,2) does not work with sparse matrix. This is a workaround. D = D[1:] - D[:-1] D = D.T w = np.ones(m) for i in range(maxIter): W = sparse.diags(w, 0, shape=(m, m)) Z = W + lambda_ * D.dot(D.T) z = spsolve(Z, w * y) w = p * (y > z) + (1 - p) * (y < z) return y - z def arpls(y, lam=1e4, ratio=0.05, itermax=100): N = len(y) # D = sparse.csc_matrix(np.diff(np.eye(N), 2)) D = sparse.eye(N, format='csc') D = D[1:] - D[:-1] # numpy.diff( ,2) does not work with sparse matrix. This is a workaround. D = D[1:] - D[:-1] H = lam * D.T * D w = np.ones(N) for i in range(itermax): W = sparse.diags(w, 0, shape=(N, N)) WH = sparse.csc_matrix(W + H) C = sparse.csc_matrix(cholesky(WH.todense())) z = spsolve(C, spsolve(C.T, w * y)) d = y - z dn = d[d < 0] m = np.mean(dn) s = np.std(dn) wt = 1. / (1 + np.exp(2 * (d - (2 * s - m)) / s)) if np.linalg.norm(w - wt) / np.linalg.norm(w) < ratio: break w = wt return y - z def WhittakerSmooth(x, w, lam, differences=1): X = np.matrix(x) m = X.size # D = csc_matrix(np.diff(np.eye(m), differences)) D = sparse.eye(m, format='csc') for i in range(differences): D = D[1:] - D[:-1] # numpy.diff() does not work with sparse matrix. This is a workaround. W = sparse.diags(w, 0, shape=(m, m)) A = sparse.csc_matrix(W + (lam * D.T * D)) B = sparse.csc_matrix(W * X.T) background = spsolve(A, B) return np.array(background) def airpls(x, lambda_=100, porder=1, maxIter=100): m = x.shape[0] w = np.ones(m) for i in range(1, maxIter + 1): z = WhittakerSmooth(x, w, lambda_, porder) d = x - z dssn = np.abs(d[d < 0].sum()) if(dssn < 0.001 * (abs(x)).sum() or i == maxIter): if(i == maxIter): print('airpls: max iteration reached!') break w[d >= 0] = 0 # d>0 means that this point is part of a peak, # so its weight is set to 0 in order to ignore it w[d < 0] = np.exp(i * np.abs(d[d < 0]) / dssn) w[0] = np.exp(i * (d[d < 0]).max() / dssn) w[-1] = w[0] return x - z