Source code for fmflow.models.commonmode.classes

# coding: utf-8

# public items
__all__ = [
    "PCA",
    "EMPCA",
]

# standard library
from copy import deepcopy

# dependent packages
import fmflow as fm
import numpy as np
from .. import BaseModel
from numba import jit
from sklearn import decomposition
from scipy.optimize import curve_fit
from scipy.signal import savgol_filter


# classes
[docs]class PCA(BaseModel): def __init__(self, n_components=50, optimize_n=True, *, logger=None): params = { "n_components": n_components, "optimize_n": optimize_n, } super().__init__(params, logger)
[docs] def fit_transform(self, X): K = deepcopy(self.n_components) model = decomposition.TruncatedSVD(K) X = np.asarray(X) C = model.fit_transform(X) P = model.components_ if self.optimize_n: K_opt = self._optimize_K(C, 7) if K_opt < self.n_components: C, P = C[:, :K_opt], P[:K_opt] self.logger.info("optimized n_components: {}".format(K_opt)) else: self.logger.warning("optimized n_components exceeds the original") self.logger.warning("the original is used for reconstruction") self.components_ = P return C
@staticmethod def _optimize_K(C, level=5): npc = np.arange(C.shape[1]) lmd = np.log10(C.var(0)) # log eigen values def func(x, a, b, c): return a * 2 ** (-b * x) + c popt, pcov = curve_fit(func, npc, lmd) return int(level / popt[1]) + 1
[docs]class EMPCA(BaseModel): def __init__( self, n_components=50, ch_smooth=None, optimize_n=True, initialize="random", random_seed=None, *, convergence=1e-3, n_maxiters=300, logger=None ): params = { "n_components": n_components, "ch_smooth": ch_smooth, "optimize_n": optimize_n, "initialize": initialize, "random_seed": random_seed, "convergence": convergence, "n_maxiters": n_maxiters, } super().__init__(params, logger)
[docs] def fit_transform(self, X, W=None): X = np.asarray(X) # check array and weights if W is None: W = np.ones_like(X) else: W = np.asarray(W) if not X.shape == W.shape: raise ValueError("X and W must have same shapes") # shapes of matrices (for convergence) N, D, K = *X.shape, deepcopy(self.n_components) if self.optimize_n: model = decomposition.TruncatedSVD(K) C = model.fit_transform(X) K_opt = 2 * self._optimize_K(C, 7) K = K_opt if K_opt < K else K # initial arrays np.random.seed(self.random_seed) C = np.zeros([N, K]) if self.initialize == "random": P = self._orthogonal_from_random(K, D) elif self.initialize == "svd": P = self._orthogonal_from_svd(X, K) else: raise ValueError(self.initialize) # convergence cv = fm.utils.Convergence( self.convergence, self.n_maxiters, centering=True, raise_exception=True ) # EM algorithm try: _WX = W * X while not cv(C @ P): WX = _WX.copy() self.logger.debug(cv) C = self._update_coefficients(C, P, WX, W) P = self._update_eigenvectors(C, P, WX, W) if (self.ch_smooth is not None) and (self.ch_smooth > 0): P = self._smooth_eigenvectors(P, self.ch_smooth) except StopIteration: self.logger.warning("reached maximum iteration") # finally if self.optimize_n: K_opt = self._optimize_K(C, 7) if K_opt < self.n_components: self.logger.info("optimized n_components: {}".format(K_opt)) C, P = C[:, :K_opt], P[:K_opt] else: self.logger.warning("optimized n_components exceeds the original") self.logger.warning("the original is used for reconstruction") self.components_ = P return C
@staticmethod def _orthogonal_from_random(*shape): A = np.random.randn(*shape) for i in range(A.shape[0]): for j in range(i): A[i] -= (A[i] @ A[j]) * A[j] A[i] /= np.linalg.norm(A[i]) return A @staticmethod def _orthogonal_from_svd(X, K): svd = decomposition.TruncatedSVD(K) svd.fit(X) return svd.components_ @staticmethod def _smooth_eigenvectors(P, ch_smooth): return savgol_filter(P, ch_smooth, polyorder=3, axis=1) @staticmethod def _optimize_K(C, level=5): npc = np.arange(C.shape[1]) lmd = np.log10(C.var(0)) # log eigen values def func(x, a, b, c): return a * 2 ** (-b * x) + c popt, pcov = curve_fit(func, npc, lmd) return int(level / popt[1]) + 1 @staticmethod @jit(nopython=True, cache=True) def _update_coefficients(C, P, WX, W): N, D = WX.shape for n in range(N): # equiv to the equation 16 Pn = P @ (P * W[n]).T xn = P @ WX[n] C[n] = np.linalg.solve(Pn, xn) return C @staticmethod @jit(nopython=True, cache=True) def _update_eigenvectors(C, P, WX, W): N, D = WX.shape K = P.shape[0] for k in range(K): # equiv to the equation 21 Ck = C[:, k] P[k] = (Ck @ WX) / (Ck**2 @ W) # equiv to the equation 22 # but subtracting from WX for n in range(N): for d in range(D): WX[n, d] -= W[n, d] * P[k, d] * Ck[n] # renormalization for m in range(k): P[k] -= (P[k] @ P[m]) * P[m] P[k] /= np.linalg.norm(P[k]) return P