Source code for tardis.plugins.sqliteregistry

from tardis.utilities.attributedict import AttributeDict
from ..configuration.configuration import Configuration
from ..interfaces.plugin import Plugin
from ..interfaces.state import State

from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import List, Dict, Generator
import asyncio
import logging
import sqlite3

# Module-level logger; its dotted name places it below "cobald.runtime" in the
# logging hierarchy.
logger = logging.getLogger("cobald.runtime.tardis.plugins.sqliteregistry")


class SqliteRegistry(Plugin):
    """
    Plugin that persists Drone states in a SQLite database.
    """

    # A single worker thread: queries submitted through async_execute() are
    # run one at a time.
    thread_pool_executor = ThreadPoolExecutor(max_workers=1)

    def __init__(self):
        """
        The :py:class:`~tardis.plugins.sqliteregistry.SqliteRegistry`
        implements a persistent storage of all Drone states in a SQLite
        database. The usage of this module is recommended in order to recover
        the last state of TARDIS in case the service has to be restarted.

        On construction the database file is taken from the configuration,
        the schema is deployed and every configured site and machine type is
        registered in the database.
        """
        configuration = Configuration()
        self._db_file = configuration.Plugins.SqliteRegistry.db_file
        self._deploy_db_schema()
        # States that need a dedicated query; notify() falls back to
        # update_resource for every other state.
        self._dispatch_on_state = dict(
            RequestState=self.insert_resource, DownState=self.delete_resource
        )

        for site in configuration.Sites:
            self.add_site(site.name)
            for machine_type in getattr(configuration, site.name).MachineTypes:
                self.add_machine_types(site.name, machine_type)

    def add_machine_types(self, site_name: str, machine_type: str) -> None:
        """
        Register a machine type for a site unless it is already stored.

        :param site_name: Name of the site the machine type belongs to
        :param machine_type: Name of the machine type to register
        """
        if self._get_machine_type(site_name, machine_type):
            logger.debug(
                f"{machine_type} is already present for {site_name} in database! Skipping insertion!"  # noqa B950
            )
            return

        sql_query = """
            INSERT OR ROLLBACK INTO MachineTypes(machine_type, site_id)
            SELECT :machine_type, Sites.site_id FROM Sites
            WHERE Sites.site_name = :site_name"""
        self.execute(sql_query, {"site_name": site_name, "machine_type": machine_type})

    def _get_machine_type(self, site_name: str, machine_type: str) -> List[Dict]:
        """
        Look up a machine type of a given site.

        :param site_name: Name of the site
        :param machine_type: Name of the machine type
        :return: Matching rows; empty if the combination is not stored
        """
        sql_query = """
            SELECT * FROM MachineTypes MT
            JOIN Sites S ON MT.site_id = S.site_id
            WHERE MT.machine_type = :machine_type
            AND S.site_name = :site_name"""
        return self.execute(
            sql_query, {"site_name": site_name, "machine_type": machine_type}
        )

    def add_site(self, site_name: str) -> None:
        """
        Register a site unless it is already stored.

        :param site_name: Name of the site to register
        """
        if self._get_site(site_name):
            logger.debug(
                f"{site_name} already present in database! Skipping insertion!"
            )
            return

        sql_query = "INSERT OR ROLLBACK INTO Sites(site_name) VALUES (:site_name)"
        self.execute(sql_query, {"site_name": site_name})

    def _get_site(self, site_name: str) -> List[Dict]:
        """
        Look up a site by name.

        :param site_name: Name of the site
        :return: Matching rows; empty if the site is not stored
        """
        sql_query = "SELECT * FROM Sites WHERE site_name = :site_name"
        return self.execute(sql_query, {"site_name": site_name})

    async def async_execute(self, sql_query: str, bind_parameters: Dict) -> List[Dict]:
        """
        Run :py:meth:`execute` in the class-wide thread pool so that the
        event loop is not blocked by the database access.

        :param sql_query: SQL statement with named placeholders
        :param bind_parameters: Values for the named placeholders
        :return: Rows returned by the statement
        """
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.thread_pool_executor, self.execute, sql_query, bind_parameters
        )

    @contextmanager
    def connect(self) -> Generator[sqlite3.Connection, None, None]:
        """
        Open a connection to the database file for the duration of a
        ``with`` block.

        The transaction is committed when the block finishes normally and
        rolled back when it raises; the connection is closed in both cases.

        :return: Context manager yielding the open connection
        """
        con = sqlite3.connect(
            self._db_file, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
        )
        try:
            with con:  # context manager to commit or rollback transactions
                yield con
        finally:
            con.close()

    def _deploy_db_schema(self) -> None:
        """
        Create all tables and the drone uuid index if they do not exist yet
        and make sure every known state is present in ``ResourceStates``.
        """
        tables = {
            "MachineTypes": [
                "machine_type_id INTEGER PRIMARY KEY AUTOINCREMENT",
                "machine_type VARCHAR(255)",
                "site_id INTEGER",
                "FOREIGN KEY(site_id) REFERENCES Sites(site_id)",
                "CONSTRAINT unique_machine_type_per_site UNIQUE (machine_type, site_id)",  # noqa B950
            ],
            "Resources": [
                "id INTEGER PRIMARY KEY AUTOINCREMENT",
                "remote_resource_uuid VARCHAR(255)",
                "drone_uuid VARCHAR(255) UNIQUE",
                "state_id INTEGER",
                "site_id INTEGER",
                "machine_type_id INTEGER",
                "created TIMESTAMP",
                "updated TIMESTAMP",
                # Must name the table exactly as it is created below
                # ("ResourceStates"), otherwise the constraint points at a
                # table that does not exist.
                "FOREIGN KEY(state_id) REFERENCES ResourceStates(state_id)",
                "FOREIGN KEY(site_id) REFERENCES Sites(site_id)",
                "FOREIGN KEY(machine_type_id) REFERENCES MachineTypes(machine_type_id)",
                "CONSTRAINT unique_remote_resource_uuid_per_site UNIQUE (site_id, remote_resource_uuid)",  # noqa B950
            ],
            "ResourceStates": [
                "state_id INTEGER PRIMARY KEY AUTOINCREMENT",
                "state VARCHAR(255) UNIQUE",
            ],
            "Sites": [
                "site_id INTEGER PRIMARY KEY AUTOINCREMENT",
                "site_name VARCHAR(255) UNIQUE",
            ],
        }

        with self.connect() as connection:
            cursor = connection.cursor()
            # NOTE(review): these pragmas are issued on this connection only;
            # execute() opens its own connection and does not repeat them -
            # confirm whether foreign key enforcement is expected there.
            cursor.execute("PRAGMA foreign_keys = ON")
            cursor.execute("PRAGMA locking_mode = EXCLUSIVE")
            cursor.execute("PRAGMA journal_mode = WAL")

            for table_name, columns in tables.items():
                cursor.execute(
                    f"create table if not exists {table_name} ({', '.join(columns)})"  # noqa b950
                )

            for state in State.get_all_states():
                cursor.execute(
                    "INSERT OR IGNORE INTO ResourceStates(state) VALUES (?)",
                    (state,),
                )

            cursor.execute(
                "CREATE UNIQUE INDEX IF NOT EXISTS idx_drone_uuid ON Resources (drone_uuid);"  # noqa B950
            )

    async def delete_resource(self, bind_parameters: Dict) -> None:
        """
        Remove a resource from the database.

        :param bind_parameters: Must provide ``drone_uuid`` and ``site_name``
        """
        sql_query = """
            DELETE FROM Resources
            WHERE drone_uuid = :drone_uuid
            AND site_id = (SELECT site_id from Sites WHERE site_name = :site_name)"""
        await self.async_execute(sql_query, bind_parameters)

    def execute(self, sql_query: str, bind_parameters: Dict) -> List[Dict]:
        """
        Execute a single SQL statement on a fresh connection.

        :param sql_query: SQL statement with named placeholders
        :param bind_parameters: Values for the named placeholders
        :return: All result rows, each as a dictionary keyed by column name
        """
        with self.connect() as connection:
            # Return rows as {column name: value} instead of plain tuples
            connection.row_factory = lambda cur, row: {
                col[0]: row[idx] for idx, col in enumerate(cur.description)
            }
            cursor = connection.cursor()
            cursor.execute(sql_query, bind_parameters)
            logger.debug(f"{sql_query},{bind_parameters} executed")
            return cursor.fetchall()

    async def get_resource_state(self, drone_uuid: str) -> List[Dict]:
        """
        Fetch the stored state of a drone.

        :param drone_uuid: Unique identifier of the drone
        :return: Rows containing the ``state`` column; empty if the drone is
            not stored
        """
        sql_query = """
            SELECT RS.state
            FROM Resources R
            JOIN ResourceStates RS ON R.state_id = RS.state_id
            WHERE R.drone_uuid = :drone_uuid
            """
        return await self.async_execute(sql_query, {"drone_uuid": drone_uuid})

    def get_resources(self, site_name: str, machine_type: str) -> List[Dict]:
        """
        Fetch all stored resources of a machine type at a site.

        :param site_name: Name of the site
        :param machine_type: Name of the machine type
        :return: One row per resource with ``remote_resource_uuid``,
            ``drone_uuid``, ``state``, ``created`` and ``updated``
        """
        sql_query = """
            SELECT R.remote_resource_uuid, R.drone_uuid, RS.state, R.created, R.updated
            FROM Resources R
            JOIN ResourceStates RS ON R.state_id = RS.state_id
            JOIN Sites S ON R.site_id = S.site_id
            JOIN MachineTypes MT ON R.machine_type_id = MT.machine_type_id
            WHERE S.site_name = :site_name AND MT.machine_type = :machine_type"""
        return self.execute(
            sql_query, {"site_name": site_name, "machine_type": machine_type}
        )

    async def insert_resource(self, bind_parameters: Dict) -> None:
        """
        Store a new resource in the database.

        :param bind_parameters: Must provide ``remote_resource_uuid``,
            ``drone_uuid``, ``state``, ``site_name``, ``machine_type``,
            ``created`` and ``updated``
        """
        sql_query = """
            INSERT OR ROLLBACK INTO Resources(remote_resource_uuid, drone_uuid,
            state_id, site_id, machine_type_id, created, updated)
            SELECT :remote_resource_uuid, :drone_uuid, RS.state_id, S.site_id,
            MT.machine_type_id, :created, :updated
            FROM ResourceStates RS
            JOIN Sites S ON S.site_name = :site_name
            JOIN MachineTypes MT ON MT.machine_type = :machine_type
            AND MT.site_id = S.site_id
            WHERE RS.state = :state"""
        await self.async_execute(sql_query, bind_parameters)

    async def notify(self, state: State, resource_attributes: AttributeDict) -> None:
        """
        Persist a state change of a drone.

        The query is chosen by the name of the new state: the handlers
        registered in ``_dispatch_on_state`` are used where present, every
        other state results in an update of the existing entry.

        :param state: New state of the drone
        :param resource_attributes: Attributes of the drone, used as bind
            parameters of the query
        """
        state = str(state)
        logger.debug(f"Drone: {str(resource_attributes)} has changed state to {state}")
        bind_parameters = {"state": state}
        bind_parameters.update(resource_attributes)
        await self._dispatch_on_state.get(state, self.update_resource)(bind_parameters)

    async def update_resource(self, bind_parameters: Dict) -> None:
        """
        Update state, remote resource uuid and update time of a stored
        resource.

        :param bind_parameters: Must provide ``updated``,
            ``remote_resource_uuid``, ``state``, ``drone_uuid`` and
            ``site_name``
        """
        sql_query = """
            UPDATE Resources
            SET updated = :updated, remote_resource_uuid = :remote_resource_uuid,
            state_id = (SELECT state_id FROM ResourceStates WHERE state = :state)
            WHERE drone_uuid = :drone_uuid
            AND site_id = (SELECT site_id FROM Sites WHERE site_name = :site_name)"""
        await self.async_execute(sql_query, bind_parameters)