'''
Parsing of M-Bus (Meter-Bus) telegrams: frame validation and decoding of
the data records carried by long frames.

Created on 11.06.2015

@author: wn
'''

import logging
import struct

import MeterbusTypeConversion
from MeterbusLibExceptions import *


_logger = logging.getLogger(__name__)


def _unsignedLittleEndian(octets):
    '''Combine octets, least significant first, into one unsigned integer.'''
    result = 0
    for position, octet in enumerate(octets):
        result |= octet << (8 * position)
    return result


class Frame(object):
    '''
    Common base of all frame types.

    Holds the raw telegram plus the layout facts a subclass declares (start
    character, total length, index of the first checksummed octet) and runs
    the validation shared by every frame type before handing over to the
    subclass specific parse2().
    '''

    STOP_CHARACTER = 0x16

    def __init__(self, startCharacter, frameLength, firstPayload, telegram):
        '''
        startCharacter -- value expected in telegram[0]
        frameLength    -- expected total number of octets in the telegram
        firstPayload   -- index of the first octet covered by the checksum;
                          0 means the frame has neither checksum nor stop
                          character
        telegram       -- sequence of octet values (e.g. a bytearray)
        '''
        self.telegram = telegram
        self.frameName = self.__class__.__name__
        self.frameLength = frameLength
        # For ControlFrame and its subclasses this equals the length field.
        self.payloadLength = self.frameLength - 6
        self.frameStartCharacter = startCharacter
        self.firstPayload = firstPayload

    def parse(self):
        '''
        Validate the telegram and decode its fields.

        Raises InvalidLengthException for an empty telegram or a telegram
        whose size differs from frameLength, InvalidStartCharException for
        a wrong first octet, plus whatever verifyChecksumAndEnd() and
        parse2() raise.
        '''
        # An empty telegram has no start character to look at.
        if not self.telegram:
            raise InvalidLengthException()
        if self.telegram[0] != self.frameStartCharacter:
            raise InvalidStartCharException()
        if len(self.telegram) != self.frameLength:
            raise InvalidLengthException()
        self.verifyChecksumAndEnd()
        self.parse2()

    def verifyChecksumAndEnd(self):
        '''
        Check the checksum octet and the stop character.

        Frames declaring firstPayload == 0 carry neither and are skipped.
        Raises InvalidChecksumException or InvalidStopCharException.
        '''
        if self.firstPayload == 0:
            return
        # The checksum is the low byte of the sum of all octets from
        # firstPayload up to (not including) the checksum octet itself.
        checksum = sum(self.telegram[self.firstPayload:-2]) & 0xff
        if checksum != self.telegram[-2]:
            raise InvalidChecksumException()
        if self.telegram[-1] != Frame.STOP_CHARACTER:
            raise InvalidStopCharException()

    def parse2(self):
        '''Decode the frame type specific fields; subclasses override this.'''
        raise NotImplementedError

    def getJSON(self):
        '''Return a dictionary representation; subclasses override this.'''
        raise NotImplementedError


class SingleCharacter(Frame):
    '''One-octet frame consisting only of the character 0xe5.'''

    def __init__(self, telegram):
        super(SingleCharacter, self).__init__(0xe5, 1, 0, telegram)

    def parse2(self):
        # Nothing beyond the start character to decode.
        pass


class ShortFrame(Frame):
    '''Five-octet frame: start 0x10, C field, address, checksum, stop.'''

    def __init__(self, telegram):
        super(ShortFrame, self).__init__(0x10, 5, 1, telegram)

    def parse2(self):
        self.cField = self.telegram[1]
        self.address = self.telegram[2]


class ControlFrame(Frame):
    '''
    Frame starting with 0x68 and carrying a length field.

    Layout: 0x68, length, length, 0x68, C, A, CI, ..., checksum, stop.
    '''

    def __init__(self, telegram):
        '''
        Raises InvalidLengthException when the telegram is too short to
        contain the length field the frame size is derived from.
        '''
        if len(telegram) < 2:
            raise InvalidLengthException()
        super(ControlFrame, self).__init__(0x68, (telegram[1] + 6), 4,
                                           telegram)

    def parse2(self):
        '''
        Decode the C, A and CI fields.

        Raises InvalidSecondLengthException when the repeated length field
        differs from the first, InvalidStartCharException when the repeated
        start character is wrong and PayloadTooShortException when the
        payload cannot hold the three fields.
        '''
        if self.telegram[2] != self.payloadLength:
            raise InvalidSecondLengthException
        # The M-Bus link layer repeats the start character after the two
        # length octets.
        if self.telegram[3] != self.frameStartCharacter:
            raise InvalidStartCharException()
        if self.payloadLength < 3:
            raise PayloadTooShortException("too short for c, a, ci fields")
        self.cField = self.telegram[4]
        self.address = self.telegram[5]
        self.ciField = self.telegram[6]


class LongFrame(ControlFrame):
    '''
    Control frame followed by a twelve octet fixed data header and a
    sequence of data records.
    '''

    # DIF value of a padding octet that carries no record (M-Bus application
    # layer "idle filler").
    IDLE_FILLER = 0x2f

    class FixedDataHeader(object):
        '''The twelve octets following the CI field.'''

        LENGTH = 12

        def __init__(self, data):
            self.data = data

        def parse(self):
            '''
            Decode identification number, manufacturer, version, medium,
            access number, status and signature.

            Raises PayloadTooShortException when fewer than twelve octets
            were supplied and InvalidFrameException when the signature is
            not zero.
            '''
            if len(self.data) < self.LENGTH:
                raise PayloadTooShortException(
                    "too short for fixed data header")
            self.identNr = MeterbusTypeConversion.bcd(self.data[:4])
            self.manufacturer = MeterbusTypeConversion.manufCode(
                self.data[4:6])
            self.version = self.data[6]
            self.medium = MeterbusTypeConversion.mediumCode(self.data[7])
            self.accessNo = self.data[8]
            self.status = self.data[9]
            self.signature = self.data[10:]
            if self.signature[0] != 0 or self.signature[1] != 0:
                raise InvalidFrameException("signature should be zero")

    class DataInformationElement(object):
        '''Base of the record types found after the fixed data header.'''

        # DIF values that start manufacturer specific data running to the
        # end of the user data; 0x1f additionally announces that more
        # records follow in the next telegram (M-Bus application layer).
        MANUFACTURER_SPECIFIC_DIFS = (0x0f, 0x1f)

        def __init__(self, data):
            self.data = data
            self.consumed = 0

        @classmethod
        def create(cls, data):
            '''
            Return the (not yet parsed) element matching the first octet
            of data.

            Raises PayloadTooShortException when data is empty.
            '''
            if len(data) == 0:
                raise PayloadTooShortException("too short for data record")
            if data[0] in cls.MANUFACTURER_SPECIFIC_DIFS:
                return LongFrame.ManufacturerSpecificDataBlock(data)
            return LongFrame.DataInformationBlock(data)

    class ManufacturerSpecificDataBlock(DataInformationElement):
        '''Opaque manufacturer data occupying the rest of the records.'''

        def parse(self):
            '''
            Split the data into the DIF and the manufacturer octets.

            Returns the number of octets consumed, which is all of them.
            '''
            self.dif = self.data[0]
            # Stored as a list of ints, like the octet lists of
            # DataInformationBlock.
            self.mdh = list(self.data[1:])
            self.consumed = len(self.data)
            return self.consumed

        def getJSON(self):
            return {'dif': self.dif, 'mdh': self.mdh}

    class DataInformationBlock(DataInformationElement):
        '''One data record: DIF/DIFE, VIF/VIFE and the value octets.'''

        # Value length in octets, indexed by the low nibble of the DIF.
        # -1 marks the variable length coding whose length is taken from
        # the first value octet (LVAR).
        VALUE_LENGTH_MAP = [0, 1, 2, 3, 4, 4, 6, 8,
                            0, 1, 2, 3, 4, -1, 6, 0]

        def _takeByte(self, what):
            '''Return the next octet; `what` names it in the error text.'''
            if self.consumed >= len(self.data):
                raise PayloadTooShortException("too short for %s" % what)
            octet = self.data[self.consumed]
            self.consumed += 1
            return octet

        def _takeBytes(self, count, what):
            '''Return the next `count` octets as a list of ints.'''
            end = self.consumed + count
            if end > len(self.data):
                raise PayloadTooShortException("too short for %s" % what)
            octets = list(self.data[self.consumed:end])
            self.consumed = end
            return octets

        def _takeExtensions(self, first, what):
            '''
            Return the extension octets announced by `first`.

            Bit 0x80 of an octet announces that another extension octet
            follows.
            '''
            extensions = []
            current = first
            while current & 0x80:
                current = self._takeByte(what)
                extensions.append(current)
            return extensions

        @staticmethod
        def _variableLength(lvar):
            '''
            Return the number of value octets announced by an LVAR octet.

            Ranges follow the M-Bus application layer: up to 0xbf a text
            of LVAR characters, 0xc0-0xcf and 0xd0-0xdf packed BCD (two
            digits per octet), 0xe0-0xef binary, 0xf0-0xfa floating point.
            Raises InvalidFrameException for the reserved values above.
            '''
            if lvar < 0xc0:
                return lvar
            if lvar < 0xd0:
                return lvar - 0xc0
            if lvar < 0xe0:
                return lvar - 0xd0
            if lvar < 0xf0:
                return lvar - 0xe0
            if lvar < 0xfb:
                return lvar - 0xf0
            raise InvalidFrameException("reserved lvar value")

        def parse(self):
            '''
            Decode one record from the start of the data.

            Sets dif, dife, vif, vife, vifData, lvar and userData and
            returns the number of octets the record occupies, value octets
            included, so the caller can locate the next record.

            Raises PayloadTooShortException when the data ends inside the
            record.
            '''
            self.consumed = 0
            self.dif = self._takeByte("dif")
            self.dife = self._takeExtensions(self.dif, "dife")
            self.vif = self._takeByte("vif")
            self.vife = self._takeExtensions(self.vif, "vife")

            # VIF 0x7c/0xfc is followed by a length octet and that many
            # octets of VIF data.
            self.vifData = []
            if self.vif & 0x7f == 0x7c:
                dataLength = self._takeByte("vif data length")
                self.vifData = self._takeBytes(dataLength, "vif data")

            valueLength = self.VALUE_LENGTH_MAP[self.dif & 0x0f]
            self.lvar = None
            if valueLength < 0:
                self.lvar = self._takeByte("lvar")
                valueLength = self._variableLength(self.lvar)
            self.userData = self._takeBytes(valueLength, "user data")
            return self.consumed

        def value(self):
            '''
            Return the decoded value of the record.

            Integer codings are decoded as unsigned little endian numbers,
            the 32 bit real as a little endian IEEE 754 float and the BCD
            codings via MeterbusTypeConversion.bcd. All other codings
            (no data, selection for readout, variable length, special
            functions) yield 0.
            '''
            difCode = self.dif & 0x0f
            if difCode in (1, 2, 3, 4, 6, 7):
                return _unsignedLittleEndian(self.userData)
            if difCode == 5:
                # bytes(bytearray(...)) yields a byte string on Python 2
                # and Python 3 alike.
                raw = bytes(bytearray(self.userData))
                return struct.unpack('<f', raw)[0]
            if difCode in (9, 10, 11, 12, 14):
                return MeterbusTypeConversion.bcd(self.userData)
            return 0

        def getJSON(self):
            return {'dif': self.dif,
                    'dife': self.dife,
                    'vif': self.vif,
                    'vife': self.vife,
                    'vifData': self.vifData,
                    'userData': self.userData,
                    'value': self.value()}

    def parse2(self):
        '''
        Decode the fixed data header and all data records into
        self.fixedDataHeader and self.dib.

        Raises PayloadTooShortException when the payload cannot hold the
        fixed data header or a record is truncated, plus whatever the
        header and record parsers raise.
        '''
        super(LongFrame, self).parse2()
        if self.payloadLength < 3 + 12:
            raise PayloadTooShortException("too short for fixed data header")
        self.fixedDataHeader = LongFrame.FixedDataHeader(self.telegram[7:19])
        self.fixedDataHeader.parse()

        # Records sit between the fixed data header and the two trailing
        # octets (checksum and stop character).
        records = self.telegram[19:-2]
        self.dib = []
        consumed = 0
        while consumed < len(records):
            if records[consumed] == LongFrame.IDLE_FILLER:
                consumed += 1
                continue
            curDib = LongFrame.DataInformationElement.create(
                records[consumed:])
            consumed += curDib.parse()
            self.dib.append(curDib)
            # Building the JSON decodes the value, so only do it when the
            # message is actually going to be emitted.
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug("DIB: %s", curDib.getJSON())


class Telegram(object):
    '''Raw telegram octets plus the frame object parsed from them.'''

    def __init__(self):
        self.hexString = ''
        self.telegram = bytearray()
        self.frame = None

    def fromHexString(self, hexString):
        '''
        Load the telegram from whitespace separated hexadecimal octets,
        e.g. "10 5b 01 5c 16".

        Raises ValueError for tokens that are not hexadecimal numbers in
        the range 0..255.
        '''
        self.hexString = hexString
        # split() without argument tolerates repeated, leading and
        # trailing whitespace.
        octetList = [int(octet, 16) for octet in hexString.split()]
        self.telegram = bytearray(octetList)

    def toHexString(self):
        '''Return the string form of the list of hex() texts per octet.'''
        return str([hex(b) for b in self.telegram])

    def parse(self):
        '''
        Select the frame class from the first octets, store the instance
        in self.frame and parse it.

        Raises InvalidLengthException when the telegram is too short to
        determine the frame type, InvalidStartCharException for an unknown
        start character, plus whatever the frame's parse() raises.
        '''
        if not self.telegram:
            raise InvalidLengthException()
        startCharacter = self.telegram[0]
        if startCharacter == 0x68:
            if len(self.telegram) < 2:
                raise InvalidLengthException()
            # A length field of 3 leaves room for C, A and CI only.
            if self.telegram[1] == 0x03:
                self.frame = ControlFrame(self.telegram)
            else:
                self.frame = LongFrame(self.telegram)
        elif startCharacter == 0x10:
            self.frame = ShortFrame(self.telegram)
        elif startCharacter == 0xe5:
            self.frame = SingleCharacter(self.telegram)
        else:
            raise InvalidStartCharException()
        self.frame.parse()