From 186b1c5adbad80f1e1eab29fb8b347fa0e71eadf Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Thu, 20 Jun 2019 00:23:53 +0200 Subject: [PATCH 01/13] initial schema --- schema/conf-prov.sql | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 schema/conf-prov.sql diff --git a/schema/conf-prov.sql b/schema/conf-prov.sql new file mode 100644 index 0000000..c37204c --- /dev/null +++ b/schema/conf-prov.sql @@ -0,0 +1,19 @@ +-- Configuration and Provisioning Schema + +DROP TABLE tDatapoint; + +CREATE TABLE tDatapoint ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + unit INTEGER NOT NULL, + address INTEGER NOT NULL, + count INTEGER NOT NULL, + converter TEXT NOT NULL, + label TEXT NOT NULL, + scanRate INTEGER NOT NULL DEFAULT 1000, -- in milliseconds + lastContact TEXT, + lastError TEXT, + lastValue TEXT, + backoff INTEGER NOT NULL DEFAULT 0, -- in seconds + CONSTRAINT uniqueDatapoint UNIQUE (unit, address, count, label) +); + From 146f8df8e803bf7132ae0e007032be86a6dba492 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Thu, 20 Jun 2019 22:01:28 +0200 Subject: [PATCH 02/13] schema extended --- schema/conf-prov.sql | 113 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 8 deletions(-) diff --git a/schema/conf-prov.sql b/schema/conf-prov.sql index c37204c..44c3935 100644 --- a/schema/conf-prov.sql +++ b/schema/conf-prov.sql @@ -1,19 +1,116 @@ -- Configuration and Provisioning Schema + DROP TABLE tDatapoint; CREATE TABLE tDatapoint ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id INTEGER PRIMARY KEY AUTO_INCREMENT, unit INTEGER NOT NULL, address INTEGER NOT NULL, count INTEGER NOT NULL, - converter TEXT NOT NULL, - label TEXT NOT NULL, - scanRate INTEGER NOT NULL DEFAULT 1000, -- in milliseconds - lastContact TEXT, - lastError TEXT, - lastValue TEXT, - backoff INTEGER NOT NULL DEFAULT 0, -- in seconds + converter VARCHAR(10) NOT NULL, + label VARCHAR(128) NOT NULL, + scanRate TIME(3) DEFAULT '00:00:01.000', + topic VARCHAR(256) NOT NULL, + lastContact TIMESTAMP(3) NOT NULL DEFAULT '2000-01-01 00:00:01.000', + lastError VARCHAR(512), + lastValue VARCHAR(512), + backoff TIME(3) DEFAULT '00:00:00.000', + available BOOLEAN DEFAULT TRUE, + retries INTEGER NOT NULL DEFAULT 0, + giveUpCount INTEGER NOT NULL DEFAULT 0, CONSTRAINT uniqueDatapoint UNIQUE (unit, address, count, label) ); +-- ALTER TABLE tDatapoint MODIFY available BOOLEAN DEFAULT TRUE; +-- ALTER TABLE tDatapoint MODIFY lastContact TIMESTAMP(3); +-- ALTER TABLE tDatapoint MODIFY scanRate TIME(3) DEFAULT '00:00:01.000'; +-- ALTER TABLE tDatapoint ADD giveUpCount INTEGER NOT NULL DEFAULT 0; +-- ALTER TABLE tDatapoint MODIFY lastContact TIMESTAMP(3) NOT NULL DEFAULT '1970-01-01 00:00:01.000'; + +-- ModbusRequestDefinition(4, 0x2000, 2, 'F', '(ERR) Unavailable device'), +-- ModbusRequestDefinition(1, 0x2000, 4, 'F', '(ERR) Wrong register size'), +-- ModbusRequestDefinition(1, 0x2000, 2, 'F', 'Voltage'), +-- ModbusRequestDefinition(1, 0x2020, 2, 'F', 'Frequency'), +-- ModbusRequestDefinition(1, 0x2060, 2, 'F', 'Current'), +-- ModbusRequestDefinition(3, 0x0004, 2, 'RF', 'Resistance Channel 1'), +-- ModbusRequestDefinition(3, 0x000C, 2, 'RF', 'Temperature Channel 1'), +-- ModbusRequestDefinition(3, 0x0014, 2, 'RF', 'Resistance Channel 2'), +-- ModbusRequestDefinition(3, 0x001C, 2, 'RF', 'Temperature Channel 2'), + +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(4, 0x2000, 2, 'F', '(ERR) Unavailable device', 'IoT/ModbusMaster1/UnavailableDevice', '00:00:01.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2000, 4, 'F', '(ERR) Wrong register size', 'IoT/ModbusMaster1/WrongRegisterSize', '00:05:00.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2000, 2, 'F', 'Voltage', 'IoT/ModbusMaster1/Voltage', '00:05:00.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2020, 2, 'F', 'Frequency', 'IoT/ModbusMaster1/Frequency', '00:05:00.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(1, 0x2060, 2, 'F', 'Current', 'IoT/ModbusMaster1/Current', '00:05:00.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x0004, 2, 'RF', 'Resistance Channel 1', 'IoT/ModbusMaster1/Channel1/Resistance', '00:00:01.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x000C, 2, 'RF', 'Temperature Channel 1', 'IoT/ModbusMaster1/Channel1/Temperature', '00:00:01.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x0014, 2, 'RF', 'Resistance Channel 2', 'IoT/ModbusMaster1/Channel2/Resistance', '00:00:01.000'); +INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) + VALUES(3, 0x001C, 2, 'RF', 'Temperature Channel 2', 'IoT/ModbusMaster1/Channel2/Temperature', '00:00:01.000'); + + +CREATE OR REPLACE VIEW vDatapointsToBeQueried AS + SELECT id, unit, address, count, converter + FROM tDatapoint + WHERE ADDTIME(lastContact, ADDTIME(scanRate, backoff)) < NOW(3) AND + available; + + +DROP TABLE tNotification; + +CREATE TABLE tNotification ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + datapointId INTEGER NOT NULL REFERENCES tDatapoint(id), + notificationType VARCHAR(1), + CONSTRAINT checkNotificationType CHECK (notificationtype IN ('V', 'F', 'R')) +); + + +DELIMITER $$ +CREATE OR REPLACE TRIGGER trCheckAvailability + BEFORE UPDATE ON tDatapoint FOR EACH ROW + BEGIN + IF NEW.retries >= 5 THEN + IF NEW.backoff = '00:00:00.000' THEN + SET NEW.backoff = OLD.scanRate; + ELSE + SET NEW.backoff = ADDTIME(OLD.backoff, OLD.backoff); + END IF; + SET NEW.retries := 0; + SET NEW.giveUpCount := OLD.giveUpCount + 1; + END IF; + IF NEW.giveUpCount = 10 THEN + SET NEW.available := FALSE; + SET NEW.giveUpCount := 0; + SET NEW.backoff := '00:00:00.000'; + END IF; + END; $$ +DELIMITER ; + +DELIMITER $$ +CREATE OR REPLACE TRIGGER trNotification + AFTER UPDATE ON tDatapoint FOR EACH ROW + BEGIN + DECLARE v_notificationType VARCHAR(1); + IF (NEW.lastError IS NULL OR NEW.lastError = '') AND (NEW.lastValue IS NOT NULL) THEN + SET v_notificationType := 'V'; + ELSEIF NEW.available AND NOT OLD.available THEN + SET v_notificationType := 'R'; + ELSEIF NOT NEW.available AND OLD.available THEN + SET v_notificationType := 'F'; + END IF; + IF v_notificationType IS NOT NULL THEN + INSERT INTO tNotification (datapointId, notificationType) VALUES(NEW.id, v_notificationType); + END IF; + END; $$ +DELIMITER ; + From 04be7219c2838ce6e3be5077f049d99784eae8f9 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 21 Jun 2019 13:20:22 +0200 Subject: [PATCH 03/13] adjust schema --- schema/conf-prov.sql | 226 ++++++++++++++++++++++++++++++------------- 1 file changed, 161 insertions(+), 65 deletions(-) diff --git a/schema/conf-prov.sql b/schema/conf-prov.sql index 44c3935..1e6e813 100644 --- a/schema/conf-prov.sql +++ b/schema/conf-prov.sql @@ -1,9 +1,9 @@ -- Configuration and Provisioning Schema -DROP TABLE tDatapoint; +DROP TABLE tReadDatapoint; -CREATE TABLE tDatapoint ( +CREATE TABLE tReadDatapoint ( id INTEGER PRIMARY KEY AUTO_INCREMENT, unit INTEGER NOT NULL, address INTEGER NOT NULL, @@ -19,98 +19,194 @@ CREATE TABLE tDatapoint ( available BOOLEAN DEFAULT TRUE, retries INTEGER NOT NULL DEFAULT 0, giveUpCount INTEGER NOT NULL DEFAULT 0, - CONSTRAINT uniqueDatapoint UNIQUE (unit, address, count, label) + active BOOLEAN NOT NULL DEFAULT TRUE, + CONSTRAINT uniqueReadDatapoint UNIQUE (unit, address, count, label) ); --- ALTER TABLE tDatapoint MODIFY available BOOLEAN DEFAULT TRUE; --- ALTER TABLE tDatapoint MODIFY lastContact TIMESTAMP(3); --- ALTER TABLE tDatapoint MODIFY scanRate TIME(3) DEFAULT '00:00:01.000'; --- ALTER TABLE tDatapoint ADD giveUpCount INTEGER NOT NULL DEFAULT 0; --- ALTER TABLE tDatapoint MODIFY lastContact TIMESTAMP(3) NOT NULL DEFAULT '1970-01-01 00:00:01.000'; - --- ModbusRequestDefinition(4, 0x2000, 2, 'F', '(ERR) Unavailable device'), --- ModbusRequestDefinition(1, 0x2000, 4, 'F', '(ERR) Wrong register size'), --- ModbusRequestDefinition(1, 0x2000, 2, 'F', 'Voltage'), --- ModbusRequestDefinition(1, 0x2020, 2, 'F', 'Frequency'), --- ModbusRequestDefinition(1, 0x2060, 2, 'F', 'Current'), --- ModbusRequestDefinition(3, 0x0004, 2, 'RF', 'Resistance Channel 1'), --- ModbusRequestDefinition(3, 0x000C, 2, 'RF', 'Temperature Channel 1'), --- ModbusRequestDefinition(3, 0x0014, 2, 'RF', 'Resistance Channel 2'), --- ModbusRequestDefinition(3, 0x001C, 2, 'RF', 'Temperature Channel 2'), - -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(4, 0x2000, 2, 'F', '(ERR) Unavailable device', 'IoT/ModbusMaster1/UnavailableDevice', '00:00:01.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(1, 0x2000, 4, 'F', '(ERR) Wrong register size', 'IoT/ModbusMaster1/WrongRegisterSize', '00:05:00.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(1, 0x2000, 2, 'F', 'Voltage', 'IoT/ModbusMaster1/Voltage', '00:05:00.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(1, 0x2020, 2, 'F', 'Frequency', 'IoT/ModbusMaster1/Frequency', '00:05:00.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(1, 0x2060, 2, 'F', 'Current', 'IoT/ModbusMaster1/Current', '00:05:00.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(3, 0x0004, 2, 'RF', 'Resistance Channel 1', 'IoT/ModbusMaster1/Channel1/Resistance', '00:00:01.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(3, 0x000C, 2, 'RF', 'Temperature Channel 1', 'IoT/ModbusMaster1/Channel1/Temperature', '00:00:01.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(3, 0x0014, 2, 'RF', 'Resistance Channel 2', 'IoT/ModbusMaster1/Channel2/Resistance', '00:00:01.000'); -INSERT INTO tDatapoint (unit, address, count, converter, label, topic, scanRate) +INSERT INTO tReadDatapoint (unit, address, count, converter, label, topic, scanRate) VALUES(3, 0x001C, 2, 'RF', 'Temperature Channel 2', 'IoT/ModbusMaster1/Channel2/Temperature', '00:00:01.000'); -CREATE OR REPLACE VIEW vDatapointsToBeQueried AS - SELECT id, unit, address, count, converter - FROM tDatapoint - WHERE ADDTIME(lastContact, ADDTIME(scanRate, backoff)) < NOW(3) AND - available; +DROP TABLE tWriteDatapoint; - -DROP TABLE tNotification; - -CREATE TABLE tNotification ( +CREATE TABLE tWriteDatapoint ( id INTEGER PRIMARY KEY AUTO_INCREMENT, - datapointId INTEGER NOT NULL REFERENCES tDatapoint(id), - notificationType VARCHAR(1), - CONSTRAINT checkNotificationType CHECK (notificationtype IN ('V', 'F', 'R')) + unit INTEGER NOT NULL, + address INTEGER NOT NULL, + count INTEGER NOT NULL, + converter VARCHAR(10) NOT NULL, + label VARCHAR(128) NOT NULL, + topic VARCHAR(256) NOT NULL, + lastContact TIMESTAMP(3) NOT NULL DEFAULT '2000-01-01 00:00:01.000', + lastError VARCHAR(512), + value VARCHAR(512), + retries INTEGER NOT NULL DEFAULT 0, + active BOOLEAN NOT NULL DEFAULT TRUE, + CONSTRAINT uniqueWriteDatapoint UNIQUE (unit, address, count, label) ); + +INSERT INTO tWriteDatapoint (unit, address, count, converter, label, topic, active) + VALUES(5, 0x0000, 1, 'B', 'Relay 1', 'IoT/ModbusMaster1/Relay1', FALSE); + + +CREATE OR REPLACE VIEW vReadDatapointsToBeHandled AS + SELECT id, unit, address, count, converter + FROM tReadDatapoint + WHERE available AND + active AND + ADDTIME(lastContact, ADDTIME(scanRate, backoff)) < NOW(3) + ORDER BY scanRate; + +CREATE OR REPLACE VIEW vWriteDatapintsToBeHandled AS + SELECT id, unit, address, count, converter, value + FROM tWriteDatapoint + WHERE active; + + + +DROP TABLE tReadNotification; + +CREATE TABLE tReadNotification ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + readDatapointId INTEGER NOT NULL REFERENCES tReadDatapoint(id), + notificationType VARCHAR(1), + CONSTRAINT checkNotificationType CHECK (notificationtype IN ('V', 'F', 'R')) -- value, failure, return +); + + +DROP TABLE tWrittenNotification; + +CREATE TABLE tWrittenNotification ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + writeDatapointId INTEGER NOT NULL REFERENCES tWriteDatapoint(id), + notificationType VARCHAR(1), + CONSTRAINT checkNotificationType CHECK (notificationtype IN ('S', 'F')) -- success, failure +); + + + + DELIMITER $$ -CREATE OR REPLACE TRIGGER trCheckAvailability - BEFORE UPDATE ON tDatapoint FOR EACH ROW +CREATE OR REPLACE PROCEDURE prWriteFeedback (IN p_id INTEGER, IN p_lastError VARCHAR(512)) + MODIFIES SQL DATA BEGIN - IF NEW.retries >= 5 THEN - IF NEW.backoff = '00:00:00.000' THEN - SET NEW.backoff = OLD.scanRate; + DECLARE v_retries INTEGER; + DECLARE v_active BOOLEAN; + + IF p_lastError = '' OR p_lastError IS NULL THEN + UPDATE tWriteDatapoint + SET lastError = NULL, + lastContact = NOW(3), + retries = 0, + active = FALSE + WHERE id = p_id; + INSERT INTO tWrittenNotification (writeDatapointId, notificationType) VALUES (p_id, 'S'); + ELSE + SELECT retries + INTO v_retries + FROM tWriteDatapoint + WHERE id = p_id; + + SET v_retries := v_retries + 1; + + IF v_retries >= 5 THEN + SET v_retries := 0; + SET v_active := FALSE; ELSE - SET NEW.backoff = ADDTIME(OLD.backoff, OLD.backoff); + SET v_active := TRUE; + END IF; + + UPDATE tWriteDatapoint + SET lastError = p_lastError, + retries = v_retries, + active = v_active + WHERE id = p_id; + + IF NOT v_active THEN + INSERT INTO tWrittenNotification (writeDatapointId, notificationType) VALUES(p_id, 'F'); END IF; - SET NEW.retries := 0; - SET NEW.giveUpCount := OLD.giveUpCount + 1; - END IF; - IF NEW.giveUpCount = 10 THEN - SET NEW.available := FALSE; - SET NEW.giveUpCount := 0; - SET NEW.backoff := '00:00:00.000'; END IF; END; $$ DELIMITER ; + + + DELIMITER $$ -CREATE OR REPLACE TRIGGER trNotification - AFTER UPDATE ON tDatapoint FOR EACH ROW +CREATE OR REPLACE PROCEDURE prReadFeedback (IN p_id INTEGER, IN p_lastValue VARCHAR(512), IN p_lastError VARCHAR(512)) + MODIFIES SQL DATA BEGIN - DECLARE v_notificationType VARCHAR(1); - IF (NEW.lastError IS NULL OR NEW.lastError = '') AND (NEW.lastValue IS NOT NULL) THEN - SET v_notificationType := 'V'; - ELSEIF NEW.available AND NOT OLD.available THEN - SET v_notificationType := 'R'; - ELSEIF NOT NEW.available AND OLD.available THEN - SET v_notificationType := 'F'; - END IF; - IF v_notificationType IS NOT NULL THEN - INSERT INTO tNotification (datapointId, notificationType) VALUES(NEW.id, v_notificationType); + DECLARE v_retries INTEGER; + DECLARE v_backoff TIME(3); + DECLARE v_scanRate TIME(3); + DECLARE v_giveUpCount INTEGER; + DECLARE v_available BOOLEAN; + + IF p_lastError = '' OR p_lastError IS NULL THEN + UPDATE tReadDatapoint + SET lastError = NULL, + lastContact = NOW(3), + lastValue = p_lastValue, + retries = 0, + backoff = '00:00:00.000', + giveUpCount = 0 + WHERE id = p_id; + INSERT INTO tReadNotification (readDatapointId, notificationType) VALUES(p_id, 'V'); + ELSE + SELECT retries, backoff, scanRate, giveUpCount + INTO v_retries, v_backoff, v_scanRate, v_giveUpCount + FROM tReadDatapoint + WHERE id = p_id; + + SET v_retries := v_retries + 1; + + IF v_retries >= 5 THEN + IF v_backoff = '00:00:00.000' THEN + SET v_backoff = v_scanRate; + ELSE + SET v_backoff = ADDTIME(v_backoff, v_backoff); + END IF; + SET v_retries := 0; + SET v_giveUpCount := v_giveUpCount + 1; + SET v_available := TRUE; + END IF; + IF v_giveUpCount = 10 THEN + SET v_available := FALSE; + SET v_giveUpCount := 0; + SET v_backoff := '00:00:00.000'; + END IF; + + UPDATE tReadDatapoint + SET lastError = p_lastError, + retries = v_retries, + backoff = v_backoff, + giveUpCount = v_giveUpCount, + available = v_available + WHERE id = p_id; + + IF NOT v_available THEN + INSERT INTO tReadNotification (readDatapointId, notificationType) VALUES(p_id, 'F'); + END IF; END IF; END; $$ DELIMITER ; + From 7990567378435e3ea2a06fa44d0f61ab6fe99849 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 21 Jun 2019 18:57:00 +0200 Subject: [PATCH 04/13] mostly pseudo code yet --- snippets/test8.py | 119 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 snippets/test8.py diff --git a/snippets/test8.py b/snippets/test8.py new file mode 100644 index 0000000..da3f39e --- /dev/null +++ b/snippets/test8.py @@ -0,0 +1,119 @@ +import queue +import datetime +import threading + +class AbstractModbusDatapoint(object): + def __init__(self, label, unit, address, count): + self.label = label + self.unit = unit + self.address = address + self.count = count + self.type = 'abstract data point' + + def __str__(self): + return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count) + +class HoldingRegisterDatapoint(AbstractModbusDatapoint): + def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): + super(AbstractModbusDatapoint, self).__init__(label, unit, address, count) + self.scanRate = scanRate + self.publishTopic = publishTopic + self.subscribeTopic = subscribeTopic + self.feedbackTopic = feedbackTopic + self.writeRequestValue = None + self.lastContact = 0 + self.type = 'read holding register' + + def process(self): + successFull = False + giveUp = False + if self.writeRequestValue: + # perform write operation + if successFull: + # give feedback + self.writeRequestValue = None + else: + # retries handling + if giveUp: + # give negative feedback + self.writeRequestValue = None + else: + # perform read operation + if successFull: + self.lastContact = datetime.datetime.now() + # publish value + else: + # retries handling + if giveUp: + # backoff and availability handling + # give negative feedback + pass + + def onMessage(self, value): + self.writeRequestValue = value + + +class MqttProcessor(threading.Thread): + def __init__(self, registers, queue): + super(object, self).__init__() + self.registers = registers + self.queue = queue + + def run(self): + pass + # set mqtt callbacks + # mqtt connect + # mqtt loop forever + + def onConnect(self): + pass + # subscribe to all subscribe topics from registers + + def onMessage(self, topic, payload): + pass + # call onMessage method of register with related subscribe topic + # put register yourself in high prio queue + # notify using event + + +class ScanRateProcessingQueueFeeder(threading.Thread): + def __init__(self, registers, queue): + super(threading.Thread, self).__init__() + self.registers = registers + self.queue = queue + + def run(self): + pass + # search registers with expired scanRate (lastContact + scanRate * backoff < now) + # put into low prio queue + + + + + + + + +# ModbusRequestDefinition(4, 0x2000, 2, 'F', '(ERR) Unavailable device'), +# ModbusRequestDefinition(1, 0x2000, 4, 'F', '(ERR) Wrong register size'), +# ModbusRequestDefinition(1, 0x2000, 2, 'F', 'Voltage'), +# ModbusRequestDefinition(1, 0x2020, 2, 'F', 'Frequency'), +# ModbusRequestDefinition(1, 0x2060, 2, 'F', 'Current'), +# ModbusRequestDefinition(3, 0x0004, 2, 'RF', 'Resistance Channel 1'), +# ModbusRequestDefinition(3, 0x000C, 2, 'RF', 'Temperature Channel 1'), +# ModbusRequestDefinition(3, 0x0014, 2, 'RF', 'Resistance Channel 2'), +# ModbusRequestDefinition(3, 0x001C, 2, 'RF', 'Temperature Channel 2'), + + +datapoints = [ + HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, 60.0, 'Pub/Voltage', None, None), + HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, 60.0, 'Pub/Frequency', None, None), + HoldingRegisterDatapoint('Current', 1, 0x2060, 2, 60.0, 'Pub/Current', None, None), + HoldingRegisterDatapoint('Resistance Channel 1', 2, 0x0004, 2, 1.0, 'Pub/ResistanceChannel1', None, None), + HoldingRegisterDatapoint('Temperature Channel 1', 2, 0x000c, 2, 1.0, 'Pub/TemperatureChannel1', None, None), + HoldingRegisterDatapoint('Resistance Channel 2', 2, 0x0014, 2, 1.0, 'Pub/ResistanceChannel2', None, None), + HoldingRegisterDatapoint('Temperature Channel 2', 2, 0x001c, 2, 1.0, 'Pub/TemperatureChannel2', None, None), + HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, 0.0, None, 'Sub/Relay1', 'Feedback/Relay1') +] + + From 4a090c5a73f60126b8480e85f9a58608700e5332 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Sat, 22 Jun 2019 00:45:28 +0200 Subject: [PATCH 05/13] priority handling tested --- snippets/test8.py | 85 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index da3f39e..74f6a14 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -3,25 +3,39 @@ import datetime import threading class AbstractModbusDatapoint(object): - def __init__(self, label, unit, address, count): + def __init__(self, label, unit, address, count, scanRate): self.label = label self.unit = unit self.address = address self.count = count + self.scanRate = scanRate self.type = 'abstract data point' + self.enqueued = False + if self.scanRate: + self.priority = 1 + else: + self.priority = 0 + + + + def __lt__(self, other): return self.priority < other.priority + def __le__(self, other): return self.priority <= other.priority + def __eq__(self, other): return self.priority == other.priority + def __ne__(self, other): return self.priority != other.priority + def __gt__(self, other): return self.priority > other.priority + def __ge__(self, other): return self.priority >= other.priority def __str__(self): return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count) class HoldingRegisterDatapoint(AbstractModbusDatapoint): def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): - super(AbstractModbusDatapoint, self).__init__(label, unit, address, count) - self.scanRate = scanRate + super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate) self.publishTopic = publishTopic self.subscribeTopic = subscribeTopic self.feedbackTopic = feedbackTopic self.writeRequestValue = None - self.lastContact = 0 + self.lastContact = None self.type = 'read holding register' def process(self): @@ -55,7 +69,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): class MqttProcessor(threading.Thread): def __init__(self, registers, queue): - super(object, self).__init__() + super(MqttProcessor, self).__init__() self.registers = registers self.queue = queue @@ -73,19 +87,42 @@ class MqttProcessor(threading.Thread): pass # call onMessage method of register with related subscribe topic # put register yourself in high prio queue - # notify using event + # self.put(r) -class ScanRateProcessingQueueFeeder(threading.Thread): +class ScanRateConsideringQueueFeeder(threading.Thread): def __init__(self, registers, queue): - super(threading.Thread, self).__init__() + super(ScanRateConsideringQueueFeeder, self).__init__() self.registers = registers self.queue = queue def run(self): - pass - # search registers with expired scanRate (lastContact + scanRate * backoff < now) - # put into low prio queue + while True: + registersToBeHandled = [ + r for r in self.registers if ((not r.enqueued) and + (r.scanRate) and + ((not r.lastContact) or + (r.lastContact + r.scanRate < datetime.datetime.now()))) + ] + registersToBeHandled.sort(key=lambda x : x.scanRate) + for r in registersToBeHandled: + self.queue.put(r) + r.enqueued = True + + +class CommunicationProcessor(threading.Thread): + def __init__(self, queue): + super(CommunicationProcessor, self).__init__() + self.queue = queue + + def run(self): + while True: + r = self.queue.get() + # r.process() + r.lastContact = datetime.datetime.now() + print("Dequeued: {0!s}".format(r)) + r.enqueued = False + @@ -106,14 +143,24 @@ class ScanRateProcessingQueueFeeder(threading.Thread): datapoints = [ - HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, 60.0, 'Pub/Voltage', None, None), - HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, 60.0, 'Pub/Frequency', None, None), - HoldingRegisterDatapoint('Current', 1, 0x2060, 2, 60.0, 'Pub/Current', None, None), - HoldingRegisterDatapoint('Resistance Channel 1', 2, 0x0004, 2, 1.0, 'Pub/ResistanceChannel1', None, None), - HoldingRegisterDatapoint('Temperature Channel 1', 2, 0x000c, 2, 1.0, 'Pub/TemperatureChannel1', None, None), - HoldingRegisterDatapoint('Resistance Channel 2', 2, 0x0014, 2, 1.0, 'Pub/ResistanceChannel2', None, None), - HoldingRegisterDatapoint('Temperature Channel 2', 2, 0x001c, 2, 1.0, 'Pub/TemperatureChannel2', None, None), - HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, 0.0, None, 'Sub/Relay1', 'Feedback/Relay1') + HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), + HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, datetime.timedelta(seconds=10), 'Pub/Frequency', None, None), + HoldingRegisterDatapoint('Current', 1, 0x2060, 2, datetime.timedelta(seconds=10), 'Pub/Current', None, None), + HoldingRegisterDatapoint('Resistance Channel 1', 2, 0x0004, 2, datetime.timedelta(seconds=1), 'Pub/ResistanceChannel1', None, None), + HoldingRegisterDatapoint('Temperature Channel 1', 2, 0x000c, 2, datetime.timedelta(seconds=1), 'Pub/TemperatureChannel1', None, None), + HoldingRegisterDatapoint('Resistance Channel 2', 2, 0x0014, 2, datetime.timedelta(seconds=1), 'Pub/ResistanceChannel2', None, None), + HoldingRegisterDatapoint('Temperature Channel 2', 2, 0x001c, 2, datetime.timedelta(seconds=1), 'Pub/TemperatureChannel2', None, None), + HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1') ] +queue = queue.PriorityQueue() + + + +if __name__ == "__main__": + cp = CommunicationProcessor(queue) + cp.start() + + qf = ScanRateConsideringQueueFeeder(datapoints, queue) + qf.start() From 37548cfd53947da5d1f7715d818a39d8a18a3679 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 24 Jun 2019 13:17:09 +0200 Subject: [PATCH 06/13] be more cooperative, delay queue feeder by minimum scan rate in register list --- snippets/test8.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/snippets/test8.py b/snippets/test8.py index 74f6a14..4ee0d2a 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -95,6 +95,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread): super(ScanRateConsideringQueueFeeder, self).__init__() self.registers = registers self.queue = queue + self.delayEvent = threading.Event() def run(self): while True: @@ -108,6 +109,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread): for r in registersToBeHandled: self.queue.put(r) r.enqueued = True + self.delayEvent.wait(min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])) class CommunicationProcessor(threading.Thread): From 7674aac137910f3375454c2b916a12f41cb9028a Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 24 Jun 2019 16:03:57 +0200 Subject: [PATCH 07/13] cmd handler --- snippets/test8.py | 57 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index 4ee0d2a..f9f7391 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -1,6 +1,9 @@ import queue import datetime import threading +import socketserver +import cmd +import io class AbstractModbusDatapoint(object): def __init__(self, label, unit, address, count, scanRate): @@ -73,6 +76,10 @@ class MqttProcessor(threading.Thread): self.registers = registers self.queue = queue + def registersChanged(self): + pass + # subscribe and/or unsubscribe according to register changes + def run(self): pass # set mqtt callbacks @@ -97,7 +104,14 @@ class ScanRateConsideringQueueFeeder(threading.Thread): self.queue = queue self.delayEvent = threading.Event() + def getMinimalScanrate(self): + return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) + + def registersChanged(self): + self.delay = self.getMinimalScanrate() + def run(self): + self.delay = self.getMinimalScanrate() while True: registersToBeHandled = [ r for r in self.registers if ((not r.enqueued) and @@ -109,7 +123,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread): for r in registersToBeHandled: self.queue.put(r) r.enqueued = True - self.delayEvent.wait(min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])) + self.delayEvent.wait(self.delay) class CommunicationProcessor(threading.Thread): @@ -127,6 +141,36 @@ class CommunicationProcessor(threading.Thread): +class CmdInterpreter(cmd.Cmd): + def __init__(self, infile, outfile): + super(CmdInterpreter, self).__init__(stdin=infile, stdout=outfile) + self.use_rawinput = False + + def do_test(self, arg): + self.stdout.write("This is the test response") + + def do_bye(self, arg): + self.stdout.write("Bye!") + return True + + +class CmdHandle(socketserver.StreamRequestHandler): + def handle(self): + print("About to handle cmd session") + cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile)) + cmd.cmdloop() + + def finish(self): + super(CmdHandle, self).finish() + print("END") + +class CmdServer(object): + def __init__(self, address, port): + self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle) + self.serverThread = threading.Thread(target=self.server.serve_forever()) + + def start(self): + self.serverThread.start() @@ -161,8 +205,11 @@ queue = queue.PriorityQueue() if __name__ == "__main__": - cp = CommunicationProcessor(queue) - cp.start() + #cp = CommunicationProcessor(queue) + #cp.start() - qf = ScanRateConsideringQueueFeeder(datapoints, queue) - qf.start() + #qf = ScanRateConsideringQueueFeeder(datapoints, queue) + #qf.start() + + cs = CmdServer('0.0.0.0',9999) + cs.start() \ No newline at end of file From 44f82937d3c701eda08679dd2591a57bcae1c88b Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 24 Jun 2019 17:01:10 +0200 Subject: [PATCH 08/13] cmd handle working so far --- snippets/test8.py | 80 +++++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index f9f7391..632a4e3 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -5,6 +5,24 @@ import socketserver import cmd import io + + +class AbstractNotificationReceiver(object): + def receiveNotification(self, arg): + raise NotImplementedError + +class NotificationForwarder(object): + def __init__(self): + self.receivers = [] + + def register(self, receiver): + self.receivers.append(receiver) + + def notify(self, arg=None): + for r in self.receivers: + r.receiveNotification(arg) + + class AbstractModbusDatapoint(object): def __init__(self, label, unit, address, count, scanRate): self.label = label @@ -19,8 +37,6 @@ class AbstractModbusDatapoint(object): else: self.priority = 0 - - def __lt__(self, other): return self.priority < other.priority def __le__(self, other): return self.priority <= other.priority def __eq__(self, other): return self.priority == other.priority @@ -31,6 +47,7 @@ class AbstractModbusDatapoint(object): def __str__(self): return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count) + class HoldingRegisterDatapoint(AbstractModbusDatapoint): def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate) @@ -70,13 +87,14 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): self.writeRequestValue = value -class MqttProcessor(threading.Thread): +class MqttProcessor(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): super(MqttProcessor, self).__init__() self.registers = registers self.queue = queue - def registersChanged(self): + def receiveNotification(self, arg): + print("MqttProcessor:registersChanged") pass # subscribe and/or unsubscribe according to register changes @@ -97,7 +115,7 @@ class MqttProcessor(threading.Thread): # self.put(r) -class ScanRateConsideringQueueFeeder(threading.Thread): +class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): super(ScanRateConsideringQueueFeeder, self).__init__() self.registers = registers @@ -107,7 +125,8 @@ class ScanRateConsideringQueueFeeder(threading.Thread): def getMinimalScanrate(self): return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) - def registersChanged(self): + def receiveNotification(self, arg): + print("ScanRateConsideringQueueFeeder:registersChanged") self.delay = self.getMinimalScanrate() def run(self): @@ -140,7 +159,6 @@ class CommunicationProcessor(threading.Thread): r.enqueued = False - class CmdInterpreter(cmd.Cmd): def __init__(self, infile, outfile): super(CmdInterpreter, self).__init__(stdin=infile, stdout=outfile) @@ -149,6 +167,9 @@ class CmdInterpreter(cmd.Cmd): def do_test(self, arg): self.stdout.write("This is the test response") + def do_notify(self, arg): + nf.notify() + def do_bye(self, arg): self.stdout.write("Bye!") return True @@ -156,38 +177,25 @@ class CmdInterpreter(cmd.Cmd): class CmdHandle(socketserver.StreamRequestHandler): def handle(self): - print("About to handle cmd session") cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile)) - cmd.cmdloop() + try: + cmd.cmdloop() + except ConnectionAbortedError as e: + print("Cmd handle externally interrupted") - def finish(self): - super(CmdHandle, self).finish() - print("END") -class CmdServer(object): +class CmdServer(threading.Thread): def __init__(self, address, port): + super(CmdServer, self).__init__() self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle) - self.serverThread = threading.Thread(target=self.server.serve_forever()) def start(self): - self.serverThread.start() + self.server.serve_forever() - -# ModbusRequestDefinition(4, 0x2000, 2, 'F', '(ERR) Unavailable device'), -# ModbusRequestDefinition(1, 0x2000, 4, 'F', '(ERR) Wrong register size'), -# ModbusRequestDefinition(1, 0x2000, 2, 'F', 'Voltage'), -# ModbusRequestDefinition(1, 0x2020, 2, 'F', 'Frequency'), -# ModbusRequestDefinition(1, 0x2060, 2, 'F', 'Current'), -# ModbusRequestDefinition(3, 0x0004, 2, 'RF', 'Resistance Channel 1'), -# ModbusRequestDefinition(3, 0x000C, 2, 'RF', 'Temperature Channel 1'), -# ModbusRequestDefinition(3, 0x0014, 2, 'RF', 'Resistance Channel 2'), -# ModbusRequestDefinition(3, 0x001C, 2, 'RF', 'Temperature Channel 2'), - - datapoints = [ HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, datetime.timedelta(seconds=10), 'Pub/Frequency', None, None), @@ -199,17 +207,21 @@ datapoints = [ HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1') ] - queue = queue.PriorityQueue() - +nf = NotificationForwarder() if __name__ == "__main__": - #cp = CommunicationProcessor(queue) - #cp.start() + cp = CommunicationProcessor(queue) + cp.start() - #qf = ScanRateConsideringQueueFeeder(datapoints, queue) - #qf.start() + mp = MqttProcessor(datapoints, queue) + nf.register(mp) + mp.start() - cs = CmdServer('0.0.0.0',9999) + qf = ScanRateConsideringQueueFeeder(datapoints, queue) + nf.register(qf) + qf.start() + + cs = CmdServer('127.0.0.1',9999) cs.start() \ No newline at end of file From c30acfabdb41c37faa0534fc9416d6f0e0af102e Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 24 Jun 2019 17:15:47 +0200 Subject: [PATCH 09/13] still working --- snippets/test8.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index 632a4e3..4b57a1a 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -165,13 +165,13 @@ class CmdInterpreter(cmd.Cmd): self.use_rawinput = False def do_test(self, arg): - self.stdout.write("This is the test response") + self.stdout.write("This is the test response\n\r") def do_notify(self, arg): nf.notify() def do_bye(self, arg): - self.stdout.write("Bye!") + self.stdout.write("Bye!\n\r") return True @@ -180,6 +180,7 @@ class CmdHandle(socketserver.StreamRequestHandler): cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile)) try: cmd.cmdloop() + print("Cmd handle terminated") except ConnectionAbortedError as e: print("Cmd handle externally interrupted") From a2a5a924bdd983c729b65b567aa61adf64478188 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Tue, 25 Jun 2019 17:26:23 +0200 Subject: [PATCH 10/13] mqtt handling --- snippets/test8.py | 86 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 72 insertions(+), 14 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index 4b57a1a..a77e984 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -4,6 +4,15 @@ import threading import socketserver import cmd import io +import paho.mqtt.client as mqtt + + +class Config(object): + def __init__(self): + self.mqttBrokerHost = '127.0.0.1' + self.mqttBrokerPort = 1883 + self.mqttLogin = '' + self.mqttPassword = '' @@ -31,6 +40,8 @@ class AbstractModbusDatapoint(object): self.count = count self.scanRate = scanRate self.type = 'abstract data point' + self.command = None + self.value = None self.enqueued = False if self.scanRate: self.priority = 1 @@ -45,7 +56,13 @@ class AbstractModbusDatapoint(object): def __ge__(self, other): return self.priority >= other.priority def __str__(self): - return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count) + return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value) + + def setCommand(self, cmd): + self.command = cmd + + def setValue(self, value): + self.value = value class HoldingRegisterDatapoint(AbstractModbusDatapoint): @@ -87,32 +104,72 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): self.writeRequestValue = value +def mqttOnConnectCallback(client, userdata, flags, rc): + userdata.onConnect() + +def mqttOnMessageCallback(client, userdata, message): + userdata.onMessage(message.topic, message.payload) + +def mqttOnDisconnectCallback(client, userdata, rc): + userdata.onDisconnect(rc) + class MqttProcessor(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): super(MqttProcessor, self).__init__() self.registers = registers self.queue = queue + self.client = mqtt.Client(userdata=self) + self.subscriptions = [] + self.topicRegisterMap ={} + + def __processUpdatedRegisters(self, force=False): + # print("MqttProcessor.__updateSubscriptions") + + subscribeTopics = [ r.subscribeTopic for r in self.registers if r.subscribeTopic] + # print("Topics: {0!s}".format(subscribeTopics)) + + for subscribeTopic in subscribeTopics: + if (subscribeTopic not in self.subscriptions) or force: + print("Subscribe to {0}".format(subscribeTopic)) + self.client.subscribe(subscribeTopic) + self.subscriptions.append(subscribeTopic) + + for subscription in self.subscriptions: + if (subscription not in subscribeTopics) and not force: + print("Unsubscribe from {0}".format(subscription)) + self.client.unsubscribe(subscription) + self.subscriptions.remove(subscription) + + self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if r.subscribeTopic } def receiveNotification(self, arg): print("MqttProcessor:registersChanged") - pass - # subscribe and/or unsubscribe according to register changes + self.__processUpdatedRegisters() def run(self): - pass - # set mqtt callbacks - # mqtt connect - # mqtt loop forever + # print("MqttProcessor.run") + self.client.on_message = mqttOnMessageCallback + self.client.on_connect = mqttOnConnectCallback + self.client.on_disconnect = mqttOnDisconnectCallback + if config.mqttLogin and config.mqttPassword: + self.client.username_pw_set(config.mqttLogin, config.mqttPassword) + self.client.connect(config.mqttBrokerHost, config.mqttBrokerPort) + self.client.loop_forever() def onConnect(self): - pass - # subscribe to all subscribe topics from registers + # print("MqttProcessor.onConnect") + self.__processUpdatedRegisters(force=True) + + def onDisconnect(self, rc): + print("Disconnected from MQTT broker: {0}".format(rc)) def onMessage(self, topic, payload): - pass - # call onMessage method of register with related subscribe topic - # put register yourself in high prio queue - # self.put(r) + # print("MqttProcessor.onMessage") + r = self.topicRegisterMap[topic] + # print("{0}: {1!s} -> {2!s}".format(topic, payload, r)) + r.setCommand('w') + r.setValue(payload) + self.queue.put(r) class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): @@ -140,6 +197,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei ] registersToBeHandled.sort(key=lambda x : x.scanRate) for r in registersToBeHandled: + r.setCommand('r') self.queue.put(r) r.enqueued = True self.delayEvent.wait(self.delay) @@ -210,7 +268,7 @@ datapoints = [ queue = queue.PriorityQueue() nf = NotificationForwarder() - +config = Config() if __name__ == "__main__": cp = CommunicationProcessor(queue) From 54d33007fcaa76ec622b78fc3746a4222e6d8ff5 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Wed, 26 Jun 2019 16:46:41 +0200 Subject: [PATCH 11/13] priority handling fixed, list and del command in admin intf --- snippets/test8.py | 111 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 85 insertions(+), 26 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index a77e984..c8e02a3 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -48,13 +48,6 @@ class AbstractModbusDatapoint(object): else: self.priority = 0 - def __lt__(self, other): return self.priority < other.priority - def __le__(self, other): return self.priority <= other.priority - def __eq__(self, other): return self.priority == other.priority - def __ne__(self, other): return self.priority != other.priority - def __gt__(self, other): return self.priority > other.priority - def __ge__(self, other): return self.priority >= other.priority - def __str__(self): return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value) @@ -67,7 +60,7 @@ class AbstractModbusDatapoint(object): class HoldingRegisterDatapoint(AbstractModbusDatapoint): def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): - super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate) + super().__init__(label, unit, address, count, scanRate) self.publishTopic = publishTopic self.subscribeTopic = subscribeTopic self.feedbackTopic = feedbackTopic @@ -75,6 +68,9 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): self.lastContact = None self.type = 'read holding register' + def __str__(self): + return "[{0!s}, {1} {2} {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic) + def process(self): successFull = False giveUp = False @@ -115,7 +111,7 @@ def mqttOnDisconnectCallback(client, userdata, rc): class MqttProcessor(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): - super(MqttProcessor, self).__init__() + super().__init__() self.registers = registers self.queue = queue self.client = mqtt.Client(userdata=self) @@ -174,7 +170,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): - super(ScanRateConsideringQueueFeeder, self).__init__() + super().__init__() self.registers = registers self.queue = queue self.delayEvent = threading.Event() @@ -205,7 +201,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei class CommunicationProcessor(threading.Thread): def __init__(self, queue): - super(CommunicationProcessor, self).__init__() + super().__init__() self.queue = queue def run(self): @@ -217,36 +213,99 @@ class CommunicationProcessor(threading.Thread): r.enqueued = False -class CmdInterpreter(cmd.Cmd): - def __init__(self, infile, outfile): - super(CmdInterpreter, self).__init__(stdin=infile, stdout=outfile) - self.use_rawinput = False +class MyPriorityQueueItem(object): + def __init__(self, itemWithPriority): + self.itemWithPriority = itemWithPriority - def do_test(self, arg): - self.stdout.write("This is the test response\n\r") + def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority + def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority + def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority + def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority + def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority + def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority + +class MyPriorityQueue(queue.PriorityQueue): + def _put(self, itemWithPriority): + i = MyPriorityQueueItem(itemWithPriority) + super()._put(i) + + def _get(self): + i = super()._get() + return i.itemWithPriority + +class CmdInterpreter(cmd.Cmd): + def __init__(self, infile, outfile, notifier, registers): + super().__init__(stdin=infile, stdout=outfile) + self.use_rawinput = False + self.notifier = notifier + self.registers = registers + self.prompt = "test8> " + self.intro = "test8 admin interface" + + def __print(self, text): + self.stdout.write(text) + + def __println(self, text): + self.stdout.write(text) + self.stdout.write("\n\r") def do_notify(self, arg): - nf.notify() + self.notifier.notify() - def do_bye(self, arg): - self.stdout.write("Bye!\n\r") + def help_notify(self): + self.__println("Notifies threads using the list of datapoints about changes in this list.") + self.__println("Call after modifications on the list.") + + def do_quit(self, arg): + self.__println("Bye!") return True + + def do_list(self, arg): + for i, r in enumerate(self.registers): + self.__println("#{0}: {1!s}".format(i, r)) + + def help_list(self): + self.__println("Usage: list") + self.__println("List the configured datapoints") + + def do_del(self, arg): + try: + i = int(arg) + r = self.registers[i] + self.registers.remove(r) + self.__println("{0!s} removed".format(r)) + except ValueError as e: + self.__println("ERROR: {0!s}".format(e)) + + def help_del(self): + self.__println("Usage: del ") + self.__println("Removes an item from the list of datapoints by its index, see list command.") + self.__println("Be aware: indexes have been changed, rerun list before removing the next item.") class CmdHandle(socketserver.StreamRequestHandler): def handle(self): - cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile)) + cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile), self.server.userData.notifier, self.server.userData.registers) try: cmd.cmdloop() print("Cmd handle terminated") except ConnectionAbortedError as e: print("Cmd handle externally interrupted") +class MyThreadingTCPServer(socketserver.ThreadingTCPServer): + def __init__(self, host, handler, userData): + super().__init__(host, handler) + self.userData = userData + +class MyCmdUserData(object): + def __init__(self, notifier, registers): + self.notifier = notifier + self.registers = registers class CmdServer(threading.Thread): - def __init__(self, address, port): - super(CmdServer, self).__init__() - self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle) + def __init__(self, address, port, notifier, registers): + super().__init__() + self.server = MyThreadingTCPServer((address, port), CmdHandle, MyCmdUserData(notifier, registers)) def start(self): self.server.serve_forever() @@ -266,7 +325,7 @@ datapoints = [ HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1') ] -queue = queue.PriorityQueue() +queue = MyPriorityQueue() nf = NotificationForwarder() config = Config() @@ -282,5 +341,5 @@ if __name__ == "__main__": nf.register(qf) qf.start() - cs = CmdServer('127.0.0.1',9999) + cs = CmdServer('127.0.0.1',9999, nf, datapoints) cs.start() \ No newline at end of file From 8d6d8e5901791e2a25b1d99f6f59773c3f1c9f30 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Thu, 27 Jun 2019 12:13:27 +0200 Subject: [PATCH 12/13] add command implemented --- snippets/test8.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/snippets/test8.py b/snippets/test8.py index c8e02a3..8cdaa15 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -5,6 +5,7 @@ import socketserver import cmd import io import paho.mqtt.client as mqtt +import re class Config(object): @@ -233,6 +234,18 @@ class MyPriorityQueue(queue.PriorityQueue): i = super()._get() return i.itemWithPriority +class CmdInterpreterException(ValueError): pass + +def parseIntArbitraryBase(s): + i = 0 + if s.startswith('0x'): + i = int(s, 16) + elif s.startswith('0b'): + i = int(s, 2) + else: + i = int(s, 10) + return i + class CmdInterpreter(cmd.Cmd): def __init__(self, infile, outfile, notifier, registers): super().__init__(stdin=infile, stdout=outfile) @@ -241,6 +254,7 @@ class CmdInterpreter(cmd.Cmd): self.registers = registers self.prompt = "test8> " self.intro = "test8 admin interface" + self.splitterRe = re.compile('\s+') def __print(self, text): self.stdout.write(text) @@ -260,12 +274,79 @@ class CmdInterpreter(cmd.Cmd): self.__println("Bye!") return True + def do_add(self, arg): + try: + (registerType, label, unit, address, count, scanrate, readTopic, writeTopic, feedbackTopic) = self.splitterRe.split(arg) + self.__println("RegisterType: {0}".format(registerType)) + self.__println("Label: {0}".format(label)) + self.__println("Unit: {0}".format(unit)) + self.__println("Address: {0}".format(address)) + self.__println("Count: {0}".format(count)) + self.__println("ScanRate: {0}".format(scanrate)) + self.__println("ReadTopic: {0}".format(readTopic)) + self.__println("WriteTopic: {0}".format(writeTopic)) + self.__println("FeedbackTopic: {0}".format(feedbackTopic)) + + if readTopic == 'None': + readTopic = None + if writeTopic == 'None': + writeTopic = None + if feedbackTopic == 'None': + feedbackTopic = None + unit = parseIntArbitraryBase(unit) + address = parseIntArbitraryBase(address) + count = parseIntArbitraryBase(count) + scanrate = float(scanrate) + if scanrate == 0: + if readTopic: + raise CmdInterpreterException('readTopic must not be set when scanRate is zero') + if not writeTopic: + raise CmdInterpreterException('writeTopic must be set when scanRate is zero') + if not feedbackTopic: + raise CmdInterpreterException('feedbackTopic must be set when scanRate is zero') + else: + if not readTopic: + raise CmdInterpreterException('readTopic must be set when scanRate is zero') + if writeTopic: + raise CmdInterpreterException('writeTopic must not be set when scanRate is zero') + if feedbackTopic: + raise CmdInterpreterException('feedbackTopic must not be set when scanRate is zero') + if registerType not in ['HoldingRegister']: + raise CmdInterpreterException('Unknown register type {0}'.format(registerType)) + + + except ValueError as e: + self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) + + def help_add(self): + # HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), + self.__println("Usage: add