diff --git a/schema/conf-prov.sql b/schema/conf-prov.sql new file mode 100644 index 0000000..1e6e813 --- /dev/null +++ b/schema/conf-prov.sql @@ -0,0 +1,212 @@ +-- Configuration and Provisioning Schema + + +DROP TABLE tReadDatapoint; + +CREATE TABLE tReadDatapoint ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + unit INTEGER NOT NULL, + address INTEGER NOT NULL, + count INTEGER NOT NULL, + converter VARCHAR(10) NOT NULL, + label VARCHAR(128) NOT NULL, + scanRate TIME(3) DEFAULT '00:00:01.000', + topic VARCHAR(256) NOT NULL, + lastContact TIMESTAMP(3) NOT NULL DEFAULT '2000-01-01 00:00:01.000', + lastError VARCHAR(512), + lastValue VARCHAR(512), + backoff TIME(3) DEFAULT '00:00:00.000', + available BOOLEAN DEFAULT TRUE, + retries INTEGER NOT NULL DEFAULT 0, + giveUpCount INTEGER NOT NULL DEFAULT 0, + active BOOLEAN NOT NULL DEFAULT TRUE, + CONSTRAINT uniqueReadDatapoint UNIQUE (unit, address, count, label) +); + +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(4, 0x2000, 2, 'F', '(ERR) Unavailable device', 'IoT/ModbusMaster1/UnavailableDevice', '00:00:01.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2000, 4, 'F', '(ERR) Wrong register size', 'IoT/ModbusMaster1/WrongRegisterSize', '00:05:00.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2000, 2, 'F', 'Voltage', 'IoT/ModbusMaster1/Voltage', '00:05:00.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2020, 2, 'F', 'Frequency', 'IoT/ModbusMaster1/Frequency', '00:05:00.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2060, 2, 'F', 'Current', 'IoT/ModbusMaster1/Current', '00:05:00.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x0004, 2, 'RF', 'Resistance Channel 1', 'IoT/ModbusMaster1/Channel1/Resistance', '00:00:01.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x000C, 2, 'RF', 'Temperature Channel 1', 'IoT/ModbusMaster1/Channel1/Temperature', '00:00:01.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x0014, 2, 'RF', 'Resistance Channel 2', 'IoT/ModbusMaster1/Channel2/Resistance', '00:00:01.000'); +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x001C, 2, 'RF', 'Temperature Channel 2', 'IoT/ModbusMaster1/Channel2/Temperature', '00:00:01.000'); + + +DROP TABLE tWriteDatapoint; + +CREATE TABLE tWriteDatapoint ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + unit INTEGER NOT NULL, + address INTEGER NOT NULL, + count INTEGER NOT NULL, + converter VARCHAR(10) NOT NULL, + label VARCHAR(128) NOT NULL, + topic VARCHAR(256) NOT NULL, + lastContact TIMESTAMP(3) NOT NULL DEFAULT '2000-01-01 00:00:01.000', + lastError VARCHAR(512), + value VARCHAR(512), + retries INTEGER NOT NULL DEFAULT 0, + active BOOLEAN NOT NULL DEFAULT TRUE, + CONSTRAINT uniqueWriteDatapoint UNIQUE (unit, address, count, label) +); + + + +INSERT INTO tWriteDatapoint (unit, address, count, converter, label, topic, active) + VALUES(5, 0x0000, 1, 'B', 'Relay 1', 'IoT/ModbusMaster1/Relay1', FALSE); + + +CREATE OR REPLACE VIEW vReadDatapointsToBeHandled AS + SELECT id, unit, address, count, converter + FROM tReadDatapoint + WHERE available AND + active AND + ADDTIME(lastContact, ADDTIME(scanRate, backoff)) < NOW(3) + ORDER BY scanRate; + +CREATE OR REPLACE VIEW vWriteDatapintsToBeHandled AS + SELECT id, unit, address, count, converter, value + FROM tWriteDatapoint + WHERE active; + + + +DROP TABLE tReadNotification; + +CREATE TABLE tReadNotification ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + readDatapointId INTEGER NOT NULL REFERENCES tReadDatapoint(id), + notificationType VARCHAR(1), + CONSTRAINT checkNotificationType CHECK (notificationtype IN ('V', 'F', 'R')) -- value, failure, return +); + + +DROP TABLE tWrittenNotification; + +CREATE TABLE tWrittenNotification ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + writeDatapointId INTEGER NOT NULL REFERENCES tWriteDatapoint(id), + notificationType VARCHAR(1), + CONSTRAINT checkNotificationType CHECK (notificationtype IN ('S', 'F')) -- success, failure +); + + + + +DELIMITER $$ +CREATE OR REPLACE PROCEDURE prWriteFeedback (IN p_id INTEGER, IN p_lastError VARCHAR(512)) + MODIFIES SQL DATA + BEGIN + DECLARE v_retries INTEGER; + DECLARE v_active BOOLEAN; + + IF p_lastError = '' OR p_lastError IS NULL THEN + UPDATE tWriteDatapoint + SET lastError = NULL, + lastContact = NOW(3), + retries = 0, + active = FALSE + WHERE id = p_id; + INSERT INTO tWrittenNotification (writeDatapointId, notificationType) VALUES (p_id, 'S'); + ELSE + SELECT retries + INTO v_retries + FROM tWriteDatapoint + WHERE id = p_id; + + SET v_retries := v_retries + 1; + + IF v_retries >= 5 THEN + SET v_retries := 0; + SET v_active := FALSE; + ELSE + SET v_active := TRUE; + END IF; + + UPDATE tWriteDatapoint + SET lastError = p_lastError, + retries = v_retries, + active = v_active + WHERE id = p_id; + + IF NOT v_active THEN + INSERT INTO tWrittenNotification (writeDatapointId, notificationType) VALUES(p_id, 'F'); + END IF; + END IF; + END; $$ +DELIMITER ; + + + + +DELIMITER $$ +CREATE OR REPLACE PROCEDURE prReadFeedback (IN p_id INTEGER, IN p_lastValue VARCHAR(512), IN p_lastError VARCHAR(512)) + MODIFIES SQL DATA + BEGIN + DECLARE v_retries INTEGER; + DECLARE v_backoff TIME(3); + DECLARE v_scanRate TIME(3); + DECLARE v_giveUpCount INTEGER; + DECLARE v_available BOOLEAN; + + IF p_lastError = '' OR p_lastError IS NULL THEN + UPDATE tReadDatapoint + SET lastError = NULL, + lastContact = NOW(3), + lastValue = p_lastValue, + retries = 0, + backoff = '00:00:00.000', + giveUpCount = 0 + WHERE id = p_id; + INSERT INTO tReadNotification (readDatapointId, notificationType) VALUES(p_id, 'V'); + ELSE + SELECT retries, backoff, scanRate, giveUpCount + INTO v_retries, v_backoff, v_scanRate, v_giveUpCount + FROM tReadDatapoint + WHERE id = p_id; + + SET v_retries := v_retries + 1; + + IF v_retries >= 5 THEN + IF v_backoff = '00:00:00.000' THEN + SET v_backoff = v_scanRate; + ELSE + SET v_backoff = ADDTIME(v_backoff, v_backoff); + END IF; + SET v_retries := 0; + SET v_giveUpCount := v_giveUpCount + 1; + SET v_available := TRUE; + END IF; + IF v_giveUpCount = 10 THEN + SET v_available := FALSE; + SET v_giveUpCount := 0; + SET v_backoff := '00:00:00.000'; + END IF; + + UPDATE tReadDatapoint + SET lastError = p_lastError, + retries = v_retries, + backoff = v_backoff, + giveUpCount = v_giveUpCount, + available = v_available + WHERE id = p_id; + + IF NOT v_available THEN + INSERT INTO tReadNotification (readDatapointId, notificationType) VALUES(p_id, 'F'); + END IF; + END IF; + END; $$ +DELIMITER ; + + diff --git a/snippets/test8.py b/snippets/test8.py new file mode 100644 index 0000000..af4b027 --- /dev/null +++ b/snippets/test8.py @@ -0,0 +1,427 @@ +import queue +import datetime +import threading +import socketserver +import cmd +import io +import paho.mqtt.client as mqtt +import re + + +class Config(object): + def __init__(self): + self.mqttBrokerHost = '127.0.0.1' + self.mqttBrokerPort = 1883 + self.mqttLogin = '' + self.mqttPassword = '' + + + +class AbstractNotificationReceiver(object): + def receiveNotification(self, arg): + raise NotImplementedError + +class NotificationForwarder(object): + def __init__(self): + self.receivers = [] + + def register(self, receiver): + self.receivers.append(receiver) + + def notify(self, arg=None): + for r in self.receivers: + r.receiveNotification(arg) + + +class AbstractModbusDatapoint(object): + def __init__(self, label, unit, address, count, scanRate): + self.label = label + self.unit = unit + self.address = address + self.count = count + self.scanRate = scanRate + self.type = 'abstract data point' + self.command = None + self.value = None + self.enqueued = False + if self.scanRate: + self.priority = 1 + else: + self.priority = 0 + + def __str__(self): + return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value) + + def setCommand(self, cmd): + self.command = cmd + + def setValue(self, value): + self.value = value + + +class HoldingRegisterDatapoint(AbstractModbusDatapoint): + def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): + super().__init__(label, unit, address, count, scanRate) + self.publishTopic = publishTopic + self.subscribeTopic = subscribeTopic + self.feedbackTopic = feedbackTopic + self.writeRequestValue = None + self.lastContact = None + self.type = 'read holding register' + + def __str__(self): + return "[{0!s}, {1} {2} {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic) + + def process(self): + successFull = False + giveUp = False + if self.writeRequestValue: + # perform write operation + if successFull: + # give feedback + self.writeRequestValue = None + else: + # retries handling + if giveUp: + # give negative feedback + self.writeRequestValue = None + else: + # perform read operation + if successFull: + self.lastContact = datetime.datetime.now() + # publish value + else: + # retries handling + if giveUp: + # backoff and availability handling + # give negative feedback + pass + + def onMessage(self, value): + self.writeRequestValue = value + + +def mqttOnConnectCallback(client, userdata, flags, rc): + userdata.onConnect() + +def mqttOnMessageCallback(client, userdata, message): + userdata.onMessage(message.topic, message.payload) + +def mqttOnDisconnectCallback(client, userdata, rc): + userdata.onDisconnect(rc) + +class MqttProcessor(threading.Thread, AbstractNotificationReceiver): + def __init__(self, registers, queue): + super().__init__() + self.registers = registers + self.queue = queue + self.client = mqtt.Client(userdata=self) + self.subscriptions = [] + self.topicRegisterMap ={} + + def __processUpdatedRegisters(self, force=False): + # print("MqttProcessor.__updateSubscriptions") + + subscribeTopics = [ r.subscribeTopic for r in self.registers if r.subscribeTopic] + # print("Topics: {0!s}".format(subscribeTopics)) + + for subscribeTopic in subscribeTopics: + if (subscribeTopic not in self.subscriptions) or force: + print("Subscribe to {0}".format(subscribeTopic)) + self.client.subscribe(subscribeTopic) + self.subscriptions.append(subscribeTopic) + + for subscription in self.subscriptions: + if (subscription not in subscribeTopics) and not force: + print("Unsubscribe from {0}".format(subscription)) + self.client.unsubscribe(subscription) + self.subscriptions.remove(subscription) + + self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if r.subscribeTopic } + + def receiveNotification(self, arg): + print("MqttProcessor:registersChanged") + self.__processUpdatedRegisters() + + def run(self): + # print("MqttProcessor.run") + self.client.on_message = mqttOnMessageCallback + self.client.on_connect = mqttOnConnectCallback + self.client.on_disconnect = mqttOnDisconnectCallback + if config.mqttLogin and config.mqttPassword: + self.client.username_pw_set(config.mqttLogin, config.mqttPassword) + self.client.connect(config.mqttBrokerHost, config.mqttBrokerPort) + self.client.loop_forever() + + def onConnect(self): + # print("MqttProcessor.onConnect") + self.__processUpdatedRegisters(force=True) + + def onDisconnect(self, rc): + print("Disconnected from MQTT broker: {0}".format(rc)) + + def onMessage(self, topic, payload): + # print("MqttProcessor.onMessage") + r = self.topicRegisterMap[topic] + # print("{0}: {1!s} -> {2!s}".format(topic, payload, r)) + r.setCommand('w') + r.setValue(payload) + self.queue.put(r) + + +class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): + def __init__(self, registers, queue): + super().__init__() + self.registers = registers + self.queue = queue + self.delayEvent = threading.Event() + + def getMinimalScanrate(self): + return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) + + def receiveNotification(self, arg): + print("ScanRateConsideringQueueFeeder:registersChanged") + self.delay = self.getMinimalScanrate() + + def run(self): + self.delay = self.getMinimalScanrate() + while True: + registersToBeHandled = [ + r for r in self.registers if ((not r.enqueued) and + (r.scanRate) and + ((not r.lastContact) or + (r.lastContact + r.scanRate < datetime.datetime.now()))) + ] + registersToBeHandled.sort(key=lambda x : x.scanRate) + for r in registersToBeHandled: + r.setCommand('r') + self.queue.put(r) + r.enqueued = True + self.delayEvent.wait(self.delay) + + +class CommunicationProcessor(threading.Thread): + def __init__(self, queue): + super().__init__() + self.queue = queue + + def run(self): + while True: + r = self.queue.get() + # r.process() + r.lastContact = datetime.datetime.now() + print("Dequeued: {0!s}".format(r)) + r.enqueued = False + + +class MyPriorityQueueItem(object): + def __init__(self, itemWithPriority): + self.itemWithPriority = itemWithPriority + + def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority + def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority + def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority + def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority + def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority + def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority + +class MyPriorityQueue(queue.PriorityQueue): + def _put(self, itemWithPriority): + i = MyPriorityQueueItem(itemWithPriority) + super()._put(i) + + def _get(self): + i = super()._get() + return i.itemWithPriority + +class CmdInterpreterException(ValueError): pass + +def parseIntArbitraryBase(s): + i = 0 + if s.startswith('0x'): + i = int(s, 16) + elif s.startswith('0b'): + i = int(s, 2) + else: + i = int(s, 10) + return i + +class CmdInterpreter(cmd.Cmd): + def __init__(self, infile, outfile, notifier, registers): + super().__init__(stdin=infile, stdout=outfile) + self.use_rawinput = False + self.notifier = notifier + self.registers = registers + self.prompt = "test8> " + self.intro = "test8 admin interface" + self.splitterRe = re.compile('\s+') + + def __print(self, text): + self.stdout.write(text) + + def __println(self, text): + self.stdout.write(text) + self.stdout.write("\n\r") + + def do_notify(self, arg): + self.notifier.notify() + + def help_notify(self): + self.__println("Notifies threads using the list of datapoints about changes in this list.") + self.__println("Call after modifications on the list.") + + def do_quit(self, arg): + self.__println("Bye!") + return True + + def do_add(self, arg): + try: + (registerType, label, unit, address, count, scanrate, readTopic, writeTopic, feedbackTopic) = self.splitterRe.split(arg) + self.__println("RegisterType: {0}".format(registerType)) + self.__println("Label: {0}".format(label)) + self.__println("Unit: {0}".format(unit)) + self.__println("Address: {0}".format(address)) + self.__println("Count: {0}".format(count)) + self.__println("ScanRate: {0}".format(scanrate)) + self.__println("ReadTopic: {0}".format(readTopic)) + self.__println("WriteTopic: {0}".format(writeTopic)) + self.__println("FeedbackTopic: {0}".format(feedbackTopic)) + + if readTopic == 'None': + readTopic = None + if writeTopic == 'None': + writeTopic = None + if feedbackTopic == 'None': + feedbackTopic = None + unit = parseIntArbitraryBase(unit) + address = parseIntArbitraryBase(address) + count = parseIntArbitraryBase(count) + scanrate = float(scanrate) + if scanrate == 0: + if readTopic: + raise CmdInterpreterException('readTopic must not be set when scanRate is zero') + if not writeTopic: + raise CmdInterpreterException('writeTopic must be set when scanRate is zero') + if not feedbackTopic: + raise CmdInterpreterException('feedbackTopic must be set when scanRate is zero') + else: + if not readTopic: + raise CmdInterpreterException('readTopic must be set when scanRate is zero') + if writeTopic: + raise CmdInterpreterException('writeTopic must not be set when scanRate is zero') + if feedbackTopic: + raise CmdInterpreterException('feedbackTopic must not be set when scanRate is zero') + allowedRegisterTypes = ['HoldingRegister'] + if registerType not in allowedRegisterTypes: + raise CmdInterpreterException('Unknown register type {0}, allowed are {1!s}'.format(registerType, allowedRegisterTypes)) + + + except ValueError as e: + self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) + + def help_add(self): + # HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), + self.__println("Usage: add