satisfy pycodestyle checker
This commit is contained in:
@ -1,12 +1,14 @@
|
|||||||
import re
|
import re
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
||||||
class InvalidDataObjectException(Exception):
|
class InvalidDataObjectException(Exception):
|
||||||
def __init__(self, message):
|
def __init__(self, message):
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
|
||||||
|
|
||||||
class AbstractDataObject(object):
|
class AbstractDataObject(object):
|
||||||
invalidChars = re.compile("[#+\s]")
|
invalidChars = re.compile(r'[#+\s]')
|
||||||
|
|
||||||
def __init__(self, topicPart):
|
def __init__(self, topicPart):
|
||||||
self.topicPart = topicPart
|
self.topicPart = topicPart
|
||||||
@ -17,4 +19,4 @@ class AbstractDataObject(object):
|
|||||||
return self.topicPart
|
return self.topicPart
|
||||||
|
|
||||||
def getPayload(self):
|
def getPayload(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
@ -4,14 +4,13 @@ from AbstractDataObject import AbstractDataObject
|
|||||||
|
|
||||||
|
|
||||||
class FlatDataObject(AbstractDataObject):
|
class FlatDataObject(AbstractDataObject):
|
||||||
|
|
||||||
def __init__(self, serverName, nameSpaceIndex, variableName, value):
|
def __init__(self, serverName, nameSpaceIndex, variableName, value):
|
||||||
super().__init__(serverName + '/' + str(nameSpaceIndex) + '/' + variableName)
|
super().__init__(serverName + '/' + str(nameSpaceIndex) + '/' + variableName)
|
||||||
self.serverName = serverName
|
self.serverName = serverName
|
||||||
self.nameSpaceIndex = nameSpaceIndex
|
self.nameSpaceIndex = nameSpaceIndex
|
||||||
self.variableName = variableName
|
self.variableName = variableName
|
||||||
self.value = value
|
self.value = value
|
||||||
|
|
||||||
def getPayload(self):
|
def getPayload(self):
|
||||||
payload = {
|
payload = {
|
||||||
"serverName": self.serverName,
|
"serverName": self.serverName,
|
||||||
@ -19,4 +18,4 @@ class FlatDataObject(AbstractDataObject):
|
|||||||
"variableName": self.variableName,
|
"variableName": self.variableName,
|
||||||
"value": self.value
|
"value": self.value
|
||||||
}
|
}
|
||||||
return json.dumps(payload)
|
return json.dumps(payload)
|
||||||
|
@ -4,13 +4,14 @@ import threading
|
|||||||
import ssl
|
import ssl
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def mqttOnConnectCallback(client, userdata, flags, rc):
|
def mqttOnConnectCallback(client, userdata, flags, rc):
|
||||||
userdata.onConnect()
|
userdata.onConnect()
|
||||||
|
|
||||||
|
|
||||||
def mqttOnMessageCallback(client, userdata, message):
|
def mqttOnMessageCallback(client, userdata, message):
|
||||||
userdata.onMessage(message.topic, message.payload)
|
userdata.onMessage(message.topic, message.payload)
|
||||||
|
|
||||||
|
|
||||||
def mqttOnDisconnectCallback(client, userdata, rc):
|
def mqttOnDisconnectCallback(client, userdata, rc):
|
||||||
userdata.onDisconnect(rc)
|
userdata.onDisconnect(rc)
|
||||||
|
|
||||||
@ -34,38 +35,37 @@ class AbstractMqttPublisher(threading.Thread):
|
|||||||
|
|
||||||
if ("login" in self.config) and ("password" in self.config):
|
if ("login" in self.config) and ("password" in self.config):
|
||||||
self.client.username_pw_set(self.config["login"], self.config["password"])
|
self.client.username_pw_set(self.config["login"], self.config["password"])
|
||||||
|
|
||||||
if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config):
|
if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config):
|
||||||
self.client.tls_set(
|
self.client.tls_set(
|
||||||
ca_certs=self.config["ca"],
|
ca_certs=self.config["ca"],
|
||||||
certfile=self.config["cert"],
|
certfile=self.config["cert"],
|
||||||
keyfile=self.config["key"],
|
keyfile=self.config["key"],
|
||||||
cert_reqs=ssl.CERT_REQUIRED,
|
cert_reqs=ssl.CERT_REQUIRED,
|
||||||
tls_version=ssl.PROTOCOL_TLSv1_2,
|
tls_version=ssl.PROTOCOL_TLSv1_2,
|
||||||
ciphers=None # this does not mean "no cipher" but it means "default ciphers"
|
ciphers=None # this does not mean "no cipher" but it means "default ciphers"
|
||||||
)
|
)
|
||||||
elif ("ca" in self.config):
|
elif ("ca" in self.config):
|
||||||
self.client.tls_set(
|
self.client.tls_set(
|
||||||
ca_certs=self.config["ca"],
|
ca_certs=self.config["ca"],
|
||||||
cert_reqs=ssl.CERT_REQUIRED,
|
cert_reqs=ssl.CERT_REQUIRED,
|
||||||
tls_version=ssl.PROTOCOL_TLSv1_2,
|
tls_version=ssl.PROTOCOL_TLSv1_2,
|
||||||
ciphers=None # this does not mean "no cipher" but it means "default ciphers"
|
ciphers=None # this does not mean "no cipher" but it means "default ciphers"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
self.client.connect(self.config["broker"], int(self.config["port"]))
|
self.client.connect(self.config["broker"], int(self.config["port"]))
|
||||||
self.client.loop_start()
|
self.client.loop_start()
|
||||||
logger.info("mqtt loop started")
|
logger.info("mqtt loop started")
|
||||||
|
|
||||||
self.localLoop()
|
self.localLoop()
|
||||||
|
|
||||||
def localLoop(self):
|
def localLoop(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.client.loop_stop()
|
self.client.loop_stop()
|
||||||
logger.info("mqtt loop stopped")
|
logger.info("mqtt loop stopped")
|
||||||
|
|
||||||
self.killBill = True
|
self.killBill = True
|
||||||
logger.info("kill flag set")
|
logger.info("kill flag set")
|
||||||
|
|
||||||
@ -74,10 +74,9 @@ class AbstractMqttPublisher(threading.Thread):
|
|||||||
|
|
||||||
def onConnect(self):
|
def onConnect(self):
|
||||||
logger.info("mqtt connected")
|
logger.info("mqtt connected")
|
||||||
|
|
||||||
def onDisconnect(self, rc):
|
def onDisconnect(self, rc):
|
||||||
logger.warning("mqtt disconnect, rc: {}".format(rc))
|
logger.warning("mqtt disconnect, rc: {}".format(rc))
|
||||||
|
|
||||||
def onMessage(self, topic, payload):
|
def onMessage(self, topic, payload):
|
||||||
logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload)))
|
logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload)))
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import json
|
|||||||
|
|
||||||
LOOP_SLICE = 0.1 # seconds
|
LOOP_SLICE = 0.1 # seconds
|
||||||
|
|
||||||
|
|
||||||
class MqttPublish(AbstractMqttPublisher):
|
class MqttPublish(AbstractMqttPublisher):
|
||||||
def __init__(self, config, stats, queue):
|
def __init__(self, config, stats, queue):
|
||||||
super().__init__(config)
|
super().__init__(config)
|
||||||
@ -32,4 +33,3 @@ class MqttPublish(AbstractMqttPublisher):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.stats.incMqttErrors()
|
self.stats.incMqttErrors()
|
||||||
logger.error(f"Exception {type(e)} received in MQTT local loop: {e}")
|
logger.error(f"Exception {type(e)} received in MQTT local loop: {e}")
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ from loguru import logger
|
|||||||
from FlatDataObject import FlatDataObject
|
from FlatDataObject import FlatDataObject
|
||||||
from StructuredDataObject import StructuredDataObject
|
from StructuredDataObject import StructuredDataObject
|
||||||
|
|
||||||
|
|
||||||
class OpcUaRequester(threading.Thread):
|
class OpcUaRequester(threading.Thread):
|
||||||
def __init__(self, config, stats, queue):
|
def __init__(self, config, stats, queue):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -25,7 +26,6 @@ class OpcUaRequester(threading.Thread):
|
|||||||
self.killBill = False
|
self.killBill = False
|
||||||
self.killEvent = threading.Event()
|
self.killEvent = threading.Event()
|
||||||
|
|
||||||
|
|
||||||
async def opcUaRequesterInnerLoop(self):
|
async def opcUaRequesterInnerLoop(self):
|
||||||
while not self.killBill:
|
while not self.killBill:
|
||||||
try:
|
try:
|
||||||
@ -69,4 +69,3 @@ class OpcUaRequester(threading.Thread):
|
|||||||
|
|
||||||
self.killEvent.set()
|
self.killEvent.set()
|
||||||
logger.info("kill events triggered")
|
logger.info("kill events triggered")
|
||||||
|
|
||||||
|
@ -4,11 +4,12 @@ from AbstractDataObject import AbstractDataObject
|
|||||||
import json
|
import json
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
|
|
||||||
class StatisticsDataObject(AbstractDataObject):
|
class StatisticsDataObject(AbstractDataObject):
|
||||||
def __init__(self, topic, payload):
|
def __init__(self, topic, payload):
|
||||||
super().__init__(topic)
|
super().__init__(topic)
|
||||||
self.payload = payload
|
self.payload = payload
|
||||||
|
|
||||||
def getPayload(self):
|
def getPayload(self):
|
||||||
return json.dumps(self.payload)
|
return json.dumps(self.payload)
|
||||||
|
|
||||||
@ -27,7 +28,7 @@ class StatisticsCollector(threading.Thread):
|
|||||||
|
|
||||||
self.stats = {
|
self.stats = {
|
||||||
'opcUaRequests': 0,
|
'opcUaRequests': 0,
|
||||||
'opcUaErrors' : 0,
|
'opcUaErrors': 0,
|
||||||
'opcUaTimeouts': 0,
|
'opcUaTimeouts': 0,
|
||||||
'mqttRequests': 0,
|
'mqttRequests': 0,
|
||||||
'mqttErrors': 0,
|
'mqttErrors': 0,
|
||||||
@ -36,10 +37,10 @@ class StatisticsCollector(threading.Thread):
|
|||||||
|
|
||||||
def incOpcUaRequests(self):
|
def incOpcUaRequests(self):
|
||||||
self.stats['opcUaRequests'] += 1
|
self.stats['opcUaRequests'] += 1
|
||||||
|
|
||||||
def incOpcUaErrors(self):
|
def incOpcUaErrors(self):
|
||||||
self.stats['opcUaErrors'] += 1
|
self.stats['opcUaErrors'] += 1
|
||||||
|
|
||||||
def incOpcUaTimeouts(self):
|
def incOpcUaTimeouts(self):
|
||||||
self.stats['opcUaTimeouts'] += 1
|
self.stats['opcUaTimeouts'] += 1
|
||||||
|
|
||||||
@ -63,4 +64,3 @@ class StatisticsCollector(threading.Thread):
|
|||||||
self.stats['uptime'] = int((currentTime - startTime).total_seconds())
|
self.stats['uptime'] = int((currentTime - startTime).total_seconds())
|
||||||
self.queue.put(StatisticsDataObject(self.topic, self.stats))
|
self.queue.put(StatisticsDataObject(self.topic, self.stats))
|
||||||
self.killEvent.wait(timeout=float(self.period))
|
self.killEvent.wait(timeout=float(self.period))
|
||||||
|
|
||||||
|
@ -4,13 +4,12 @@ from AbstractDataObject import AbstractDataObject
|
|||||||
|
|
||||||
|
|
||||||
class StructuredDataObject(AbstractDataObject):
|
class StructuredDataObject(AbstractDataObject):
|
||||||
|
|
||||||
def __init__(self, topicPart):
|
def __init__(self, topicPart):
|
||||||
super().__init__(topicPart)
|
super().__init__(topicPart)
|
||||||
self.keyValuePairs = []
|
self.keyValuePairs = []
|
||||||
|
|
||||||
def add(self, key, value):
|
def add(self, key, value):
|
||||||
self.keyValuePairs.append({key: value})
|
self.keyValuePairs.append({key: value})
|
||||||
|
|
||||||
def getPayload(self):
|
def getPayload(self):
|
||||||
return json.dumps(self.keyValuePairs)
|
return json.dumps(self.keyValuePairs)
|
||||||
|
@ -12,12 +12,14 @@ import signal
|
|||||||
|
|
||||||
deathBell = threading.Event()
|
deathBell = threading.Event()
|
||||||
|
|
||||||
|
|
||||||
def exceptHook(args):
|
def exceptHook(args):
|
||||||
global deathBell
|
global deathBell
|
||||||
logger.error("Exception in thread caught: {}".format(args))
|
logger.error("Exception in thread caught: {}".format(args))
|
||||||
deathBell.set()
|
deathBell.set()
|
||||||
logger.error("rang the death bell")
|
logger.error("rang the death bell")
|
||||||
|
|
||||||
|
|
||||||
def terminateHook(sig, frame):
|
def terminateHook(sig, frame):
|
||||||
global deathBell
|
global deathBell
|
||||||
logger.error("SIGINT received")
|
logger.error("SIGINT received")
|
||||||
|
Reference in New Issue
Block a user