# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Scientific Package. This package holds all simulators, and
# analysers necessary to run brain-simulations. You can use it stand alone or
# in conjunction with TheVirtualBrain-Framework Package. See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2024, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
Numba backend which uses templating to generate simulation
code.
.. moduleauthor:: Marmaduke Woodman <marmaduke.woodman@univ-amu.fr>
"""
import os
import importlib
import numpy as np
import autopep8
from .templates import MakoUtilMix
from tvb.simulator.lab import *
[docs]
class NbMPRBackend(MakoUtilMix):
[docs]
def build_py_func(self, template_source, content, name='kernel', print_source=False,
modname=None):
"Build and retrieve one or more Python functions from template."
source = self.render_template(template_source, content)
source = autopep8.fix_code(source)
if print_source:
print(self.insert_line_numbers(source))
if modname is not None:
return self.eval_module(source, name, modname)
else:
return self.eval_source(source, name)
[docs]
def eval_source(self, source, name):
globals_ = {}
try:
exec(source, globals_)
except Exception as exc:
if not print_source:
print(self._insert_line_numbers(source))
raise exc
fns = [globals_[n] for n in name.split(',')]
return fns[0] if len(fns)==1 else fns
[docs]
def eval_module(self, source, name, modname):
here = os.path.abspath(os.path.dirname(__file__))
genp = os.path.join(here, 'templates', 'generated')
with open(f'{genp}/{modname}.py', 'w') as fd:
fd.write(source)
fullmodname = f'tvb.simulator.backend.templates.generated.{modname}'
mod = importlib.import_module(fullmodname)
fns = [getattr(mod,n) for n in name.split(',')]
return fns[0] if len(fns)==1 else fns
[docs]
def check_compatibility(self, sim):
def check_choices(val, choices):
if not isinstance(val, choices):
raise NotImplementedError("Unsupported simulator component. Given: {}\nExpected one of: {}".format(val, choices))
# monitors
if len(sim.monitors) > 1:
raise NotImplementedError("Configure with one monitor.")
check_choices(sim.monitors[0], (monitors.Raw, monitors.TemporalAverage))
# integrators
check_choices(sim.integrator, integrators.HeunStochastic)
# models
check_choices(sim.model, models.MontbrioPazoRoxin)
# coupling
check_choices(sim.coupling, coupling.Linear)
# surface
if sim.surface is not None:
raise NotImplementedError("Surface simulation not supported.")
# stimulus evaluated outside the backend, no restrictions
[docs]
def run_sim(self, sim, nstep=None, simulation_length=None, chunksize=100000, compatibility_mode=False, print_source=False):
assert nstep is not None or simulation_length is not None or sim.simulation_length is not None
self.check_compatibility(sim)
if nstep is None:
if simulation_length is None:
simulation_length = sim.simulation_length
nstep = int(np.ceil(simulation_length/sim.integrator.dt))
if isinstance(sim.monitors[0], monitors.Raw):
svar_bufs = self._run_sim_plain(sim, nstep, compatibility_mode=compatibility_mode, print_source=print_source)
time = np.arange(svar_bufs[0].shape[1]) * sim.integrator.dt
elif isinstance(sim.monitors[0], monitors.TemporalAverage):
svar_bufs = self._run_sim_tavg_chunked(sim, nstep, chunksize=chunksize, compatibility_mode=compatibility_mode, print_source=print_source)
T = sim.monitors[0].period
time = np.arange(svar_bufs[0].shape[1]) * T + 0.5 * T
else:
raise NotImplementedError("Only Raw or TemporalAverage monitors supported.")
data = np.concatenate(
[svar_buf.T[:,np.newaxis,:, np.newaxis] for svar_buf in svar_bufs],
axis=1
)
return (time, data),
def _run_sim_plain(self, sim, nstep=None, compatibility_mode=False, print_source=True):
template = '<%include file="nb-montbrio.py.mako"/>'
content = dict(
compatibility_mode=compatibility_mode,
sim=sim
)
integrate = self.build_py_func(template, content, name='integrate', print_source=print_source)
horizon = sim.connectivity.horizon
buf_len = horizon + nstep
N = sim.connectivity.number_of_regions
gf = sim.integrator.noise.gfun(None)
svar_bufs = [buf for buf in sim.integrator.noise.generate( shape=(sim.model.nvar,N,buf_len) ) * gf]
for i, svar_buf in enumerate(svar_bufs):
svar_buf[:,:horizon] = np.roll(sim.history.buffer[:,i,:,0], -1, axis=0).T
if sim.stimulus is None:
stimulus = None
else:
sim.stimulus.configure_space()
sim.stimulus.configure_time(np.arange(nstep)*sim.integrator.dt)
stimulus = sim.stimulus()
svar_bufs = self._run_integrate( integrate, sim, N, nstep, svar_bufs, stimulus)
return [svar_buf[:,horizon:] for svar_buf in svar_bufs]
def _run_integrate(self, integrate, sim, N, nstep, svar_bufs, stimulus):
return integrate(
N = N,
dt = sim.integrator.dt,
nstep = nstep,
i0 = sim.connectivity.horizon,
**dict(zip(sim.model.state_variables,svar_bufs)),
weights = sim.connectivity.weights,
idelays = sim.connectivity.idelays,
parmat = sim.model.spatial_parameter_matrix.T,
stimulus = stimulus
)
def _time_average(self, ts, istep):
N, T = ts.shape
return np.mean(ts.reshape(N,T//istep,istep),-1) # length of ts better be multiple of istep
def _run_sim_tavg_chunked(self, sim, nstep, chunksize, compatibility_mode=False, print_source=False):
template = '<%include file="nb-montbrio.py.mako"/>'
content = dict(sim=sim, compatibility_mode=compatibility_mode)
integrate = self.build_py_func(template, content, name='integrate', print_source=print_source)
# chunksize in number of steps
horizon = sim.connectivity.horizon
N = sim.connectivity.number_of_regions
gf = sim.integrator.noise.gfun(None)
tavg_steps=sim.monitors[0].istep
assert tavg_steps < chunksize
assert chunksize % tavg_steps == 0
tavg_chunksize = chunksize // tavg_steps
assert nstep % tavg_steps == 0
svar_outs = [svar_out for svar_out in np.zeros((sim.model.nvar,N,nstep//tavg_steps))]
svar_bufs = [buf for buf in sim.integrator.noise.generate( shape=(sim.model.nvar,N,chunksize+horizon) ) * gf]
for i, svar_buf in enumerate(svar_bufs):
svar_buf[:,:horizon] = np.roll(sim.history.buffer[:,i,:,0], -1, axis=0).T
for chunk, _ in enumerate(range(horizon, nstep+horizon, chunksize)):
if sim.stimulus is None:
stimulus = None
else:
sim.stimulus.configure_space()
sim.stimulus.configure_time(
np.arange(chunk*chunksize, (chunk+1)*chunksize)*sim.integrator.dt
)
stimulus = sim.stimulus()
svar_bufs = self._run_integrate( integrate, sim, N, chunksize, svar_bufs, stimulus)
tavg_chunk = chunk * tavg_chunksize
svar_chunks = [self._time_average(svar[:, horizon:], tavg_steps) for svar in svar_bufs]
if tavg_chunk+tavg_chunksize > svar_outs[0].shape[1]:
cutoff = tavg_chunk+tavg_chunksize - svar_outs[0].shape[1]
for svar_chunk in svar_chunks:
svar_chunk = svar_chunk[:,:-cutoff]
for svar_out, svar_chunk in zip (svar_outs, svar_chunks):
svar_out[:,tavg_chunk:tavg_chunk+tavg_chunksize] = svar_chunk
for svar_buf in svar_bufs:
svar_buf[:,:horizon] = svar_buf[:,-horizon:]
for svar_buf, svar_noise in zip(svar_bufs, sim.integrator.noise.generate( shape=(sim.model.nvar,N,chunksize) ) * gf):
svar_buf[:,horizon:] = svar_noise
return svar_outs