From 9ad805206b680d67ef5dd9b01bf19adcd899793f Mon Sep 17 00:00:00 2001 From: Wolfgang Ludger Hottgenroth Date: Wed, 10 Nov 2021 18:01:18 +0100 Subject: [PATCH] first success with database --- .gitignore | 1 + src/AbstractSinkHandler.py | 7 ++++++ src/AbstractTsDbMixin.py | 37 ++++++++++++++++++++++++++++ src/EGM1mdTsDbMixin.py | 43 +++++++++++++++++++++++++++++++++ src/EGM1mdTsDbSinkHandler.py | 20 +++++++++++++++ src/config/iiotFeederConfig.ini | 13 ++++++++-- src/dataformat.txt | 10 +++++--- src/schema.sql | 10 ++++++++ 8 files changed, 136 insertions(+), 5 deletions(-) create mode 100644 src/AbstractTsDbMixin.py create mode 100644 src/EGM1mdTsDbMixin.py create mode 100644 src/EGM1mdTsDbSinkHandler.py create mode 100644 src/schema.sql diff --git a/.gitignore b/.gitignore index c18dd8d..cc6673d 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ __pycache__/ +config/ diff --git a/src/AbstractSinkHandler.py b/src/AbstractSinkHandler.py index 594dbac..970acf7 100644 --- a/src/AbstractSinkHandler.py +++ b/src/AbstractSinkHandler.py @@ -9,6 +9,7 @@ POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None) class AbstractSinkHandler(threading.Thread): def __init__(self, config, name, inQueue, experiment): super().__init__() + logger.info("constructor called") self.config = config self.localConfig = self.config[name] @@ -16,6 +17,7 @@ class AbstractSinkHandler(threading.Thread): self.inQueue = inQueue self.experiment = experiment + def run(self): logger.debug("loop started") @@ -27,10 +29,15 @@ class AbstractSinkHandler(threading.Thread): logger.debug("swallowed the poison pill") break self.sinkAction(dataObject) + + self.deinit() def init(self): logger.debug("no initialization needed") + def deinit(self): + logger.debug("no deinitialization needed") + def sinkAction(self, dataObject): raise NotImplementedError() diff --git a/src/AbstractTsDbMixin.py b/src/AbstractTsDbMixin.py new file mode 100644 index 0000000..0e11ff5 --- /dev/null +++ b/src/AbstractTsDbMixin.py @@ -0,0 +1,37 @@ +from loguru import logger +import psycopg2 +import psycopg2.extras +import os + +class AbstractTsDbMixin(object): + def __init__(self, config, name, inQueue, experiment): + logger.info("constructor called") + self.config = config + self.name = name + self.connection = None + + def connectDb(self): + dbHost = self.config["database"]["host"] + dbUser = self.config["database"]["user"] + dbPassword = self.config["database"]["password"] + if dbPassword == "$ENV$": + dbPassword = os.environ["DB_PASSWORD"] + dbName = self.config["database"]["name"] + + logger.info(f"Connect to database for {self.name}") + connInfo = f"user={dbUser} host={dbHost} dbname={dbName} sslmode=require" + if dbPassword != "$NONE$": + connInfo += f" password={dbPassword}" + self.connection = psycopg2.connect(connInfo) + self.connection.autocommit = False + logger.info("Connection to database established") + + def disconnectDb(self): + logger.info(f"Disonnect from database for {self.name}") + if self.connection: + self.connection.close() + + def write(self, experiment, dataObject): + raise NotImplementedError() + + diff --git a/src/EGM1mdTsDbMixin.py b/src/EGM1mdTsDbMixin.py new file mode 100644 index 0000000..7d34a79 --- /dev/null +++ b/src/EGM1mdTsDbMixin.py @@ -0,0 +1,43 @@ +from AbstractTsDbMixin import AbstractTsDbMixin +from loguru import logger +import iso8601 +import datetime +from io import StringIO + +class EGM1mdTsDbMixin(AbstractTsDbMixin): + def write(self, experiment, dataObject): + logger.info(f"Db write: {experiment} {dataObject}") + try: + ts = iso8601.parse_date(dataObject.payload["ts"]) + dt = datetime.timedelta(microseconds=dataObject.payload["dt"]) + values = dataObject.payload["v"] + logger.info(f"ts: {ts}") + logger.info(f"dt: {dt}") + logger.info(f"v: {values}") + + stringBuffer = '' + for rv in values: + statusOctet = (rv & 0xff000000) >> 24 + unsignedValue = rv & 0x00ffffff + valid = (statusOctet & 0x04) >> 2 + sign = (statusOctet & 0x02) >> 1 + signedValue = unsignedValue if not sign else unsignedValue * -1 + + logger.info(f"store: ts: {ts}, rv: {rv:08x}, status: {statusOctet:02x}, valid: {valid}, sign: {sign}, unsigned: {unsignedValue}, signed: {signedValue}") + + stringBuffer += f"{ts}\t{experiment}\t{rv}\t{valid}\t{signedValue}\n" + ts += dt + + stringIO = StringIO(stringBuffer) + + logger.info("database copy started") + with self.connection: + with self.connection.cursor() as cursor: + cursor.copy_from(stringIO, 'egm1md', columns=('time', 'experiment', 'rawvalue', 'valid', 'value')) + logger.info("database copy done") + + + except iso8601.ParseError as e: + logger.error(f"unable to parse timestamp, item dropped: {f}") + except KeyError as e: + logger.error(f"missing attribute in payload, item dropped: {e}") diff --git a/src/EGM1mdTsDbSinkHandler.py b/src/EGM1mdTsDbSinkHandler.py new file mode 100644 index 0000000..e3afb0d --- /dev/null +++ b/src/EGM1mdTsDbSinkHandler.py @@ -0,0 +1,20 @@ +from loguru import logger +from AbstractSinkHandler import AbstractSinkHandler +from EGM1mdTsDbMixin import EGM1mdTsDbMixin + +class EGM1mdTsDbSinkHandler(AbstractSinkHandler, EGM1mdTsDbMixin): + def __init__(self, config, name, inQueue, experiment): + super().__init__(config, name, inQueue, experiment) + logger.info("constructor called") + + def init(self): + logger.info("init") + self.connectDb() + + def deinit(self): + logger.info("deinit") + self.disconnectDb() + + def sinkAction(self, dataObject): + logger.info(f"sink {self.name} received {dataObject} for experiment {self.experiment}") + self.write(self.experiment, dataObject) \ No newline at end of file diff --git a/src/config/iiotFeederConfig.ini b/src/config/iiotFeederConfig.ini index 269dc6e..c1bc3c4 100644 --- a/src/config/iiotFeederConfig.ini +++ b/src/config/iiotFeederConfig.ini @@ -3,6 +3,15 @@ login= password= broker=172.16.2.16 +[database] +host=172.16.10.27 +user=iiot +# set password to $ENV$ to load it from environment +# set password to $NONE$ to not provide a password +# but for instance use a client certificate +password=8NP2M3CHm7EQhYiY6aGggTWsBDBECAkr +name=iiotfeed + [master] topic=rd/set01/rig01/cmd dataObjectName=cmd @@ -11,12 +20,12 @@ slaves=dev05 dev06 rig01 [dev05] topic=rd/set01/rig01/dev05/md dataObjectName=value -sinkHandler=DummySinkHandler +sinkHandler=EGM1mdTsDbSinkHandler [dev06] topic=rd/set01/rig01/dev06/md dataObjectName=value -sinkHandler=DummySinkHandler +sinkHandler=EGM1mdTsDbSinkHandler [rig01] topic=rd/set01/rig01/log diff --git a/src/dataformat.txt b/src/dataformat.txt index 821e92b..698ea32 100644 --- a/src/dataformat.txt +++ b/src/dataformat.txt @@ -1,3 +1,7 @@ +// timestamps must be given in RFC3339 format, timezone UTC +// timedeltas must be given in integer microseconds + + // MQTT command data package of the rig-PC. // Topic: rd/set01/rig01/cmd { @@ -79,7 +83,7 @@ { "ts": "time of first sample", "dt": "sampling time" - "values": [ + "v": [ 0x00000000, // 1st value (0x00FFFFFF -> 24 bit raw data mask, 0x02000000 -> mag. field polarity mask (1 = +1, 0 = -1) , 0x04000000 -> data validation mask (1 = valid, 0 = invalid)) 0x00000000, // 2nd value "..." @@ -93,7 +97,7 @@ { "ts": "time of first sample", "dt": "sampling time" - "values": [ + "v": [ 0x00000000, // 1st value (0x00FFFFFF -> 24 bit raw data mask, 0x02000000 -> mag. field polarity mask (1 = +1, 0 = -1) , 0x04000000 -> data validation mask (1 = valid, 0 = invalid)) 0x00000000, // 2nd value "..." @@ -106,7 +110,7 @@ // Topic: rd/set02/dev05/kir/md { "ts": "time of first sample", - "values": [ + "v": [ [ 0x00000000, 0x00000000 ], // real, imagine [ 0x00000000, 0x00000000 ], "..." diff --git a/src/schema.sql b/src/schema.sql new file mode 100644 index 0000000..5957aaf --- /dev/null +++ b/src/schema.sql @@ -0,0 +1,10 @@ +create table egm1md ( + time timestamp without time zone not null, + experiment varchar(64) not null, + rawvalue bigint not null, + valid boolean not null, + value integer not null +); + +select create_hypertable('egm1md', 'time'); +