add rig database storing

This commit is contained in:
Wolfgang Hottgenroth 2021-11-11 13:35:01 +01:00
parent 791a04c388
commit b6eaafca8e
Signed by: wn
GPG Key ID: 6C1E5E531E0D5D7F
11 changed files with 226 additions and 87 deletions

View File

@ -24,11 +24,16 @@ class AbstractSinkHandler(threading.Thread):
self.init() self.init()
while True: while True:
try:
dataObject = self.inQueue.get() dataObject = self.inQueue.get()
if (dataObject == POISON_PILL): if (dataObject == POISON_PILL):
logger.debug("swallowed the poison pill") logger.debug("swallowed the poison pill")
break break
self.sinkAction(dataObject) self.sinkAction(dataObject)
except Exception as e:
logger.error(f"unexpected error, items dropped: {type(e)}, {e}")
raise e
self.deinit() self.deinit()

View File

@ -0,0 +1,45 @@
from io import StringIO
from loguru import logger
import psycopg2
from AbstractSinkHandler import AbstractSinkHandler
from PgDbHandle import PgDbHandle
class TsDbSinkException(Exception):
def __init__(self, code, msg):
super().__init__()
self.code = code
self.msg = msg
class AbstractTsDbSinkHandler(AbstractSinkHandler):
def __init__(self, config, name, inQueue, experiment):
super().__init__(config, name, inQueue, experiment)
logger.info("constructor called")
self.dbh = PgDbHandle(config)
def init(self):
logger.info("init")
self.dbh.connectDb()
def deinit(self):
logger.info("deinit")
self.dbh.disconnectDb()
def copy(self, table, columns, data):
try:
stringIO = StringIO(data)
with self.dbh.connection as conn:
with conn.cursor() as cursor:
cursor.copy_from(stringIO, table, columns=columns)
except psycopg2.Error as e:
raise TsDbSinkException(e.pgcode, str(e))
def insert(self, table, columns, data):
placeholders = ['%s'] * len(columns)
stmt = f"insert into {table} ({','.join(columns)}) values ({','.join(placeholders)})"
logger.debug(f"{stmt=}")
try:
with self.dbh.connection as conn:
with conn.cursor() as cursor:
cursor.execute(stmt, data)
except psycopg2.Error as e:
raise TsDbSinkException(e.pgcode, str(e))

View File

@ -33,13 +33,13 @@ def sendLog(client):
client.publish(topic, json.dumps(msg)) client.publish(topic, json.dumps(msg))
logger.debug(topic) logger.debug(topic)
def sendValue(client): def sendValue(client, device):
msg = { msg = {
"ts": str(datetime.now()), "ts": str(datetime.now()),
"dt": 100, "dt": 100,
"v": sample(range(0xffffffff), 3000) "v": sample(range(0xffffffff), 3000)
} }
topic = f"{TOPIC_PRE}/rig01/dev05/md" topic = f"{TOPIC_PRE}/rig01/{device}/md"
client.publish(topic, json.dumps(msg)) client.publish(topic, json.dumps(msg))
logger.debug(topic) logger.debug(topic)
@ -49,7 +49,9 @@ def perform(client, params):
sendCmd(client, "start") sendCmd(client, "start")
for i in range(10): for i in range(10):
sendValue(client) sendValue(client, 'dev05')
sendValue(client, 'dev06')
sendLog(client)
sleep(1.0) sleep(1.0)
logger.info("Stop") logger.info("Stop")

View File

@ -1,45 +0,0 @@
import psycopg2
from AbstractTsDbMixin import AbstractTsDbMixin
from loguru import logger
import iso8601
import datetime
from io import StringIO
class EGM1mdTsDbMixin(AbstractTsDbMixin):
def write(self, experiment, dataObject):
logger.info("database write operation start")
try:
ts = iso8601.parse_date(dataObject.payload["ts"])
dt = datetime.timedelta(microseconds=dataObject.payload["dt"])
values = dataObject.payload["v"]
stringBuffer = ''
cnt = 0
for rv in values:
unsignedValue = rv & 0x00ffffff
valid = (rv & 0x04000000) >> 26
sign = (rv & 0x02000000) >> 25
signedValue = unsignedValue if not sign else unsignedValue * -1
stringBuffer += f"{ts}\t{experiment}\t{rv}\t{valid}\t{signedValue}\n"
ts += dt
cnt += 1
logger.info(f"{cnt} items")
stringIO = StringIO(stringBuffer)
logger.info("database copy start")
with self.connection:
with self.connection.cursor() as cursor:
cursor.copy_from(stringIO, 'egm1md', columns=('time', 'experiment', 'rawvalue', 'valid', 'value'))
logger.info("database copy done")
except iso8601.ParseError as e:
logger.error(f"unable to parse timestamp, items dropped: {e}")
except KeyError as e:
logger.error(f"missing attribute in payload, items dropped: {e}")
except psycopg2.Error as e:
logger.error(f"error {e.pgcode} when talking to database, items dropped: {e}")
logger.info("database write operation done")

View File

@ -1,20 +1,39 @@
from loguru import logger from loguru import logger
from AbstractSinkHandler import AbstractSinkHandler from AbstractTsDbSinkHandler import TsDbSinkException, AbstractTsDbSinkHandler
from EGM1mdTsDbMixin import EGM1mdTsDbMixin import iso8601
import datetime
class EGM1mdTsDbSinkHandler(AbstractSinkHandler, EGM1mdTsDbMixin):
class EGM1mdTsDbSinkHandler(AbstractTsDbSinkHandler):
def __init__(self, config, name, inQueue, experiment): def __init__(self, config, name, inQueue, experiment):
super().__init__(config, name, inQueue, experiment) super().__init__(config, name, inQueue, experiment)
logger.info("constructor called") logger.info("constructor called")
def init(self):
logger.info("init")
self.connectDb()
def deinit(self):
logger.info("deinit")
self.disconnectDb()
def sinkAction(self, dataObject): def sinkAction(self, dataObject):
# logger.info(f"sink {self.name} received {dataObject} for experiment {self.experiment}") logger.info("database write operation start")
self.write(self.experiment, dataObject) try:
# payload format: check dataformat.txt
ts = iso8601.parse_date(dataObject.payload["ts"])
dt = datetime.timedelta(microseconds=dataObject.payload["dt"])
values = dataObject.payload["v"]
stringBuffer = ''
cnt = 0
for rv in values:
unsignedValue = rv & 0x00ffffff
valid = (rv & 0x04000000) >> 26
sign = (rv & 0x02000000) >> 25
signedValue = unsignedValue if not sign else unsignedValue * -1
stringBuffer += f"{ts}\t{self.experiment}\t{self.name}\t{rv}\t{valid}\t{signedValue}\n"
ts += dt
cnt += 1
logger.info(f"{cnt} items")
self.copy('egm1md', ('time', 'experiment', 'device', 'rawvalue', 'valid', 'value'), stringBuffer)
logger.info("database write operation done")
except iso8601.ParseError as e:
logger.error(f"unable to parse timestamp, items dropped: {e}")
except KeyError as e:
logger.error(f"missing attribute in payload, items dropped: {e}")
except TsDbSinkException as e:
logger.error(f"error {e.code} when talking to database, items dropped: {e}")

View File

@ -0,0 +1,33 @@
from loguru import logger
from AbstractTsDbSinkHandler import TsDbSinkException, AbstractTsDbSinkHandler
import iso8601
class FlowRigCmdTsDbSinkHandler(AbstractTsDbSinkHandler):
def __init__(self, config, name, inQueue):
super().__init__(config, name, inQueue, 'dummyexperiment')
logger.info("constructor called")
def sinkAction(self, dataObject):
logger.info("database write operation start")
try:
# payload format: check dataformat.txt
# "ts": "yyyy-MM-dd'T'HH-mm-ss.SSSZ",
# "en": "Name of Experiment",
# "ec": "Comment discribing the experiment",
# "es": "start/stop"
ts = iso8601.parse_date(dataObject.payload['ts'])
en = dataObject.payload['en']
ec = dataObject.payload['ec']
es = dataObject.payload['es']
self.insert('flowrigcmd', ('time', 'experiment', 'description', 'command'), (ts, en, ec, es))
logger.info("database write operation done")
except iso8601.ParseError as e:
logger.error(f"unable to parse timestamp, items dropped: {e}")
except KeyError as e:
logger.error(f"missing attribute in payload, items dropped: {e}")
except TsDbSinkException as e:
logger.error(f"error {e.code} when talking to database, items dropped: {e.msg}")

View File

@ -0,0 +1,33 @@
from loguru import logger
from AbstractTsDbSinkHandler import TsDbSinkException, AbstractTsDbSinkHandler
import iso8601
class FlowRigLogTsDbSinkHandler(AbstractTsDbSinkHandler):
def __init__(self, config, name, inQueue, experiment):
super().__init__(config, name, inQueue, experiment)
logger.info("constructor called")
def sinkAction(self, dataObject):
logger.info("database write operation start")
try:
# payload format: check dataformat.txt
ts = iso8601.parse_date(dataObject.payload["ts"])
values = dataObject.payload["v"]
stringBuffer = ''
cnt = 0
for valueItems in values:
stringBuffer += f"{ts}\t{self.experiment}\t{self.name}\t{valueItems['n']}\t{valueItems['v']}\n"
cnt += 1
logger.info(f"{cnt} items")
self.copy('flowriglog', ('time', 'experiment', 'device', 'name', 'value'), stringBuffer)
logger.info("database write operation done")
except iso8601.ParseError as e:
logger.error(f"unable to parse timestamp, items dropped: {e}")
except KeyError as e:
logger.error(f"missing attribute in payload, items dropped: {e}")
except TsDbSinkException as e:
logger.error(f"error {e.code} when talking to database, items dropped: {e.msg}")

View File

@ -3,11 +3,10 @@ import psycopg2
import psycopg2.extras import psycopg2.extras
import os import os
class AbstractTsDbMixin(object): class PgDbHandle(object):
def __init__(self, config, name, inQueue, experiment): def __init__(self, config):
logger.info("constructor called") logger.info("constructor called")
self.config = config self.config = config
self.name = name
self.connection = None self.connection = None
def connectDb(self): def connectDb(self):
@ -18,7 +17,7 @@ class AbstractTsDbMixin(object):
dbPassword = os.environ["DB_PASSWORD"] dbPassword = os.environ["DB_PASSWORD"]
dbName = self.config["database"]["name"] dbName = self.config["database"]["name"]
logger.info(f"Connect to database for {self.name}") logger.info(f"Connect to database")
connInfo = f"user={dbUser} host={dbHost} dbname={dbName} sslmode=require" connInfo = f"user={dbUser} host={dbHost} dbname={dbName} sslmode=require"
if dbPassword != "$NONE$": if dbPassword != "$NONE$":
connInfo += f" password={dbPassword}" connInfo += f" password={dbPassword}"
@ -27,11 +26,9 @@ class AbstractTsDbMixin(object):
logger.info("Connection to database established") logger.info("Connection to database established")
def disconnectDb(self): def disconnectDb(self):
logger.info(f"Disonnect from database for {self.name}") logger.info(f"Disonnect from database")
if self.connection: if self.connection:
self.connection.close() self.connection.close()
def write(self, experiment, dataObject):
raise NotImplementedError()

View File

@ -1,12 +1,14 @@
from math import exp from math import exp
from queue import Empty from queue import Empty, Queue
import threading import threading
from loguru import logger from loguru import logger
from time import sleep from time import sleep
from DataObject import DataObject from DataObject import DataObject
from enum import Enum from enum import Enum
from FlowRigCmdTsDbSinkHandler import FlowRigCmdTsDbSinkHandler
from SlaveHandler import SlaveHandler from SlaveHandler import SlaveHandler
from FlowRigLogTsDbSinkHandler import FlowRigLogTsDbSinkHandler
POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None) POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
@ -30,11 +32,24 @@ class RigCmdHandler(threading.Thread):
self.slaves = [] self.slaves = []
self.inQueue = inQueue self.inQueue = inQueue
def init(self):
self.outQueue = Queue()
self.cmdDbSink = FlowRigCmdTsDbSinkHandler(self.config, 'master', self.outQueue)
self.cmdDbSink.start()
logger.debug("FlowRigCmdTsDbSinkHandler started")
def deinit(self):
self.cmdDbSink.stop()
logger.debug("FlowRigCmdTsDbSinkHandler stopped")
self.cmdDbSink.join()
logger.debug("FlowRigCmdTsDbSinkHandler joined")
def run(self): def run(self):
logger.debug("RigCmdHandler loop started") logger.debug("RigCmdHandler loop started")
self.init()
state = e_State.IDLE state = e_State.IDLE
waitCnt = 0 waitCnt = 0
experiment = None
while True: while True:
try: try:
@ -52,10 +67,9 @@ class RigCmdHandler(threading.Thread):
if (dataObject == TIMEOUT_PILL): if (dataObject == TIMEOUT_PILL):
continue continue
if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'start'): if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'start'):
experiment = dataObject.payload["en"] logger.info(f"start command received, switch to RUNNING state")
logger.info(f"start command received, switch to RUNNING state, experiment is {experiment}")
state = e_State.RUNNING state = e_State.RUNNING
self.experimentStart(experiment) self.experimentStart(dataObject)
elif (dataObject.name == 'cmd'): elif (dataObject.name == 'cmd'):
logger.error(f"illegal command {dataObject.name} received in IDLE state") logger.error(f"illegal command {dataObject.name} received in IDLE state")
else: else:
@ -66,6 +80,7 @@ class RigCmdHandler(threading.Thread):
if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'stop'): if (dataObject.name == 'cmd') and (dataObject.payload["es"] == 'stop'):
logger.info("stop command received, switch to WAITING state") logger.info("stop command received, switch to WAITING state")
state = e_State.WAITING state = e_State.WAITING
self.experimentStop(dataObject)
waitCnt = 0 waitCnt = 0
elif (dataObject.name == 'cmd'): elif (dataObject.name == 'cmd'):
logger.error(f"illegal command {dataObject.name} received in RUNNING state") logger.error(f"illegal command {dataObject.name} received in RUNNING state")
@ -77,20 +92,25 @@ class RigCmdHandler(threading.Thread):
if (waitCnt >= GRACE_PERIOD): if (waitCnt >= GRACE_PERIOD):
logger.info("grace period is over, switch to IDLE state") logger.info("grace period is over, switch to IDLE state")
state = e_State.IDLE state = e_State.IDLE
self.experimentStop(experiment) self.experimentTeardown(dataObject)
else: else:
logger.error("illegal message (not TIMEOUT) received in WAITING state") logger.error("illegal message (not TIMEOUT) received in WAITING state")
def experimentStart(self, experiment): def experimentStart(self, dataObject):
logger.info(f"experiment started, {experiment}") logger.info(f"experiment started, {dataObject.payload['en']}")
self.outQueue.put(dataObject)
for slaveName in self.slaveNames: for slaveName in self.slaveNames:
slave = SlaveHandler(self.config, slaveName, experiment) slave = SlaveHandler(self.config, slaveName, dataObject.payload['en'])
slave.start() slave.start()
self.slaves.append(slave) self.slaves.append(slave)
def experimentStop(self, experiment): def experimentStop(self, dataObject):
logger.info(f"experiment stopped, {experiment}") logger.info(f"experiment stopped, {dataObject.payload['en']}")
self.outQueue.put(dataObject)
def experimentTeardown(self, dataObject):
logger.info(f"experiment teared down")
for slave in self.slaves: for slave in self.slaves:
slave.stop() slave.stop()
self.slaves.clear() self.slaves.clear()
@ -98,4 +118,5 @@ class RigCmdHandler(threading.Thread):
def stop(self): def stop(self):
self.inQueue.put(POISON_PILL) self.inQueue.put(POISON_PILL)
self.deinit()
logger.debug("kill flag set") logger.debug("kill flag set")

View File

@ -9,7 +9,7 @@ user=iiot
# set password to $ENV$ to load it from environment # set password to $ENV$ to load it from environment
# set password to $NONE$ to not provide a password # set password to $NONE$ to not provide a password
# but for instance use a client certificate # but for instance use a client certificate
password= password=a5SFhT5VQWsaSgcCLiady8Ca8WNHqNpe
name=iiotfeed name=iiotfeed
[master] [master]
@ -30,5 +30,5 @@ sinkHandler=EGM1mdTsDbSinkHandler
[rig01] [rig01]
topic=rd/set01/rig01/log topic=rd/set01/rig01/log
dataObjectName=log dataObjectName=log
sinkHandler=DummySinkHandler sinkHandler=FlowRigLogTsDbSinkHandler

View File

@ -5,6 +5,35 @@ create table egm1md (
valid boolean not null, valid boolean not null,
value integer not null value integer not null
); );
select create_hypertable('egm1md', 'time'); select create_hypertable('egm1md', 'time');
create index egm1md_experiment on egm1md (experiment);
alter table egm1md add column device varchar(64);
update egm1md set device = '-';
alter table egm1md alter column device set not null;
create index egm1md_device on egm1md (device);
create table flowriglog (
time timestamp without time zone not null,
experiment varchar(64) not null,
name varchar(64) not null,
value double precision not null
);
select create_hypertable('flowriglog', 'time');
create index flowriglog_experiment on flowriglog (experiment);
alter table flowriglog add column device varchar(64);
update flowriglog set device = '-';
alter table flowriglog alter column device set not null;
create index flowriglog_device on flowriglog (device);
create table flowrigcmd (
time timestamp without time zone not null,
experiment varchar(64) not null,
description varchar(256) not null,
command varchar(32) not null
);
select create_hypertable('flowrigcmd', 'time');
create index flowrigcmd_experiment on flowrigcmd (experiment);