import os
import shutil

import tvb.config.init.alembic.versions as scripts
from alembic import command
from alembic.config import Config
from sqlalchemy import inspect
from sqlalchemy.sql import text
from tvb.basic.logger.builder import get_logger
from tvb.basic.profile import TvbProfile
from tvb.core.neotraits.db import Base

LOGGER = get_logger(__name__)

[docs]def initialize_startup(): """ Force DB tables create, in case no data is already found.""" is_db_empty = False session = SA_SESSIONMAKER() inspector = inspect(session.connection()) table_names = inspector.get_table_names() if len(table_names) < 1: LOGGER.debug("Database access exception, maybe DB is empty") is_db_empty = True session.close() versions_repo = TvbProfile.current.db.DB_VERSIONING_REPO alembic_cfg = Config() alembic_cfg.set_main_option('script_location', versions_repo) alembic_cfg.set_main_option('sqlalchemy.url', TvbProfile.current.db.DB_URL) if is_db_empty:"Initializing Database") if os.path.exists(versions_repo): shutil.rmtree(versions_repo) _update_sql_scripts() session = SA_SESSIONMAKER() Base.metadata.create_all(bind=session.connection()) session.commit() session.close() command.stamp(alembic_cfg, 'head')"Database Default Tables created successfully!") else: _update_sql_scripts() if 'migrate_version' in table_names: db_version = session.execute(text("""SELECT version from migrate_version""")).fetchone()[0] if db_version == 18: command.stamp(alembic_cfg, 'head') session.execute(text("""DROP TABLE "migrate_version";""")) session.commit() return is_db_empty if 'alembic_version' in table_names: db_version = session.execute(text("""SELECT version_num from alembic_version""")).fetchone() if not db_version: command.stamp(alembic_cfg, 'head') with session.connection() as connection: alembic_cfg.attributes['connection'] = connection command.upgrade(alembic_cfg, TvbProfile.current.version.DB_STRUCTURE_VERSION)"Database already has some data, will not be re-created!") # Alembic throws an error if we use alembic_cfg with a connection that has been already closed del alembic_cfg.attributes['connection'] command.stamp(alembic_cfg, 'head') return is_db_empty
[docs]def reset_database(): """ Remove all tables in DB. """ LOGGER.warning("Your Database tables will be deleted.") try: session = SA_SESSIONMAKER() LOGGER.debug("Delete connection initiated.") inspector = inspect(session.connection()) for table in inspector.get_table_names(): try: LOGGER.debug("Removing:" + table) session.execute(text("DROP TABLE \"%s\" CASCADE" % table)) except Exception: try: session.execute(text("DROP TABLE %s" % table)) except Exception as excep1: LOGGER.error("Could no drop table %s", table) LOGGER.exception(excep1) session.commit()"Database was cleanup!") except Exception as excep: LOGGER.warning(excep) finally: session.close()
def _update_sql_scripts(): """ When a new release is done, make sure old DB scripts are updated. """ scripts_folder = os.path.dirname(scripts.__file__) versions_folder = os.path.join(TvbProfile.current.db.DB_VERSIONING_REPO, 'versions') if os.path.exists(versions_folder): shutil.rmtree(versions_folder) ignore_patters = shutil.ignore_patterns('.svn') shutil.copytree(scripts_folder, versions_folder, ignore=ignore_patters) shutil.copyfile(os.path.join(os.path.dirname(__file__), 'alembic/'), os.path.join(versions_folder, os.pardir, ''))