package ttn import ( "log" "fmt" "time" "encoding/json" "udi/config" "udi/handlers/handler" "udi/handlers/ttn/models/emuProfIILoRaCfg1" "udi/handlers/ttn/models/draginoLdds75" "udi/handlers/ttn/models/draginoLmds200" "udi/database" ) var idSeq int = 0 type TTNHandler struct { id int dbh *database.DatabaseHandle } type DecodedPayloaderHolder struct { Payload []byte } type uplinkMessage struct { EndDeviceIds struct { DeviceId string `json:"device_id"` ApplicationIds struct { ApplicationId string `json:"application_id"` } `json:"application_ids"` DevEui string `json:"dev_eui"` JoinEui string `json:"join_eui"` DevAddr string `json:"dev_addr"` } `json:"end_device_ids"` ReceivedAt string `json:"received_at"` UplinkMessage struct { FCnt int `json:"f_cnt"` FPort int `json:"f_port"` FrmPayload string `json:"frm_payload"` DecodedPayload DecodedPayloaderHolder `json:"decoded_payload"` RxMetadata []struct { GatewayIds struct { GatewayId string `json:"gateway_id"` Eui string `json:"eui"` } `json:"gateway_ids"` Time string `json:"time"` Rssi int `json:"rssi"` ChannelRssi int `json:"channel_rssi"` Snr float32 `json:"snr"` ChannelIndex int `json:"channel_index"` } `json:"rx_metadata"` ConsumedAirtime string `json:"consumed_airtime"` } `json:"uplink_message"` } type gatewayAttributes struct { GatewayId string `json:"gateway_id"` Rssi int `json:"rssi"` Snr float32 `json:"snr"` } type attributes struct { DeviceId string `json:"device_id"` ApplicationId string `json:"application_id"` FCnt int `json:"f_cnt"` FPort int `json:"f_port"` FrmPayload string `json:"frm_payload"` Gateways []gatewayAttributes `json:"gateways"` ConsumedAirtime string `json:"consumed_airtime"` } func (self *DecodedPayloaderHolder) UnmarshalJSON(data []byte) error { self.Payload = data return nil } func NewTTNHandler(config config.HandlerConfigT) handler.Handler { t := &TTNHandler { id: idSeq, } idSeq += 1 t.dbh = database.NewDatabaseHandle() return t } func (self *TTNHandler) GetId() string { return fmt.Sprintf("TTN%d", self.id) } func lost(msg string, message handler.MessageT) { log.Printf("Error: %s, message %s is lost", msg, message) } func (self *TTNHandler) Handle(message handler.MessageT) { // log.Printf("Handler TTN %d processing %s -> %s", self.id, message.Topic, message.Payload) var measurement database.Measurement measurement.Time = time.Now() var uplinkMessage uplinkMessage err := json.Unmarshal([]byte(message.Payload), &uplinkMessage) if err != nil { lost(fmt.Sprintf("Error when unmarshaling message: %s, ", err), message) return } //log.Printf("Parsed message: %s", uplinkMessage) var attributes attributes attributes.DeviceId = uplinkMessage.EndDeviceIds.DeviceId attributes.ApplicationId = uplinkMessage.EndDeviceIds.ApplicationIds.ApplicationId attributes.FCnt = uplinkMessage.UplinkMessage.FCnt attributes.FPort = uplinkMessage.UplinkMessage.FPort attributes.FrmPayload = uplinkMessage.UplinkMessage.FrmPayload attributes.ConsumedAirtime = uplinkMessage.UplinkMessage.ConsumedAirtime for _, rxm := range uplinkMessage.UplinkMessage.RxMetadata { log.Printf("RXM: %s", rxm) g := gatewayAttributes { GatewayId: rxm.GatewayIds.GatewayId, Rssi: rxm.Rssi, Snr: rxm.Snr } attributes.Gateways = append(attributes.Gateways, g) } //log.Printf("Attributes: %s", attributes) measurement.Attributes = map[string]interface{} { "DeviceId": attributes.DeviceId, "ApplicationId": attributes.ApplicationId, "FCnt": attributes.FCnt, "FPort": attributes.FPort, "FrmPayload": attributes.FrmPayload, "Gateways": attributes.Gateways, "ConsumedAirtime": attributes.ConsumedAirtime, } //log.Printf("ApplicationId: %s, DeviceId: %s", attributes.ApplicationId, attributes.DeviceId) device, err2 := self.dbh.GetDeviceByLabelAndApplication(attributes.ApplicationId, attributes.DeviceId) if err2 != nil { lost(fmt.Sprintf("Error when loading device: %s, ", err2), message) return } measurement.Application = attributes.ApplicationId measurement.Device = attributes.DeviceId measurement.Attributes["DeviceType"] = device.DeviceType.ModelIdentifier //log.Printf("DeviceLabel: %s, DeviceType: %s", device.Label, device.DeviceType.ModelIdentifier) var parser func(int, []byte, *map[string]database.VariableType, *database.Device) error switch device.DeviceType.ModelIdentifier { case "emu-prof-ii-lora-cfg1": parser = emuProfIILoRaCfg1.Parse case "dragino-ldds75": parser = draginoLdds75.Parse case "dragino-lmds200": parser = draginoLmds200.Parse default: lost(fmt.Sprintf("No parser found for %s", device.DeviceType.ModelIdentifier), message) return } measurement.Values = make(map[string]database.VariableType) err3 := parser(uplinkMessage.UplinkMessage.FPort, uplinkMessage.UplinkMessage.DecodedPayload.Payload, &(measurement.Values), device) if err3 != nil { lost(fmt.Sprintf("Model parser failed: %s", err3), message) return } log.Printf("Prepared measurement item: %s", measurement) self.dbh.StoreMeasurement(&measurement) }