initial
This commit is contained in:
commit
aebbb53090
17
.project
Normal file
17
.project
Normal file
@ -0,0 +1,17 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<projectDescription>
|
||||
<name>MeterbusLib</name>
|
||||
<comment></comment>
|
||||
<projects>
|
||||
</projects>
|
||||
<buildSpec>
|
||||
<buildCommand>
|
||||
<name>org.python.pydev.PyDevBuilder</name>
|
||||
<arguments>
|
||||
</arguments>
|
||||
</buildCommand>
|
||||
</buildSpec>
|
||||
<natures>
|
||||
<nature>org.python.pydev.pythonNature</nature>
|
||||
</natures>
|
||||
</projectDescription>
|
8
.pydevproject
Normal file
8
.pydevproject
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<?eclipse-pydev version="1.0"?><pydev_project>
|
||||
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
|
||||
<path>/${PROJECT_DIR_NAME}</path>
|
||||
</pydev_pathproperty>
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
|
||||
</pydev_project>
|
154
MeterbusLib.py
Normal file
154
MeterbusLib.py
Normal file
@ -0,0 +1,154 @@
|
||||
'''
|
||||
Created on 11.06.2015
|
||||
|
||||
@author: wn
|
||||
'''
|
||||
|
||||
import MeterbusTypeConversion
|
||||
|
||||
class MeterbusLibException(Exception):
|
||||
pass
|
||||
|
||||
class InvalidFrameCodeException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class InvalidFrameException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class InvalidChecksumException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class InvalidStopCharException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class InvalidStartCharException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class InvalidLengthException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class InvalidSecondLengthException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class PayloadTooShortException(MeterbusLibException):
|
||||
pass
|
||||
|
||||
class Frame(object):
|
||||
def __init__(self, startCharacter, frameLength, firstPayload, telegram):
|
||||
self.telegram = telegram
|
||||
self.frameName = self.__class__.__name__
|
||||
self.frameLength = frameLength
|
||||
self.payloadLength = self.frameLength - 6
|
||||
self.frameStartCharacter = startCharacter
|
||||
self.firstPayload = firstPayload
|
||||
|
||||
def parse(self):
|
||||
if self.telegram[0] != self.frameStartCharacter:
|
||||
raise InvalidStartCharException()
|
||||
if len(self.telegram) != self.frameLength:
|
||||
raise InvalidLengthException()
|
||||
self.verifyChecksumAndEnd()
|
||||
self.parse2()
|
||||
|
||||
def verifyChecksumAndEnd(self):
|
||||
if (self.firstPayload != 0):
|
||||
checksum = sum(self.telegram[self.firstPayload:-2]) & 0xff
|
||||
if checksum != self.telegram[-2]:
|
||||
raise InvalidChecksumException()
|
||||
if self.telegram[-1] != 0x16:
|
||||
raise InvalidStopCharException()
|
||||
|
||||
def parse2(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def getJSON(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class SingleCharacter(Frame):
|
||||
def __init__(self, telegram):
|
||||
super(SingleCharacter, self).__init__(0xe5, 1, 0, telegram)
|
||||
|
||||
def parse2(self):
|
||||
pass
|
||||
|
||||
|
||||
class ShortFrame(Frame):
|
||||
def __init__(self, telegram):
|
||||
super(ShortFrame, self).__init__(0x10, 5, 1, telegram)
|
||||
|
||||
def parse2(self):
|
||||
self.cField = self.telegram[1]
|
||||
self.address = self.telegram[2]
|
||||
|
||||
|
||||
class ControlFrame(Frame):
|
||||
def __init__(self, telegram):
|
||||
super(ControlFrame, self).__init__(0x68, (telegram[1] + 6), 4, telegram)
|
||||
|
||||
def parse2(self):
|
||||
if self.telegram[2] != self.payloadLength:
|
||||
raise InvalidSecondLengthException
|
||||
if self.payloadLength < 3:
|
||||
raise PayloadTooShortException("too short for c, a, ci fields")
|
||||
self.cField = self.telegram[4]
|
||||
self.address = self.telegram[5]
|
||||
self.ciField = self.telegram[6]
|
||||
|
||||
|
||||
class LongFrame(ControlFrame):
|
||||
class FixedDataHeader(object):
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
print("Fixed header length: %d" % len(self.data))
|
||||
|
||||
def parse(self):
|
||||
self.identNr = MeterbusTypeConversion.bcd(self.data[:4])
|
||||
self.manufacturer = self.data[4:6]
|
||||
self.version = self.data[6]
|
||||
self.medium = self.data[7]
|
||||
self.accessNo = self.data[8]
|
||||
self.status = self.data[9]
|
||||
self.signature = self.data[10:]
|
||||
|
||||
def parse2(self):
|
||||
super(LongFrame, self).parse2()
|
||||
if self.payloadLength < 3 + 12:
|
||||
raise PayloadTooShortException("too short for fixed data header")
|
||||
self.fixedDataHeader = LongFrame.FixedDataHeader(self.telegram[7:19])
|
||||
self.fixedDataHeader.parse()
|
||||
|
||||
|
||||
class Telegram(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def fromHexString(self, hexString):
|
||||
self.hexString = hexString
|
||||
octetList = [int(o, 16) for o in self.hexString.split(' ')]
|
||||
self.telegram = bytearray(octetList)
|
||||
|
||||
for i in self.telegram:
|
||||
print(int(i))
|
||||
|
||||
def toHexString(self):
|
||||
return str([hex(b) for b in self.telegram])
|
||||
|
||||
def parse(self):
|
||||
if self.telegram[0] == 0x68 and self.telegram[1] == 0x03:
|
||||
self.frame = ControlFrame(self.telegram)
|
||||
elif self.telegram[0] == 0x68:
|
||||
self.frame = LongFrame(self.telegram)
|
||||
elif self.telegram[0] == 0x10:
|
||||
self.frame = ShortFrame(self.telegram)
|
||||
elif self.telegram[0] == 0xe5:
|
||||
self.frame = SingleCharacter(self.telegram)
|
||||
else:
|
||||
raise InvalidStartCharException()
|
||||
|
||||
self.frame.parse()
|
||||
|
||||
print("Frame is %s" % self.frame)
|
||||
|
||||
|
||||
|
25
MeterbusTypeConversion.py
Normal file
25
MeterbusTypeConversion.py
Normal file
@ -0,0 +1,25 @@
|
||||
'''
|
||||
Created on 11.06.2015
|
||||
|
||||
@author: wn
|
||||
'''
|
||||
|
||||
|
||||
def bcd(data):
|
||||
v = ""
|
||||
for c in data:
|
||||
v += str((c & 0xf0) >> 4)
|
||||
v += str(c & 0x0f)
|
||||
return v
|
||||
|
||||
def manufCode(data):
|
||||
print("data: %s %s" % (hex(data[1]), hex(data[0])))
|
||||
v = data[1] * 256 + data[0]
|
||||
print("v: %s" % hex(v))
|
||||
l3 = chr((v & 0x20) + 64)
|
||||
print("l3: %s" % l3)
|
||||
l2 = chr((v >> 5) & 0x20)
|
||||
print("l2: %s" % l2)
|
||||
l1 = chr((v >> 10) & 0x20)
|
||||
print("l1: %s" % l1)
|
||||
return l1 + l2 + l3
|
161
test1.py
Normal file
161
test1.py
Normal file
@ -0,0 +1,161 @@
|
||||
'''
|
||||
Created on 11.06.2015
|
||||
|
||||
@author: wn
|
||||
'''
|
||||
|
||||
import unittest
|
||||
import MeterbusLib
|
||||
import MeterbusTypeConversion
|
||||
|
||||
|
||||
class TestFrameParsing(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# dishwasher, electric
|
||||
self.inputOkLongFrame_1phase_electric = "68 38 38 68 08 53 72 17 00 13 00 2E 19 24 02 D6 00 00 00 8C 10 04 01 02 00 00 8C 11 04 01 02 00 00 02 FD C9 FF 01 E4 00 02 FD DB FF 01 03 00 02 AC FF 01 01 00 82 40 AC FF 01 FA FF 20 16"
|
||||
|
||||
# electricity
|
||||
self.inputOkLongFrame_3phase_electric = "68 92 92 68 08 50 72 81 14 01 11 2E 19 16 02 88 00 00 00 8C 10 04 58 43 86 00 8C 11 04 58 43 86 00 8C 20 04 00 00 00 00 8C 21 04 00 00 00 00 02 FD C9 FF 01 E4 00 02 FD DB FF 01 5A 00 02 AC FF 01 D2 00 82 40 AC FF 01 00 00 02 FD C9 FF 02 DF 00 02 FD DB FF 02 0F 00 02 AC FF 02 21 00 82 40 AC FF 02 FD FF 02 FD C9 FF 03 E3 00 02 FD DB FF 03 04 00 02 AC FF 03 02 00 82 40 AC FF 03 F4 FF 02 FF 68 00 00 02 AC FF 00 F5 00 82 40 AC FF 00 F1 FF 01 FF 13 00 F4 16"
|
||||
|
||||
# water
|
||||
self.inputOkLongFrame_water = "68 46 46 68 08 30 72 45 71 43 00 24 23 25 07 F6 00 00 00 0C 13 97 95 43 00 8C 10 13 00 00 00 00 0B 3B 00 00 00 0B 26 14 60 01 02 5A AC 00 04 6D 14 0E EB 16 4C 13 96 41 33 00 CC 10 13 00 00 00 00 42 6C DF 1C 42 EC 7E FF 1C 99 16"
|
||||
|
||||
# gas
|
||||
self.inputOkLongFrame_gas = "68 56 56 68 08 40 72 43 60 52 00 77 04 14 03 FF 10 00 00 0C 78 76 03 01 10 0D 7C 08 44 49 20 2E 74 73 75 63 0A 30 30 30 30 30 30 30 30 30 30 04 6D 06 0E EB 16 02 7C 09 65 6D 69 74 20 2E 74 61 62 A3 09 04 13 98 AE 04 00 04 93 7F 4E 01 00 00 44 13 FC A5 04 00 0F 01 00 1F 51 16"
|
||||
|
||||
# Thermometer
|
||||
self.inputOkLongFrame_thermometer = "68 61 61 68 08 21 72 00 00 00 00 00 00 01 00 7F 00 00 00 01 24 02 01 25 30 01 26 10 02 27 07 00 05 67 43 B6 DA 3D 05 67 DC 90 50 BD 05 67 AA E8 AA 41 05 67 AF 57 BA 41 0F 77 C0 00 00 7F 27 00 00 0A 00 00 00 00 00 00 00 01 00 00 00 A2 C3 7F 3F A5 BA 7F 3F 85 A7 7F 3F E7 F9 7F 3F CD CC CC 3D E8 03 00 00 02 16"
|
||||
self.inputNOkLongFrame_thermometer_checksum_failure = "68 61 61 68 08 21 72 00 00 00 00 00 00 01 00 7F 00 00 00 01 24 02 01 25 30 01 26 10 02 27 07 00 05 67 43 B6 DA 3D 05 67 DC 90 50 BD 05 67 AA E8 AA 41 05 67 AF 57 BA 41 0F 77 C0 00 00 7F 27 00 00 0A 00 00 00 00 00 00 00 01 00 00 00 A2 C3 7F 3F A5 BA 7F 3F 85 A7 7F 3F E7 F9 7F 3F CD CC CC 3D E8 03 00 00 01 16"
|
||||
|
||||
# test1
|
||||
self.inputOkSingleCharacter = "e5"
|
||||
|
||||
# test2
|
||||
self.inputOkShortframe = "10 01 02 03 16"
|
||||
self.inputNOkShortframe_checksum_failure = "10 01 02 04 16"
|
||||
|
||||
# test3
|
||||
self.inputOKControlframe = "68 03 03 68 01 02 03 06 16"
|
||||
self.inputNOKControlframe_wrong_stopcode = "68 03 03 68 01 02 03 06 15"
|
||||
self.inputNOKControlframe_wrong_secondlength = "68 03 04 68 01 02 03 06 16"
|
||||
|
||||
self.inputNOk_wrong_startcode = "15 01 02 03 16"
|
||||
|
||||
|
||||
def test_OK_LongFrame_1phase_electric(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOkLongFrame_1phase_electric)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
||||
self.assertEqual(telegram.frame.cField, 0x08);
|
||||
self.assertEqual(telegram.frame.address, 0x53);
|
||||
self.assertEqual(telegram.frame.ciField, 0x72);
|
||||
self.assertEqual(telegram.frame.fixedDataHeader.identNr, "17001300")
|
||||
|
||||
def test_OK_LongFrame_3phase_electric(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOkLongFrame_3phase_electric)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
||||
self.assertEqual(telegram.frame.cField, 0x08);
|
||||
self.assertEqual(telegram.frame.address, 0x50);
|
||||
self.assertEqual(telegram.frame.ciField, 0x72);
|
||||
self.assertEqual(telegram.frame.fixedDataHeader.identNr, "81140111")
|
||||
|
||||
def test_OK_LongFrame_water(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOkLongFrame_water)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
||||
self.assertEqual(telegram.frame.cField, 0x08);
|
||||
self.assertEqual(telegram.frame.address, 0x30);
|
||||
self.assertEqual(telegram.frame.ciField, 0x72);
|
||||
self.assertEqual(telegram.frame.fixedDataHeader.identNr, "45714300")
|
||||
|
||||
def test_OK_LongFrame_gas(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOkLongFrame_gas)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
||||
self.assertEqual(telegram.frame.cField, 0x08);
|
||||
self.assertEqual(telegram.frame.address, 0x40);
|
||||
self.assertEqual(telegram.frame.ciField, 0x72);
|
||||
self.assertEqual(telegram.frame.fixedDataHeader.identNr, "43605200")
|
||||
|
||||
def test_OK_LongFrame_thermometer(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOkLongFrame_thermometer)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
||||
self.assertEqual(telegram.frame.cField, 0x08);
|
||||
self.assertEqual(telegram.frame.address, 0x21);
|
||||
self.assertEqual(telegram.frame.ciField, 0x72);
|
||||
self.assertEqual(telegram.frame.fixedDataHeader.identNr, "00000000")
|
||||
|
||||
def test_OK_SingleCharacter(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOkSingleCharacter)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.SingleCharacter)
|
||||
|
||||
def test_OK_Shortframe(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOkShortframe)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.ShortFrame)
|
||||
self.assertEqual(telegram.frame.cField, 1);
|
||||
self.assertEqual(telegram.frame.address, 2);
|
||||
|
||||
def test_OK_Controlframe(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputOKControlframe)
|
||||
telegram.parse()
|
||||
self.assertIsInstance(telegram.frame, MeterbusLib.ControlFrame)
|
||||
self.assertEqual(telegram.frame.cField, 1);
|
||||
self.assertEqual(telegram.frame.address, 2);
|
||||
self.assertEqual(telegram.frame.ciField, 3);
|
||||
|
||||
def test_NOk_Shortframe_checksum_failure(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputNOkShortframe_checksum_failure)
|
||||
with self.assertRaises(MeterbusLib.InvalidChecksumException):
|
||||
telegram.parse()
|
||||
|
||||
def test_NOk_LongFrame_thermometer_checksum_failure(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputNOkLongFrame_thermometer_checksum_failure)
|
||||
with self.assertRaises(MeterbusLib.InvalidChecksumException):
|
||||
telegram.parse()
|
||||
|
||||
def test_NOk_Controlframe_wrong_stopcode(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputNOKControlframe_wrong_stopcode)
|
||||
with self.assertRaises(MeterbusLib.InvalidStopCharException):
|
||||
telegram.parse()
|
||||
|
||||
def test_NOk_Controlframe_wrong_secondlength(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputNOKControlframe_wrong_secondlength)
|
||||
with self.assertRaises(MeterbusLib.InvalidSecondLengthException):
|
||||
telegram.parse()
|
||||
|
||||
def test_NOk_wrong_startcode(self):
|
||||
telegram = MeterbusLib.Telegram()
|
||||
telegram.fromHexString(self.inputNOk_wrong_startcode)
|
||||
with self.assertRaises(MeterbusLib.InvalidStartCharException):
|
||||
telegram.parse()
|
||||
|
||||
def test_bcd(self):
|
||||
self.assertEqual(MeterbusTypeConversion.bcd([0x99]), '99')
|
||||
self.assertEqual(MeterbusTypeConversion.bcd([0x12, 0x34]), '1234')
|
||||
self.assertEqual(MeterbusTypeConversion.bcd([0x12, 0x34, 0x56, 0x78]), '12345678')
|
||||
|
||||
def test_manufCode(self):
|
||||
self.assertEqual(MeterbusTypeConversion.manufCode([0x77, 0x04]), 'ACW')
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user