This commit is contained in:
@ -2,6 +2,7 @@ package ttn
|
||||
|
||||
import "log"
|
||||
import "fmt"
|
||||
import "time"
|
||||
import "encoding/json"
|
||||
import "udi/config"
|
||||
import "udi/handlers/handler"
|
||||
@ -15,6 +16,10 @@ type TTNHandler struct {
|
||||
dbh *database.DatabaseHandle
|
||||
}
|
||||
|
||||
type DecodedPayloaderHolder struct {
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
type uplinkMessage struct {
|
||||
EndDeviceIds struct {
|
||||
DeviceId string `json:"device_id"`
|
||||
@ -30,7 +35,7 @@ type uplinkMessage struct {
|
||||
FCnt int `json:"f_cnt"`
|
||||
FPort int `json:"f_port"`
|
||||
FrmPayload string `json:"frm_payload"`
|
||||
DecodedPayload map[string]interface{} `json:"decoded_payload"`
|
||||
DecodedPayload DecodedPayloaderHolder `json:"decoded_payload"`
|
||||
RxMetadata []struct {
|
||||
GatewayIds struct {
|
||||
GatewayId string `json:"gateway_id"`
|
||||
@ -62,7 +67,10 @@ type attributes struct {
|
||||
ConsumedAirtime string `json:"consumed_airtime"`
|
||||
}
|
||||
|
||||
|
||||
func (self *DecodedPayloaderHolder) UnmarshalJSON(data []byte) error {
|
||||
self.Payload = data
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTTNHandler(config config.HandlerConfigT) handler.Handler {
|
||||
t := &TTNHandler {
|
||||
@ -82,7 +90,10 @@ func lost(msg string, message handler.MessageT) {
|
||||
}
|
||||
|
||||
func (self *TTNHandler) Handle(message handler.MessageT) {
|
||||
log.Printf("Handler TTN %d processing %s -> %s", self.id, message.Topic, message.Payload)
|
||||
// log.Printf("Handler TTN %d processing %s -> %s", self.id, message.Topic, message.Payload)
|
||||
|
||||
var measurement database.Measurement
|
||||
measurement.Time = time.Now()
|
||||
|
||||
var uplinkMessage uplinkMessage
|
||||
err := json.Unmarshal([]byte(message.Payload), &uplinkMessage)
|
||||
@ -105,17 +116,28 @@ func (self *TTNHandler) Handle(message handler.MessageT) {
|
||||
attributes.Gateways = append(attributes.Gateways, g)
|
||||
}
|
||||
//log.Printf("Attributes: %s", attributes)
|
||||
measurement.Attributes = map[string]interface{} {
|
||||
"DeviceId": attributes.DeviceId,
|
||||
"ApplicationId": attributes.ApplicationId,
|
||||
"FCnt": attributes.FCnt,
|
||||
"FPort": attributes.FPort,
|
||||
"FrmPayload": attributes.FrmPayload,
|
||||
"Gateways": attributes.Gateways,
|
||||
"ConsumedAirtime": attributes.ConsumedAirtime,
|
||||
}
|
||||
|
||||
log.Printf("ApplicationId: %s, DeviceId: %s", attributes.ApplicationId, attributes.DeviceId)
|
||||
//log.Printf("ApplicationId: %s, DeviceId: %s", attributes.ApplicationId, attributes.DeviceId)
|
||||
device, err2 := self.dbh.GetDeviceByLabelAndApplication(attributes.ApplicationId, attributes.DeviceId)
|
||||
if err2 != nil {
|
||||
lost(fmt.Sprintf("Error when loading device: %s, ", err2), message)
|
||||
return
|
||||
}
|
||||
measurement.Application = attributes.ApplicationId
|
||||
measurement.Device = attributes.DeviceId
|
||||
|
||||
log.Printf("DeviceLabel: %s, DeviceType: %s", device.Label, device.DeviceType.ModelIdentifier)
|
||||
//log.Printf("DeviceLabel: %s, DeviceType: %s", device.Label, device.DeviceType.ModelIdentifier)
|
||||
|
||||
var parser func(int, interface{}) (map[string]database.VariableType, error)
|
||||
var parser func(int, []byte) (map[string]database.VariableType, error)
|
||||
switch device.DeviceType.ModelIdentifier {
|
||||
case "emu-prof-ii-lora-cfg1":
|
||||
parser = emuProfIILoRaCfg1.Parse
|
||||
@ -124,12 +146,14 @@ func (self *TTNHandler) Handle(message handler.MessageT) {
|
||||
return
|
||||
}
|
||||
|
||||
_, err3 := parser(uplinkMessage.UplinkMessage.FPort, uplinkMessage.UplinkMessage.DecodedPayload)
|
||||
variables, err3 := parser(uplinkMessage.UplinkMessage.FPort, uplinkMessage.UplinkMessage.DecodedPayload.Payload)
|
||||
if err3 != nil {
|
||||
lost(fmt.Sprintf("Model parser failed: %s", err3), message)
|
||||
return
|
||||
}
|
||||
|
||||
measurement.Values = variables
|
||||
log.Printf("Prepared measurement item: %s", measurement)
|
||||
self.dbh.StoreMeasurement(&measurement)
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user