inserts into database works
This commit is contained in:
@ -37,7 +37,7 @@ def sendValue(client):
|
|||||||
msg = {
|
msg = {
|
||||||
"ts": str(datetime.now()),
|
"ts": str(datetime.now()),
|
||||||
"dt": 100,
|
"dt": 100,
|
||||||
"v": sample(range(0xffffffff), 3)
|
"v": sample(range(0xffffffff), 3000)
|
||||||
}
|
}
|
||||||
topic = f"{TOPIC_PRE}/rig01/dev05/md"
|
topic = f"{TOPIC_PRE}/rig01/dev05/md"
|
||||||
client.publish(topic, json.dumps(msg))
|
client.publish(topic, json.dumps(msg))
|
||||||
@ -48,14 +48,9 @@ def perform(client, params):
|
|||||||
logger.info("Start")
|
logger.info("Start")
|
||||||
sendCmd(client, "start")
|
sendCmd(client, "start")
|
||||||
|
|
||||||
for i in range(2):
|
for i in range(10):
|
||||||
sendValue(client)
|
sendValue(client)
|
||||||
sleep(1.0)
|
sleep(1.0)
|
||||||
sendValue(client)
|
|
||||||
sleep(1.0)
|
|
||||||
sendValue(client)
|
|
||||||
sendLog(client)
|
|
||||||
sleep(1.0)
|
|
||||||
|
|
||||||
logger.info("Stop")
|
logger.info("Stop")
|
||||||
sendCmd(client, "stop")
|
sendCmd(client, "stop")
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import psycopg2
|
||||||
from AbstractTsDbMixin import AbstractTsDbMixin
|
from AbstractTsDbMixin import AbstractTsDbMixin
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
import iso8601
|
import iso8601
|
||||||
@ -6,31 +7,28 @@ from io import StringIO
|
|||||||
|
|
||||||
class EGM1mdTsDbMixin(AbstractTsDbMixin):
|
class EGM1mdTsDbMixin(AbstractTsDbMixin):
|
||||||
def write(self, experiment, dataObject):
|
def write(self, experiment, dataObject):
|
||||||
logger.info(f"Db write: {experiment} {dataObject}")
|
logger.info("database write operation start")
|
||||||
try:
|
try:
|
||||||
ts = iso8601.parse_date(dataObject.payload["ts"])
|
ts = iso8601.parse_date(dataObject.payload["ts"])
|
||||||
dt = datetime.timedelta(microseconds=dataObject.payload["dt"])
|
dt = datetime.timedelta(microseconds=dataObject.payload["dt"])
|
||||||
values = dataObject.payload["v"]
|
values = dataObject.payload["v"]
|
||||||
logger.info(f"ts: {ts}")
|
|
||||||
logger.info(f"dt: {dt}")
|
|
||||||
logger.info(f"v: {values}")
|
|
||||||
|
|
||||||
stringBuffer = ''
|
stringBuffer = ''
|
||||||
|
cnt = 0
|
||||||
for rv in values:
|
for rv in values:
|
||||||
statusOctet = (rv & 0xff000000) >> 24
|
|
||||||
unsignedValue = rv & 0x00ffffff
|
unsignedValue = rv & 0x00ffffff
|
||||||
valid = (statusOctet & 0x04) >> 2
|
valid = (rv & 0x04000000) >> 26
|
||||||
sign = (statusOctet & 0x02) >> 1
|
sign = (rv & 0x02000000) >> 25
|
||||||
signedValue = unsignedValue if not sign else unsignedValue * -1
|
signedValue = unsignedValue if not sign else unsignedValue * -1
|
||||||
|
|
||||||
logger.info(f"store: ts: {ts}, rv: {rv:08x}, status: {statusOctet:02x}, valid: {valid}, sign: {sign}, unsigned: {unsignedValue}, signed: {signedValue}")
|
|
||||||
|
|
||||||
stringBuffer += f"{ts}\t{experiment}\t{rv}\t{valid}\t{signedValue}\n"
|
stringBuffer += f"{ts}\t{experiment}\t{rv}\t{valid}\t{signedValue}\n"
|
||||||
ts += dt
|
ts += dt
|
||||||
|
cnt += 1
|
||||||
|
logger.info(f"{cnt} items")
|
||||||
|
|
||||||
stringIO = StringIO(stringBuffer)
|
stringIO = StringIO(stringBuffer)
|
||||||
|
|
||||||
logger.info("database copy started")
|
logger.info("database copy start")
|
||||||
with self.connection:
|
with self.connection:
|
||||||
with self.connection.cursor() as cursor:
|
with self.connection.cursor() as cursor:
|
||||||
cursor.copy_from(stringIO, 'egm1md', columns=('time', 'experiment', 'rawvalue', 'valid', 'value'))
|
cursor.copy_from(stringIO, 'egm1md', columns=('time', 'experiment', 'rawvalue', 'valid', 'value'))
|
||||||
@ -38,6 +36,10 @@ class EGM1mdTsDbMixin(AbstractTsDbMixin):
|
|||||||
|
|
||||||
|
|
||||||
except iso8601.ParseError as e:
|
except iso8601.ParseError as e:
|
||||||
logger.error(f"unable to parse timestamp, item dropped: {f}")
|
logger.error(f"unable to parse timestamp, items dropped: {e}")
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
logger.error(f"missing attribute in payload, item dropped: {e}")
|
logger.error(f"missing attribute in payload, items dropped: {e}")
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
logger.error(f"error {e.pgcode} when talking to database, items dropped: {e}")
|
||||||
|
|
||||||
|
logger.info("database write operation done")
|
||||||
|
@ -16,5 +16,5 @@ class EGM1mdTsDbSinkHandler(AbstractSinkHandler, EGM1mdTsDbMixin):
|
|||||||
self.disconnectDb()
|
self.disconnectDb()
|
||||||
|
|
||||||
def sinkAction(self, dataObject):
|
def sinkAction(self, dataObject):
|
||||||
logger.info(f"sink {self.name} received {dataObject} for experiment {self.experiment}")
|
# logger.info(f"sink {self.name} received {dataObject} for experiment {self.experiment}")
|
||||||
self.write(self.experiment, dataObject)
|
self.write(self.experiment, dataObject)
|
@ -21,7 +21,8 @@ class GenericMqttSubscriber(AbstractMqttHandler):
|
|||||||
|
|
||||||
def onMessage(self, topic, payload):
|
def onMessage(self, topic, payload):
|
||||||
try:
|
try:
|
||||||
logger.info("message received: {} -> {}".format(topic, str(payload)))
|
# logger.info("message received: {} -> {}".format(topic, str(payload)))
|
||||||
|
logger.info(f"message received on topic {topic}")
|
||||||
dataObject = DataObject(name=self.dataObjectName, topic=topic, payload=json.loads(payload))
|
dataObject = DataObject(name=self.dataObjectName, topic=topic, payload=json.loads(payload))
|
||||||
self.queue.put(dataObject)
|
self.queue.put(dataObject)
|
||||||
except JSONDecodeError as e:
|
except JSONDecodeError as e:
|
||||||
|
Reference in New Issue
Block a user