Source code for tvb.analyzers.ica_algorithm

# -*- coding: utf-8 -*-
# Python implementation of the fast ICA algorithms.
# Reference: Tables 8.3 and 8.4 page 196 in the book:
# Independent Component Analysis, by  Hyvarinen et al.
# Authors: Pierre Lafaye de Micheaux, Stefan van der Walt, Gael Varoquaux,
#          Bertrand Thirion, Alexandre Gramfort, Denis A. Engemann
# License: BSD 3 clause
# Code from adapted by The Virtual Brain team - August 2019
# -------------------------------------------------------------------------------------------
# TheVirtualBrain-Scientific Package. This package holds all simulators, and
# analysers necessary to run brain-simulations. You can use it stand alone or
# in conjunction with TheVirtualBrain-Framework Package. See content of the
# documentation-folder for more details. See also
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <>.
# When using The Virtual Brain for scientific publications, please cite it as explained here:


.. moduleauthor:: Gabriel Florea <>


import warnings
import numpy
from scipy import linalg
from scipy._lib._util import check_random_state
from six import moves
from six import string_types

def _sym_decorrelation(W):
    """ Symmetric decorrelation
    i.e. W <- (W * W.T) ^{-1/2} * W
    s, u = linalg.eigh(, W.T))
    # u (resp. s) contains the eigenvectors (resp. square roots of
    # the eigenvalues) of W * W.T
    return * (1. / numpy.sqrt(s)), u.T), W)

def _ica_def(X, tol, g, fun_args, max_iter, w_init):

    Deflationary FastICA using fun approx to neg-entropy function
    Used internally by FastICA.


    n_components = w_init.shape[0]
    W = numpy.zeros((n_components, n_components), dtype=X.dtype)
    n_iter = []

    # j is the index of the extracted component
    for j in range(n_components):
        w = w_init[j, :].copy()
        w /= numpy.sqrt((w ** 2).sum())
        i = None
        for i in moves.xrange(max_iter):
            gwtx, g_wtx = g(, X), fun_args)

            w1 = (X * gwtx).mean(axis=1) - g_wtx.mean() * w

            w1 -=, W[:j].T), W[:j])

            w1 /= numpy.sqrt((w1 ** 2).sum())

            lim = numpy.abs(numpy.abs((w1 * w).sum()) - 1)
            w = w1
            if lim < tol:

        n_iter.append(i + 1)
        W[j, :] = w

    return W, max(n_iter)

def _ica_par(X, tol, g, fun_args, max_iter, w_init):
    Parallel FastICA.

    Used internally by FastICA --main loop


    W = _sym_decorrelation(w_init)
    del w_init
    p_ = float(X.shape[1])
    ii = None
    for ii in moves.xrange(max_iter):
        gwtx, g_wtx = g(, X), fun_args)
        W1 = _sym_decorrelation(, X.T) / p_
                                - g_wtx[:, numpy.newaxis] * W)
        del gwtx, g_wtx
        # builtin max, abs are faster than numpy counter parts.
        lim = max(abs(abs(numpy.diag(, W.T))) - 1))
        W = W1
        if lim < tol:
        warnings.warn('FastICA did not converge. Consider increasing '
                      'tolerance or the maximum number of iterations.')

    return W, ii + 1

# Some standard non-linear functions.
def _logcosh(x, fun_args=None):
    alpha = fun_args.get('alpha', 1.0)  # comment it out?

    x *= alpha
    gx = numpy.tanh(x, x)  # apply the tanh inplace
    g_x = numpy.empty(x.shape[0])
    # XXX compute in chunks to avoid extra allocation
    for i, gx_i in enumerate(gx):  # please don't vectorize.
        g_x[i] = (alpha * (1 - gx_i ** 2)).mean()
    return gx, g_x

def _exp(x, fun_args):
    exp = numpy.exp(-(x ** 2) / 2)
    gx = x * exp
    g_x = (1 - x ** 2) * exp
    return gx, g_x.mean(axis=-1)

def _cube(x, fun_args):
    return x ** 3, (3 * x ** 2).mean(axis=-1)

[docs]def fastica(X, n_components=None, algorithm="parallel", whiten=True, fun="logcosh", fun_args=None, max_iter=200, tol=1e-04, w_init=None, random_state=None, return_X_mean=False, compute_sources=True, return_n_iter=False): """Perform Fast Independent Component Analysis. Read more in the :ref:`User Guide <ICA>`. Parameters ---------- X : array-like, shape (n_samples, n_features) Training vector, where n_samples is the number of samples and n_features is the number of features. n_components : int, optional Number of components to extract. If None no dimension reduction is performed. algorithm : {'parallel', 'deflation'}, optional Apply a parallel or deflational FASTICA algorithm. whiten : boolean, optional If True perform an initial whitening of the data. If False, the data is assumed to have already been preprocessed: it should be centered, normed and white. Otherwise you will get incorrect results. In this case the parameter n_components will be ignored. fun : string or function, optional. Default: 'logcosh' The functional form of the G function used in the approximation to neg-entropy. Could be either 'logcosh', 'exp', or 'cube'. You can also provide your own function. It should return a tuple containing the value of the function, and of its derivative, in the point. The derivative should be averaged along its last dimension. Example: def my_g(x): return x ** 3, np.mean(3 * x ** 2, axis=-1) fun_args : dictionary, optional Arguments to send to the functional form. If empty or None and if fun='logcosh', fun_args will take value {'alpha' : 1.0} max_iter : int, optional Maximum number of iterations to perform. tol : float, optional A positive scalar giving the tolerance at which the un-mixing matrix is considered to have converged. w_init : (n_components, n_components) array, optional Initial un-mixing array of dimension (n.comp,n.comp). If None (default) then an array of normal r.v.'s is used. random_state : int, RandomState instance or None, optional (default=None) If int, random_state is the seed used by the random number generator; If RandomState instance, random_state is the random number generator; If None, the random number generator is the RandomState instance used by `np.random`. return_X_mean : bool, optional If True, X_mean is returned too. compute_sources : bool, optional If False, sources are not computed, but only the rotation matrix. This can save memory when working with big data. Defaults to True. return_n_iter : bool, optional Whether or not to return the number of iterations. Returns ------- K : array, shape (n_components, n_features) | None. If whiten is 'True', K is the pre-whitening matrix that projects data onto the first n_components principal components. If whiten is 'False', K is 'None'. W : array, shape (n_components, n_components) Estimated un-mixing matrix. The mixing matrix can be obtained by:: w =, K.T) A = w.T * (w * w.T).I S : array, shape (n_samples, n_components) | None Estimated source matrix X_mean : array, shape (n_features, ) The mean over features. Returned only if return_X_mean is True. n_iter : int If the algorithm is "deflation", n_iter is the maximum number of iterations run across all components. Else they are just the number of iterations taken to converge. This is returned only when return_n_iter is set to `True`. Notes ----- The data matrix X is considered to be a linear combination of non-Gaussian (independent) components i.e. X = AS where columns of S contain the independent components and A is a linear mixing matrix. In short ICA attempts to `un-mix' the data by estimating an un-mixing matrix W where ``S = W K X.`` This implementation was originally made for data of shape [n_features, n_samples]. Now the input is transposed before the algorithm is applied. This makes it slightly faster for Fortran-ordered input. Implemented using FastICA: `A. Hyvarinen and E. Oja, Independent Component Analysis: Algorithms and Applications, Neural Networks, 13(4-5), 2000, pp. 411-430` """ K = None X_mean = None random_state = check_random_state(random_state) fun_args = {} if fun_args is None else fun_args alpha = fun_args.get('alpha', 1.0) if not 1 <= alpha <= 2: raise ValueError('alpha must be in [1,2]') if fun == 'logcosh': g = _logcosh elif fun == 'exp': g = _exp elif fun == 'cube': g = _cube elif callable(fun): def g(x, fun_args): return fun(x, **fun_args) else: exc = ValueError if isinstance(fun, string_types) else TypeError raise exc("Unknown function %r;" " should be one of 'logcosh', 'exp', 'cube' or callable" % fun) X = validate_and_transpose_source(X) n, p = X.shape if not whiten and n_components is not None: n_components = None warnings.warn('Ignoring n_components with whiten=False.') if n_components is None: n_components = min(n, p) if n_components > min(n, p): n_components = min(n, p) warnings.warn('n_components is too large: it will be set to %s' % n_components) if whiten: # Centering the columns (ie the variables) X_mean = X.mean(axis=-1) X -= X_mean[:, numpy.newaxis] # Whitening and preprocessing by PCA u, d, _ = linalg.svd(X, full_matrices=False) del _ K = (u / d).T[:n_components] # see (6.33) p.140 del u, d X1 =, X) # see (13.6) p.267 Here X1 is white and data # in X has been projected onto a subspace by PCA X1 *= numpy.sqrt(p) else: X1 = X if w_init is None: w_init = numpy.asarray(random_state.normal(size=(n_components, n_components)), dtype=X1.dtype) else: w_init = numpy.asarray(w_init) if w_init.shape != (n_components, n_components): raise ValueError('w_init has invalid shape -- should be %(shape)s' % {'shape': (n_components, n_components)}) kwargs = {'tol': tol, 'g': g, 'fun_args': fun_args, 'max_iter': max_iter, 'w_init': w_init} if algorithm == 'parallel': W, n_iter = _ica_par(X1, **kwargs) elif algorithm == 'deflation': W, n_iter = _ica_def(X1, **kwargs) else: raise ValueError('Invalid algorithm: must be either `parallel` or' ' `deflation`.') del X1 if whiten: if compute_sources: S =, K), X).T else: S = None if return_X_mean: if return_n_iter: return K, W, S, X_mean, n_iter else: return K, W, S, X_mean else: if return_n_iter: return K, W, S, n_iter else: return K, W, S else: if compute_sources: S =, X).T else: S = None if return_X_mean: if return_n_iter: return None, W, S, None, n_iter else: return None, W, S, None else: if return_n_iter: return None, W, S, n_iter else: return None, W, S
[docs]def validate_and_transpose_source(X): # Raise error if input is scalar or 1D if X.ndim == 0: raise ValueError( "Expected 2D array, got scalar array instead:\narray={}.\n" "Reshape your data either using array.reshape(-1, 1) if " "your data has a single feature or array.reshape(1, -1) " "if it contains a single sample.".format(X)) if X.ndim == 1: raise ValueError( "Expected 2D array, got 1D array instead:\narray={}.\n" "Reshape your data either using array.reshape(-1, 1) if " "your data has a single feature or array.reshape(1, -1) " "if it contains a single sample.".format(X)) if numpy.issubdtype(X.dtype, numpy.floating) and X.dtype.itemsize < 8: sum_result = numpy.sum(X, dtype=numpy.float64) else: sum_result = numpy.sum(X) is_finite = numpy.isfinite(sum_result) if not is_finite: raise ValueError("Input contains infinity or a value too large for {}.".format(X.dtype)) return X.T