Adapter that uses the traits module to generate interfaces for FFT Analyzer.

.. moduleauthor:: Lia Domide <>
.. moduleauthor:: Stuart A. Knock <Stuart@tvb.invalid>

import math
import uuid
import numpy
import psutil

from tvb.adapters.datatypes.db.spectral import FourierSpectrumIndex
from tvb.adapters.datatypes.db.time_series import TimeSeriesIndex
from tvb.adapters.datatypes.h5.spectral_h5 import FourierSpectrumH5
from tvb.analyzers.fft import compute_fast_fourier_transform, SUPPORTED_WINDOWING_FUNCTIONS
from tvb.basic.neotraits.api import Attr, Float
from tvb.core.adapters.abcadapter import ABCAdapterForm, ABCAdapter
from tvb.core.entities.filters.chain import FilterChain
from tvb.core.neocom import h5
from tvb.core.neotraits.forms import TraitDataTypeSelectField, SelectField, FloatField, BoolField
from tvb.core.neotraits.view_model import ViewModel, DataTypeGidAttr
from tvb.datatypes.time_series import TimeSeries

[docs]class FFTAdapterModel(ViewModel): """ Parameters have the following meaning: - time_series: the input time series to which the fft is to be applied - segment_length: the block size which determines the frequency resolution of the resulting power spectra - window_function: windowing functions can be applied before the FFT is performed - detrend: None; specify if detrend is performed on the time series """ time_series = DataTypeGidAttr( linked_datatype=TimeSeries, label="Time Series", doc="""The TimeSeries to which the FFT is to be applied.""" ) segment_length = Float( label="Segment(window) length (ms)", default=1000.0, required=False, doc="""The TimeSeries can be segmented into equally sized blocks (overlapping if necessary). The segment length determines the frequency resolution of the resulting power spectra -- longer windows produce finer frequency resolution.""") window_function = Attr( field_type=str, label="Windowing function", choices=tuple(SUPPORTED_WINDOWING_FUNCTIONS), required=False, doc="""Windowing functions can be applied before the FFT is performed. Default is None, possibilities are: 'hamming'; 'bartlett'; 'blackman'; and 'hanning'. See, numpy.<function_name>.""") detrend = Attr( field_type=bool, label="Detrending", default=True, required=False, doc="""Detrending is not always appropriate. Default is True, False means no detrending is performed on the time series""")
[docs]class FFTAdapterForm(ABCAdapterForm): def __init__(self): super(FFTAdapterForm, self).__init__() self.time_series = TraitDataTypeSelectField(FFTAdapterModel.time_series, name='time_series', conditions=self.get_filters(), has_all_option=True) self.segment_length = FloatField(FFTAdapterModel.segment_length) self.window_function = SelectField(FFTAdapterModel.window_function) self.detrend = BoolField(FFTAdapterModel.detrend) @staticmethod
[docs] def get_view_model(): return FFTAdapterModel
[docs] def get_required_datatype(): return TimeSeriesIndex
[docs] def get_filters(): return FilterChain(fields=[FilterChain.datatype + '.data_ndim'], operations=["=="], values=[4])
[docs] def get_input_name(): return "time_series"
[docs]class FourierAdapter(ABCAdapter): """ TVB adapter for calling the FFT algorithm. """ _ui_name = "Fourier Spectral Analysis" _ui_description = "Calculate the FFT of a TimeSeries entity." _ui_subsection = "fourier" def __init__(self): super(FourierAdapter, self).__init__() self.memory_factor = 1
[docs] def get_form_class(self): return FFTAdapterForm
[docs] def get_output(self): return [FourierSpectrumIndex]
[docs] def configure(self, view_model): # type: (FFTAdapterModel) -> None """ Do any configuration needed before launching. """ self.input_time_series_index = self.load_entity_by_gid(view_model.time_series) self.input_shape = (self.input_time_series_index.data_length_1d, self.input_time_series_index.data_length_2d, self.input_time_series_index.data_length_3d, self.input_time_series_index.data_length_4d) self.log.debug("time_series shape is %s" % str(self.input_shape)) self.log.debug("Provided segment_length is %s" % view_model.segment_length) self.log.debug("Provided window_function is %s" % view_model.window_function) self.log.debug("Detrend is %s" % view_model.detrend)
[docs] def get_required_memory_size(self, view_model): # type: (FFTAdapterModel) -> int """ Returns the required memory to be able to run the adapter. """ input_size = * 8.0 output_size = self.result_size(self.input_shape, view_model.segment_length, self.input_time_series_index.sample_period) total_free_memory = psutil.virtual_memory().free + psutil.swap_memory().free total_required_memory = input_size + output_size while total_required_memory / self.memory_factor / total_free_memory > 0.8: self.memory_factor += 1 return total_required_memory / self.memory_factor
[docs] def get_required_disk_size(self, view_model): # type: (FFTAdapterModel) -> int """ Returns the required disk size to be able to run the adapter (in kB). """ output_size = self.result_size(self.input_shape, view_model.segment_length, self.input_time_series_index.sample_period) return self.array_size2kb(output_size)
[docs] def launch(self, view_model): # type: (FFTAdapterModel) -> [FourierSpectrumIndex] """ Launch algorithm and build results. :param view_model: the ViewModel keeping the algorithm inputs :return: the fourier spectrum for the specified time series """ block_size = int(math.floor(self.input_shape[2] / self.memory_factor)) blocks = int(math.ceil(self.input_shape[2] / block_size)) input_time_series_h5 = h5.h5_file_for_index(self.input_time_series_index) # --------------------- Prepare result entities ---------------------- fft_index = FourierSpectrumIndex() dest_path = h5.path_for(self.storage_path, FourierSpectrumH5, fft_index.gid) spectra_file = FourierSpectrumH5(dest_path) # ------------- NOTE: Assumes 4D, Simulator timeSeries. -------------- node_slice = [slice(self.input_shape[0]), slice(self.input_shape[1]), None, slice(self.input_shape[3])] # ---------- Iterate over slices and compose final result ------------ small_ts = TimeSeries() small_ts.sample_period = input_time_series_h5.sample_period.load() small_ts.sample_period_unit = input_time_series_h5.sample_period_unit.load() for block in range(blocks): node_slice[2] = slice(block * block_size, min([(block + 1) * block_size, self.input_shape[2]]), 1) = input_time_series_h5.read_data_slice(tuple(node_slice)) partial_result = compute_fast_fourier_transform(small_ts, view_model.segment_length, view_model.window_function, view_model.detrend) if blocks <= 1 and len(partial_result.array_data) == 0: self.add_operation_additional_info( "Fourier produced empty result (most probably due to a very short input TimeSeries).") return None spectra_file.write_data_slice(partial_result) input_time_series_h5.close() # ---------------------------- Fill results ---------------------------- partial_result.source.gid = view_model.time_series partial_result.gid = uuid.UUID(fft_index.gid) fft_index.fill_from_has_traits(partial_result) self.fill_index_from_h5(fft_index, spectra_file), scalars_only=True) spectra_file.close() self.log.debug("partial segment_length is %s" % (str(partial_result.segment_length))) return fft_index
[docs] def result_shape(input_shape, segment_length, sample_period): """Returns the shape of the main result (complex array) of the FFT.""" freq_len = (segment_length / sample_period) / 2.0 freq_len = int(min((input_shape[0], freq_len))) nseg = max((1, int(numpy.ceil(input_shape[0] * sample_period / segment_length)))) result_shape = (freq_len, input_shape[1], input_shape[2], input_shape[3], nseg) return result_shape
[docs] def result_size(self, input_shape, segment_length, sample_period): """ Returns the storage size in Bytes of the main result (complex array) of the FFT. """ result_size =, segment_length, sample_period)) * 2.0 * 8.0 # complex*Bytes return result_size