Source code for tvb.interfaces.rest.bids_monitor.bids_dir_monitor

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

import time
import os
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from tvb.interfaces.rest.bids_monitor.bids_data_builder import BIDSDataBuilder
from tvb.interfaces.rest.client.examples.utils import monitor_operation, compute_rest_url
from tvb.interfaces.rest.client.tvb_client import TVBClient
from tvb.adapters.uploaders.bids_importer import BIDSImporter, BIDSImporterModel

from threading import Thread

from tvb.basic.logger.builder import get_logger

logger = get_logger(__name__)

SUBJECT_PREFIX = 'sub'


[docs] class BIDSDirWatcher: def __init__(self, DIRECTORY_TO_WATCH=None, UPLOAD_TRIGGER_INTERVAL=20, IMPORT_DATA_IN_TVB=False, TVB_PROJECT_ID=None): self.added_files = [] self.DIRECTORY_TO_WATCH = DIRECTORY_TO_WATCH self.UPLOAD_TRIGGER_INTERVAL = UPLOAD_TRIGGER_INTERVAL self.IMPORT_DATA_IN_TVB = IMPORT_DATA_IN_TVB self.TVB_PROJECT_ID = TVB_PROJECT_ID self.end_watcher_flag = False self.current_dataset_loc = ""
[docs] def check_data(self): if self.DIRECTORY_TO_WATCH is None: logger.info("Provided directory to monitor is None") return False
[docs] def watchdog_thread(self): patterns = ["*.json"] ignore_directories = True case_sensitive = True my_event_handler = PatternMatchingEventHandler( patterns=patterns, ignore_directories=ignore_directories, case_sensitive=case_sensitive) my_event_handler.on_created = self.handle_file_added go_recursively = True my_observer = Observer() my_observer.schedule( my_event_handler, self.DIRECTORY_TO_WATCH, recursive=go_recursively) my_observer.start() try: while True: if self.end_watcher_flag: break time.sleep(5) except KeyboardInterrupt as e: my_observer.stop() my_observer.join()
[docs] def uploader_thread(self): try: while True: if self.end_watcher_flag: break time.sleep(self.UPLOAD_TRIGGER_INTERVAL) # uploading files currently present in the queue if len(self.added_files) == 0: continue self.create_bids_dataset(added_files=self.added_files[:]) # emptying the queue after uploading them self.added_files = [] except KeyboardInterrupt as e: return
[docs] def init_watcher(self): if self.check_data() is False: return watchdog_thread = Thread(target=self.watchdog_thread) upload_file_thread = Thread(target=self.uploader_thread) logger.info("Starting watchdog thread...") watchdog_thread.start() logger.info("Starting file uploader thread...") upload_file_thread.start() if self.IMPORT_DATA_IN_TVB: logger.info( "Performing TVB browser login for importing files in TVB") self.tvb_client = TVBClient(compute_rest_url()) self.tvb_client.browser_login() logger.info("Login Done")
[docs] def end_watcher(self): self.end_watcher_flag = True
[docs] def handle_file_added(self, event): self.added_files.append(os.path.normpath(event.src_path)) logger.info("New file found {}, current file queue length {}".format( event.src_path, len(self.added_files)))
[docs] def create_bids_dataset(self, added_files): logger.info( "Creating BIDS dataset, with {} intial json files".format(len(added_files))) added_files = set(added_files) uploading_files = [] for path in added_files: if self.change_outside_sub_dir(path): continue uploading_files.append(path) if len(uploading_files) == 0: logger.info("No files are added inside subject folder") return subs_divided_paths = {} bids_dir_name = os.path.split( os.path.normpath(self.DIRECTORY_TO_WATCH))[1] for f in uploading_files: path_ar = f.split(bids_dir_name)[1].split(os.sep) for i, j in enumerate(path_ar): if j.startswith(SUBJECT_PREFIX): if subs_divided_paths.get(j) is None: subs_divided_paths[j] = [f] else: subs_divided_paths[j].append(f) break logger.info("Running BIDSDataBuilder on these files...") try: bids_data_builder = BIDSDataBuilder( bids_root_dir=self.DIRECTORY_TO_WATCH, init_json_files=subs_divided_paths) bids_zip_file = bids_data_builder.create_dataset_json_files() logger.info("Successfully built BIDS dataset") logger.info("ZIP file location: {}".format(bids_zip_file)) self.current_dataset_loc = bids_zip_file except Exception as e: logger.error( "Exception occurred while creating BIDS dataset {}".format(e.__class__)) logger.error("Unable to create BIDS dataset for these files") return if self.IMPORT_DATA_IN_TVB: logger.info("Now, importing data into TVB") self.upload_to_tvb(bids_zip_file)
[docs] def upload_to_tvb(self, file_path): projects_of_user = self.tvb_client.get_project_list() logger.info("Found {} porjects".format(len(projects_of_user))) if len(projects_of_user) == 0: return if self.TVB_PROJECT_ID is None: logger.info( "Importing data into first project as provided project id is None") self.TVB_PROJECT_ID = projects_of_user[0].gid logger.info("Project Name: {}".format(projects_of_user[0].name)) try: model = BIDSImporterModel() model.uploaded = file_path operation_gid = self.tvb_client.launch_operation( self.TVB_PROJECT_ID, BIDSImporter, model) monitor_operation(self.tvb_client, operation_gid) logger.info("Getting results of the import") res = self.tvb_client.get_operation_results(operation_gid) if len(res) == 0: logger.info( "Import was unsuccessful, no results found on operations") else: logger.info("Successfully imported the data into the TVB project") except Exception as e: logger.error( "Error importing data into TVB project, Exception {}".format(e.__class__))
[docs] def change_outside_sub_dir(self, file_path): bids_dir_name = os.path.split( os.path.normpath(self.DIRECTORY_TO_WATCH))[1] path_ar = file_path.split(bids_dir_name) sub_dir = path_ar[1] sub_name = os.path.split(sub_dir)[0] try: if SUBJECT_PREFIX in sub_name: sub_dir_ar = sub_dir.split(os.sep) for i, j in enumerate(sub_dir_ar): if j.startswith(SUBJECT_PREFIX): if sub_dir_ar[i+1] in ['ts', 'net', 'coord', 'spatial']: return False break except Exception as e: logger.error( "Exception: {} occurred in checking if added file is in sub directory".format(e.__class__)) return True