first success with database
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +1,2 @@
|
||||
__pycache__/
|
||||
config/
|
||||
|
@ -9,6 +9,7 @@ POISON_PILL = DataObject(name="PoisonPill", topic="kill", payload=None)
|
||||
class AbstractSinkHandler(threading.Thread):
|
||||
def __init__(self, config, name, inQueue, experiment):
|
||||
super().__init__()
|
||||
logger.info("constructor called")
|
||||
|
||||
self.config = config
|
||||
self.localConfig = self.config[name]
|
||||
@ -16,6 +17,7 @@ class AbstractSinkHandler(threading.Thread):
|
||||
self.inQueue = inQueue
|
||||
self.experiment = experiment
|
||||
|
||||
|
||||
def run(self):
|
||||
logger.debug("loop started")
|
||||
|
||||
@ -27,10 +29,15 @@ class AbstractSinkHandler(threading.Thread):
|
||||
logger.debug("swallowed the poison pill")
|
||||
break
|
||||
self.sinkAction(dataObject)
|
||||
|
||||
self.deinit()
|
||||
|
||||
def init(self):
|
||||
logger.debug("no initialization needed")
|
||||
|
||||
def deinit(self):
|
||||
logger.debug("no deinitialization needed")
|
||||
|
||||
def sinkAction(self, dataObject):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
37
src/AbstractTsDbMixin.py
Normal file
37
src/AbstractTsDbMixin.py
Normal file
@ -0,0 +1,37 @@
|
||||
from loguru import logger
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import os
|
||||
|
||||
class AbstractTsDbMixin(object):
|
||||
def __init__(self, config, name, inQueue, experiment):
|
||||
logger.info("constructor called")
|
||||
self.config = config
|
||||
self.name = name
|
||||
self.connection = None
|
||||
|
||||
def connectDb(self):
|
||||
dbHost = self.config["database"]["host"]
|
||||
dbUser = self.config["database"]["user"]
|
||||
dbPassword = self.config["database"]["password"]
|
||||
if dbPassword == "$ENV$":
|
||||
dbPassword = os.environ["DB_PASSWORD"]
|
||||
dbName = self.config["database"]["name"]
|
||||
|
||||
logger.info(f"Connect to database for {self.name}")
|
||||
connInfo = f"user={dbUser} host={dbHost} dbname={dbName} sslmode=require"
|
||||
if dbPassword != "$NONE$":
|
||||
connInfo += f" password={dbPassword}"
|
||||
self.connection = psycopg2.connect(connInfo)
|
||||
self.connection.autocommit = False
|
||||
logger.info("Connection to database established")
|
||||
|
||||
def disconnectDb(self):
|
||||
logger.info(f"Disonnect from database for {self.name}")
|
||||
if self.connection:
|
||||
self.connection.close()
|
||||
|
||||
def write(self, experiment, dataObject):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
43
src/EGM1mdTsDbMixin.py
Normal file
43
src/EGM1mdTsDbMixin.py
Normal file
@ -0,0 +1,43 @@
|
||||
from AbstractTsDbMixin import AbstractTsDbMixin
|
||||
from loguru import logger
|
||||
import iso8601
|
||||
import datetime
|
||||
from io import StringIO
|
||||
|
||||
class EGM1mdTsDbMixin(AbstractTsDbMixin):
|
||||
def write(self, experiment, dataObject):
|
||||
logger.info(f"Db write: {experiment} {dataObject}")
|
||||
try:
|
||||
ts = iso8601.parse_date(dataObject.payload["ts"])
|
||||
dt = datetime.timedelta(microseconds=dataObject.payload["dt"])
|
||||
values = dataObject.payload["v"]
|
||||
logger.info(f"ts: {ts}")
|
||||
logger.info(f"dt: {dt}")
|
||||
logger.info(f"v: {values}")
|
||||
|
||||
stringBuffer = ''
|
||||
for rv in values:
|
||||
statusOctet = (rv & 0xff000000) >> 24
|
||||
unsignedValue = rv & 0x00ffffff
|
||||
valid = (statusOctet & 0x04) >> 2
|
||||
sign = (statusOctet & 0x02) >> 1
|
||||
signedValue = unsignedValue if not sign else unsignedValue * -1
|
||||
|
||||
logger.info(f"store: ts: {ts}, rv: {rv:08x}, status: {statusOctet:02x}, valid: {valid}, sign: {sign}, unsigned: {unsignedValue}, signed: {signedValue}")
|
||||
|
||||
stringBuffer += f"{ts}\t{experiment}\t{rv}\t{valid}\t{signedValue}\n"
|
||||
ts += dt
|
||||
|
||||
stringIO = StringIO(stringBuffer)
|
||||
|
||||
logger.info("database copy started")
|
||||
with self.connection:
|
||||
with self.connection.cursor() as cursor:
|
||||
cursor.copy_from(stringIO, 'egm1md', columns=('time', 'experiment', 'rawvalue', 'valid', 'value'))
|
||||
logger.info("database copy done")
|
||||
|
||||
|
||||
except iso8601.ParseError as e:
|
||||
logger.error(f"unable to parse timestamp, item dropped: {f}")
|
||||
except KeyError as e:
|
||||
logger.error(f"missing attribute in payload, item dropped: {e}")
|
20
src/EGM1mdTsDbSinkHandler.py
Normal file
20
src/EGM1mdTsDbSinkHandler.py
Normal file
@ -0,0 +1,20 @@
|
||||
from loguru import logger
|
||||
from AbstractSinkHandler import AbstractSinkHandler
|
||||
from EGM1mdTsDbMixin import EGM1mdTsDbMixin
|
||||
|
||||
class EGM1mdTsDbSinkHandler(AbstractSinkHandler, EGM1mdTsDbMixin):
|
||||
def __init__(self, config, name, inQueue, experiment):
|
||||
super().__init__(config, name, inQueue, experiment)
|
||||
logger.info("constructor called")
|
||||
|
||||
def init(self):
|
||||
logger.info("init")
|
||||
self.connectDb()
|
||||
|
||||
def deinit(self):
|
||||
logger.info("deinit")
|
||||
self.disconnectDb()
|
||||
|
||||
def sinkAction(self, dataObject):
|
||||
logger.info(f"sink {self.name} received {dataObject} for experiment {self.experiment}")
|
||||
self.write(self.experiment, dataObject)
|
@ -3,6 +3,15 @@ login=
|
||||
password=
|
||||
broker=172.16.2.16
|
||||
|
||||
[database]
|
||||
host=172.16.10.27
|
||||
user=iiot
|
||||
# set password to $ENV$ to load it from environment
|
||||
# set password to $NONE$ to not provide a password
|
||||
# but for instance use a client certificate
|
||||
password=8NP2M3CHm7EQhYiY6aGggTWsBDBECAkr
|
||||
name=iiotfeed
|
||||
|
||||
[master]
|
||||
topic=rd/set01/rig01/cmd
|
||||
dataObjectName=cmd
|
||||
@ -11,12 +20,12 @@ slaves=dev05 dev06 rig01
|
||||
[dev05]
|
||||
topic=rd/set01/rig01/dev05/md
|
||||
dataObjectName=value
|
||||
sinkHandler=DummySinkHandler
|
||||
sinkHandler=EGM1mdTsDbSinkHandler
|
||||
|
||||
[dev06]
|
||||
topic=rd/set01/rig01/dev06/md
|
||||
dataObjectName=value
|
||||
sinkHandler=DummySinkHandler
|
||||
sinkHandler=EGM1mdTsDbSinkHandler
|
||||
|
||||
[rig01]
|
||||
topic=rd/set01/rig01/log
|
||||
|
@ -1,3 +1,7 @@
|
||||
// timestamps must be given in RFC3339 format, timezone UTC
|
||||
// timedeltas must be given in integer microseconds
|
||||
|
||||
|
||||
// MQTT command data package of the rig-PC.
|
||||
// Topic: rd/set01/rig01/cmd
|
||||
{
|
||||
@ -79,7 +83,7 @@
|
||||
{
|
||||
"ts": "time of first sample",
|
||||
"dt": "sampling time"
|
||||
"values": [
|
||||
"v": [
|
||||
0x00000000, // 1st value (0x00FFFFFF -> 24 bit raw data mask, 0x02000000 -> mag. field polarity mask (1 = +1, 0 = -1) , 0x04000000 -> data validation mask (1 = valid, 0 = invalid))
|
||||
0x00000000, // 2nd value
|
||||
"..."
|
||||
@ -93,7 +97,7 @@
|
||||
{
|
||||
"ts": "time of first sample",
|
||||
"dt": "sampling time"
|
||||
"values": [
|
||||
"v": [
|
||||
0x00000000, // 1st value (0x00FFFFFF -> 24 bit raw data mask, 0x02000000 -> mag. field polarity mask (1 = +1, 0 = -1) , 0x04000000 -> data validation mask (1 = valid, 0 = invalid))
|
||||
0x00000000, // 2nd value
|
||||
"..."
|
||||
@ -106,7 +110,7 @@
|
||||
// Topic: rd/set02/dev05/kir/md
|
||||
{
|
||||
"ts": "time of first sample",
|
||||
"values": [
|
||||
"v": [
|
||||
[ 0x00000000, 0x00000000 ], // real, imagine
|
||||
[ 0x00000000, 0x00000000 ],
|
||||
"..."
|
||||
|
10
src/schema.sql
Normal file
10
src/schema.sql
Normal file
@ -0,0 +1,10 @@
|
||||
create table egm1md (
|
||||
time timestamp without time zone not null,
|
||||
experiment varchar(64) not null,
|
||||
rawvalue bigint not null,
|
||||
valid boolean not null,
|
||||
value integer not null
|
||||
);
|
||||
|
||||
select create_hypertable('egm1md', 'time');
|
||||
|
Reference in New Issue
Block a user