230 lines
13 KiB
Python
230 lines
13 KiB
Python
'''
|
|
Created on 11.06.2015
|
|
|
|
@author: wn
|
|
'''
|
|
|
|
import unittest
|
|
import MeterbusLib
|
|
import MeterbusTypeConversion
|
|
import MeterbusLibExceptions
|
|
|
|
|
|
class TestCaseWFloat(unittest.TestCase):
|
|
def assertBetween(self, v, bottom, top):
|
|
self.assertTrue(v > bottom and v < top )
|
|
|
|
class TestFrameParsing(TestCaseWFloat):
|
|
def setUp(self):
|
|
#devices
|
|
self.devices = []
|
|
device_dishwasher_electric = MeterbusLib.Device(0x53, "Dishwasher", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
|
|
self.devices.append(device_dishwasher_electric)
|
|
device_3phase_electric = MeterbusLib.Device(0x50, "3 Phase Electric", ["Energy T1 total", "Energy T1 partial", "Energy T2 total", "Energy T2 partial",
|
|
"Voltage phase 1", "Current phase 1", "Power phase 1", "img. Power phase 1",
|
|
"Voltage phase 2", "Current phase 2", "Power phase 2", "img. Power phase 2",
|
|
"Voltage phase 3", "Current phase 3", "Power phase 3", "img. Power phase 3",
|
|
"converter ratio", "Power total", "img. Power total", "tariff"
|
|
])
|
|
self.devices.append(device_3phase_electric)
|
|
|
|
self.brokenDevices = []
|
|
device_dishwasher_electric = MeterbusLib.Device(0x53, "Dishwasher", ["Energy total", "Energy partial", "Voltage", "Current", "Power"])
|
|
self.brokenDevices.append(device_dishwasher_electric)
|
|
|
|
|
|
# dishwasher, electric
|
|
self.inputOkLongFrame_1phase_electric = "68 38 38 68 08 53 72 17 00 13 00 2E 19 24 02 D6 00 00 00 8C 10 04 01 02 00 00 8C 11 04 01 02 00 00 02 FD C9 FF 01 E4 00 02 FD DB FF 01 03 00 02 AC FF 01 01 00 82 40 AC FF 01 FA FF 20 16"
|
|
self.inputNOkLongFrame_1phase_electric_wrong_medium = "68 38 38 68 08 53 72 17 00 13 00 2E 19 24 FE D6 00 00 00 8C 10 04 01 02 00 00 8C 11 04 01 02 00 00 02 FD C9 FF 01 E4 00 02 FD DB FF 01 03 00 02 AC FF 01 01 00 82 40 AC FF 01 FA FF 1c 16"
|
|
self.inputNOkLongFrame_1phase_electric_wrong_signature = "68 38 38 68 08 53 72 17 00 13 00 2E 19 24 02 D6 00 00 01 8C 10 04 01 02 00 00 8C 11 04 01 02 00 00 02 FD C9 FF 01 E4 00 02 FD DB FF 01 03 00 02 AC FF 01 01 00 82 40 AC FF 01 FA FF 21 16"
|
|
|
|
# electricity
|
|
self.inputOkLongFrame_3phase_electric = "68 92 92 68 08 50 72 81 14 01 11 2E 19 16 02 88 00 00 00 8C 10 04 58 43 86 00 8C 11 04 58 43 86 00 8C 20 04 00 00 00 00 8C 21 04 00 00 00 00 02 FD C9 FF 01 E4 00 02 FD DB FF 01 5A 00 02 AC FF 01 D2 00 82 40 AC FF 01 00 00 02 FD C9 FF 02 DF 00 02 FD DB FF 02 0F 00 02 AC FF 02 21 00 82 40 AC FF 02 FD FF 02 FD C9 FF 03 E3 00 02 FD DB FF 03 04 00 02 AC FF 03 02 00 82 40 AC FF 03 F4 FF 02 FF 68 00 00 02 AC FF 00 F5 00 82 40 AC FF 00 F1 FF 01 FF 13 00 F4 16"
|
|
|
|
# water
|
|
self.inputOkLongFrame_water = "68 46 46 68 08 30 72 45 71 43 00 24 23 25 07 F6 00 00 00 0C 13 97 95 43 00 8C 10 13 00 00 00 00 0B 3B 00 00 00 0B 26 14 60 01 02 5A AC 00 04 6D 14 0E EB 16 4C 13 96 41 33 00 CC 10 13 00 00 00 00 42 6C DF 1C 42 EC 7E FF 1C 99 16"
|
|
|
|
# gas
|
|
self.inputOkLongFrame_gas = "68 56 56 68 08 40 72 43 60 52 00 77 04 14 03 FF 10 00 00 0C 78 76 03 01 10 0D 7C 08 44 49 20 2E 74 73 75 63 0A 30 30 30 30 30 30 30 30 30 30 04 6D 06 0E EB 16 02 7C 09 65 6D 69 74 20 2E 74 61 62 A3 09 04 13 98 AE 04 00 04 93 7F 4E 01 00 00 44 13 FC A5 04 00 0F 01 00 1F 51 16"
|
|
|
|
# Thermometer
|
|
self.inputOkLongFrame_thermometer = "68 61 61 68 08 21 72 00 00 00 00 00 00 01 00 7F 00 00 00 01 24 02 01 25 30 01 26 10 02 27 07 00 05 67 43 B6 DA 3D 05 67 DC 90 50 BD 05 67 AA E8 AA 41 05 67 AF 57 BA 41 0F 77 C0 00 00 7F 27 00 00 0A 00 00 00 00 00 00 00 01 00 00 00 A2 C3 7F 3F A5 BA 7F 3F 85 A7 7F 3F E7 F9 7F 3F CD CC CC 3D E8 03 00 00 02 16"
|
|
self.inputNOkLongFrame_thermometer_checksum_failure = "68 61 61 68 08 21 72 00 00 00 00 00 00 01 00 7F 00 00 00 01 24 02 01 25 30 01 26 10 02 27 07 00 05 67 43 B6 DA 3D 05 67 DC 90 50 BD 05 67 AA E8 AA 41 05 67 AF 57 BA 41 0F 77 C0 00 00 7F 27 00 00 0A 00 00 00 00 00 00 00 01 00 00 00 A2 C3 7F 3F A5 BA 7F 3F 85 A7 7F 3F E7 F9 7F 3F CD CC CC 3D E8 03 00 00 01 16"
|
|
|
|
# test1
|
|
self.inputOkSingleCharacter = "e5"
|
|
|
|
# test2
|
|
self.inputOkShortframe = "10 01 02 03 16"
|
|
self.inputNOkShortframe_checksum_failure = "10 01 02 04 16"
|
|
|
|
# test3
|
|
self.inputOKControlframe = "68 03 03 68 01 02 03 06 16"
|
|
self.inputNOKControlframe_wrong_stopcode = "68 03 03 68 01 02 03 06 15"
|
|
self.inputNOKControlframe_wrong_secondlength = "68 03 04 68 01 02 03 06 16"
|
|
|
|
self.inputNOk_wrong_startcode = "15 01 02 03 16"
|
|
|
|
|
|
def test_OK_LongFrame_1phase_electric(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOkLongFrame_1phase_electric)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
|
self.assertEqual(telegram.frame.cField, 0x08);
|
|
self.assertEqual(telegram.frame.address, 0x53);
|
|
self.assertEqual(telegram.frame.ciField, 0x72);
|
|
self.assertEqual(telegram.frame.fixedDataHeader.identNr, 130017)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.manufacturer, "FIN")
|
|
self.assertEqual(telegram.frame.fixedDataHeader.version, 36)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.medium, "Electrity")
|
|
self.assertEqual(telegram.frame.comment, "Dishwasher")
|
|
self.assertEqual(telegram.frame.dib[0].comment, "Energy total")
|
|
self.assertEqual(telegram.frame.dib[0].rawValue, 201)
|
|
self.assertEqual(telegram.frame.dib[0].value, 2010)
|
|
self.assertEqual(telegram.frame.dib[3].comment, "Current")
|
|
self.assertEqual(telegram.frame.dib[3].rawValue, 3)
|
|
v = telegram.frame.dib[3].value
|
|
self.assertBetween(v, 0.2999, 0.3001)
|
|
|
|
def test_OK_LongFrame_1phase_electric_wrong_medium(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputNOkLongFrame_1phase_electric_wrong_medium)
|
|
with self.assertRaises(MeterbusLibExceptions.MediumConversionException):
|
|
telegram.parse()
|
|
|
|
def test_OK_LongFrame_1phase_electric_wrong_signature(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputNOkLongFrame_1phase_electric_wrong_signature)
|
|
with self.assertRaises(MeterbusLibExceptions.InvalidFrameException):
|
|
telegram.parse()
|
|
|
|
def test_OK_LongFrame_1phase_electric_device_items_wrong(self):
|
|
telegram = MeterbusLib.Telegram(self.brokenDevices)
|
|
telegram.fromHexString(self.inputOkLongFrame_1phase_electric)
|
|
with self.assertRaises(MeterbusLibExceptions.DeviceItemsMismatchException):
|
|
telegram.parse()
|
|
|
|
|
|
def test_OK_LongFrame_3phase_electric(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOkLongFrame_3phase_electric)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
|
self.assertEqual(telegram.frame.cField, 0x08);
|
|
self.assertEqual(telegram.frame.address, 0x50);
|
|
self.assertEqual(telegram.frame.ciField, 0x72);
|
|
self.assertEqual(telegram.frame.fixedDataHeader.identNr, 11011481)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.manufacturer, "FIN")
|
|
self.assertEqual(telegram.frame.fixedDataHeader.version, 22)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.medium, "Electrity")
|
|
self.assertEqual(telegram.frame.comment, "3 Phase Electric")
|
|
|
|
|
|
def test_OK_LongFrame_water(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOkLongFrame_water)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
|
self.assertEqual(telegram.frame.cField, 0x08);
|
|
self.assertEqual(telegram.frame.address, 0x30);
|
|
self.assertEqual(telegram.frame.ciField, 0x72);
|
|
self.assertEqual(telegram.frame.fixedDataHeader.identNr, 437145)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.manufacturer, "HYD")
|
|
self.assertEqual(telegram.frame.fixedDataHeader.version, 37)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.medium, "Water")
|
|
|
|
|
|
def test_OK_LongFrame_gas(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOkLongFrame_gas)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
|
self.assertEqual(telegram.frame.cField, 0x08);
|
|
self.assertEqual(telegram.frame.address, 0x40);
|
|
self.assertEqual(telegram.frame.ciField, 0x72);
|
|
self.assertEqual(telegram.frame.fixedDataHeader.identNr, 526043)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.manufacturer, "ACW")
|
|
self.assertEqual(telegram.frame.fixedDataHeader.version, 20)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.medium, "Gas")
|
|
|
|
def test_OK_LongFrame_thermometer(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOkLongFrame_thermometer)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.LongFrame)
|
|
self.assertEqual(telegram.frame.cField, 0x08);
|
|
self.assertEqual(telegram.frame.address, 0x21);
|
|
self.assertEqual(telegram.frame.ciField, 0x72);
|
|
self.assertEqual(telegram.frame.fixedDataHeader.identNr, 0)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.manufacturer, "@@@")
|
|
self.assertEqual(telegram.frame.fixedDataHeader.version, 1)
|
|
self.assertEqual(telegram.frame.fixedDataHeader.medium, "Other")
|
|
|
|
def test_OK_SingleCharacter(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOkSingleCharacter)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.SingleCharacter)
|
|
|
|
def test_OK_Shortframe(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOkShortframe)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.ShortFrame)
|
|
self.assertEqual(telegram.frame.cField, 1);
|
|
self.assertEqual(telegram.frame.address, 2);
|
|
|
|
def test_OK_Controlframe(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputOKControlframe)
|
|
telegram.parse()
|
|
self.assertIsInstance(telegram.frame, MeterbusLib.ControlFrame)
|
|
self.assertEqual(telegram.frame.cField, 1);
|
|
self.assertEqual(telegram.frame.address, 2);
|
|
self.assertEqual(telegram.frame.ciField, 3);
|
|
|
|
def test_NOk_Shortframe_checksum_failure(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputNOkShortframe_checksum_failure)
|
|
with self.assertRaises(MeterbusLibExceptions.InvalidChecksumException):
|
|
telegram.parse()
|
|
|
|
def test_NOk_LongFrame_thermometer_checksum_failure(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputNOkLongFrame_thermometer_checksum_failure)
|
|
with self.assertRaises(MeterbusLibExceptions.InvalidChecksumException):
|
|
telegram.parse()
|
|
|
|
def test_NOk_Controlframe_wrong_stopcode(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputNOKControlframe_wrong_stopcode)
|
|
with self.assertRaises(MeterbusLibExceptions.InvalidStopCharException):
|
|
telegram.parse()
|
|
|
|
def test_NOk_Controlframe_wrong_secondlength(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputNOKControlframe_wrong_secondlength)
|
|
with self.assertRaises(MeterbusLibExceptions.InvalidSecondLengthException):
|
|
telegram.parse()
|
|
|
|
def test_NOk_wrong_startcode(self):
|
|
telegram = MeterbusLib.Telegram(self.devices)
|
|
telegram.fromHexString(self.inputNOk_wrong_startcode)
|
|
with self.assertRaises(MeterbusLibExceptions.InvalidStartCharException):
|
|
telegram.parse()
|
|
|
|
def test_bcd(self):
|
|
self.assertEqual(MeterbusTypeConversion.bcd([0x99]), 99)
|
|
self.assertEqual(MeterbusTypeConversion.bcd([0x12, 0x34]), 3412)
|
|
self.assertEqual(MeterbusTypeConversion.bcd([0x12, 0x34, 0x56, 0x78]), 78563412)
|
|
|
|
def test_manufCode(self):
|
|
self.assertEqual(MeterbusTypeConversion.manufCode([0x77, 0x04]), 'ACW')
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|