Source code for

# -*- coding: utf-8 -*-
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <>.
# When using The Virtual Brain for scientific publications, please cite it as explained here:

.. moduleauthor:: Paula Popa <>
.. moduleauthor:: Bogdan Valean <>

import os
from functools import partial
from tvb.basic.logger.builder import get_logger
from tvb.core.entities.model.model_operation import STATUS_ERROR, STATUS_CANCELED, STATUS_FINISHED
from tvb.core.entities.model.model_operation import STATUS_STARTED, STATUS_PENDING
from import dao
from import HPCSchedulerClient, HPCJobStatus
from import OperationException
from import StorageInterface

    from pyunicore.client import Job, Transport
except ImportError:
    from tvb.basic.config.settings import HPCSettings
    HPCSettings.CAN_RUN_HPC = False

[docs]class HPCOperationService(object): LOGGER = get_logger(__name__) @staticmethod def _operation_error(operation): operation.mark_complete(STATUS_ERROR) dao.store_entity(operation) @staticmethod def _operation_canceled(operation): operation.mark_complete(STATUS_CANCELED) dao.store_entity(operation) @staticmethod def _operation_started(operation): operation.start_now() dao.store_entity(operation) @staticmethod def _operation_finished(operation, simulator_gid): op_ident = dao.get_operation_process_for_operation( # TODO: Handle login job = Job(Transport(os.environ[HPCSchedulerClient.CSCS_LOGIN_TOKEN_ENV_KEY]), op_ident.job_id) operation = dao.get_operation_by_id( folder = HPCSchedulerClient.storage_interface.get_project_folder( storage_interface = StorageInterface() if storage_interface.encryption_enabled(): storage_interface.inc_project_usage_count(folder) storage_interface.sync_folders(folder) try: sim_h5_filenames, metric_op, metric_h5_filename = \ HPCSchedulerClient.stage_out_to_operation_folder(job.working_dir, operation, simulator_gid) operation.mark_complete(STATUS_FINISHED) dao.store_entity(operation) HPCSchedulerClient().update_db_with_results(operation, sim_h5_filenames, metric_op, metric_h5_filename) except OperationException as exception: HPCOperationService.LOGGER.error(exception) HPCOperationService._operation_error(operation) finally: if storage_interface.encryption_enabled(): storage_interface.sync_folders(folder) storage_interface.set_project_inactive(operation.project)
[docs] @staticmethod def handle_hpc_status_changed(operation, simulator_gid, new_status): # type: (Operation, str, str) -> None switcher = { STATUS_ERROR: HPCOperationService._operation_error, STATUS_CANCELED: HPCOperationService._operation_canceled, STATUS_STARTED: HPCOperationService._operation_started, STATUS_FINISHED: partial(HPCOperationService._operation_finished, simulator_gid=simulator_gid) } update_func = switcher.get(new_status, lambda: "Invalid operation status") update_func(operation)
[docs] @staticmethod def check_operations_job(): operations = dao.get_operations_for_hpc_job() if operations is None or len(operations) == 0: return for operation in operations:"Start processing operation {}".format( try: op_ident = dao.get_operation_process_for_operation( if op_ident is not None: transport = Transport(os.environ[HPCSchedulerClient.CSCS_LOGIN_TOKEN_ENV_KEY]) job = Job(transport, op_ident.job_id) job_status =['status'] if job.is_running(): if operation.status == STATUS_PENDING and job_status == HPCJobStatus.READY.value: HPCOperationService._operation_started(operation) "CSCS job status: {} for operation {}.".format(job_status, return "Job for operation {} has status {}".format(, job_status)) if job_status == HPCJobStatus.SUCCESSFUL.value: simulator_gid = operation.view_model_gid HPCOperationService._operation_finished(operation, simulator_gid) else: HPCOperationService._operation_error(operation) except Exception: HPCOperationService.LOGGER.error( "There was an error on background processing process for operation {}".format(, exc_info=True)