diff --git a/deployment/instances/udi/default/config.json b/deployment/instances/udi/default/config.json index faa9593..88b51f8 100644 --- a/deployment/instances/udi/default/config.json +++ b/deployment/instances/udi/default/config.json @@ -4,6 +4,36 @@ "tlsEnable": "false" }, "topicMappings": [ + { + "topics": [ "dt1/ai/periodic/1" ], + "handler": "DT1T", + "id": "DT1T.0", + "config": { + "attributes": { + "Application": "Temperature Wago", + "Device": "Freezer", + "HardLow": "-273", + "SoftLow": "-50", + "SoftHigh": "20", + "HardHigh": "100" + } + } + }, + { + "topics": [ "dt1/ai/periodic/3" ], + "handler": "DT1T", + "id": "DT1T.1", + "config": { + "attributes": { + "Application": "Temperature Wago", + "Device": "Outdoor", + "HardLow": "-273", + "SoftLow": "-60", + "SoftHigh": "60", + "HardHigh": "100" + } + } + }, { "topics": [ "IoT/PV/Values" ], "handler": "PV", diff --git a/src/udi/config-test.json b/src/udi/config-test.json index 454fff8..ecc68f5 100644 --- a/src/udi/config-test.json +++ b/src/udi/config-test.json @@ -4,6 +4,24 @@ "tlsEnable": "false" }, "topicMappings": [ + { + "topics": [ "IoT/PV/Values" ], + "handler": "PV", + "id": "PV", + "config": { + "attributes": { + } + } + }, + { + "topics": [ "IoT/MBGW3/Measurement" ], + "handler": "MBGW3", + "id": "MBGW3", + "config": { + "attributes": { + } + } + }, { "topics": [ "dt1/ai/periodic/1" ], "handler": "DT1T", diff --git a/src/udi/counter/counter.go b/src/udi/counter/counter.go new file mode 100644 index 0000000..805ff99 --- /dev/null +++ b/src/udi/counter/counter.go @@ -0,0 +1,94 @@ +package counter + +import ( + "log" + "time" + "encoding/json" +) + +type statsTuple_t struct { + Successful int `json:"good"` + Failed int `json:"bad"` +} + +type stats_t struct { + Received statsTuple_t `json:"received"` + Archived statsTuple_t `json:"archived"` + Dispatched statsTuple_t `json:"dispatched"` + Handled map[string]statsTuple_t `json:"handled"` + Stored statsTuple_t `json:"stored"` +} + +var stats stats_t + +func S(id string) { + switch id { + case "Received": + stats.Received.Successful = stats.Received.Successful + 1 + case "Archived": + stats.Archived.Successful += 1 + case "Dispatched": + stats.Dispatched.Successful += 1 + case "Stored": + stats.Stored.Successful += 1 + default: + log.Printf("Unknown stats id %s", id) + } +} + +func F(id string) { + switch id { + case "Received": + stats.Received.Failed += 1 + case "Archived": + stats.Archived.Failed += 1 + case "Dispatched": + stats.Dispatched.Failed += 1 + case "Stored": + stats.Stored.Failed += 1 + default: + log.Printf("Unknown stats id %s", id) + } +} + +func SH(id string) { + if _, ok := stats.Handled[id]; ok { + tuple := stats.Handled[id] + tuple.Successful += 1 + stats.Handled[id] = tuple + } else { + stats.Handled[id] = statsTuple_t { Successful:1, Failed:0, } + } +} + +func FH(id string) { + if _, ok := stats.Handled[id]; ok { + tuple := stats.Handled[id] + tuple.Failed += 1 + stats.Handled[id] = tuple + } else { + stats.Handled[id] = statsTuple_t { Successful:0, Failed:1, } + } +} + +func InitCounter() { + stats = stats_t { + Received: statsTuple_t {Successful:0,Failed:0,}, + Archived: statsTuple_t {Successful:0,Failed:0,}, + Dispatched: statsTuple_t {Successful:0,Failed:0,}, + Stored: statsTuple_t {Successful:0,Failed:0,}, + Handled: make(map[string]statsTuple_t), + } + + go func() { + for { + sj, err := json.Marshal(stats) + if err != nil { + log.Printf("Unable to marshal stats object: %s", err) + } + log.Println(string(sj)) + time.Sleep(time.Second * 60) + } + }() +} + diff --git a/src/udi/database/database.go b/src/udi/database/database.go index cbdccff..c04f804 100644 --- a/src/udi/database/database.go +++ b/src/udi/database/database.go @@ -5,6 +5,7 @@ import ( "log" //"time" "fmt" + "udi/counter" "gorm.io/driver/postgres" "gorm.io/gorm" ) @@ -24,7 +25,7 @@ func NewDatabaseHandle() *DatabaseHandle { } else { db.dbh = conn db.initialized = true - log.Println("Database connection opened") + //log.Println("Database connection opened") } return &db } @@ -32,16 +33,19 @@ func NewDatabaseHandle() *DatabaseHandle { func (self *DatabaseHandle) StoreMeasurement(measurement *Measurement) { if ! self.initialized { log.Printf("Database connection not initialized, can not store, measurement %s lost", measurement) + counter.F("Stored") return } result := self.dbh.Create(measurement) if result.Error != nil { log.Printf("Unable to insert, measurement %s lost, error: %s", measurement, result.Error) + counter.F("Stored") return } - log.Println("Successfully stored measurement") + //log.Println("Successfully stored measurement") + counter.S("Stored") } func (self *DatabaseHandle) GetDeviceByLabelAndApplication(applicationLabel string, deviceLabel string) (*Device, error) { diff --git a/src/udi/dispatcher/dispatcher.go b/src/udi/dispatcher/dispatcher.go index acadb43..25d977d 100644 --- a/src/udi/dispatcher/dispatcher.go +++ b/src/udi/dispatcher/dispatcher.go @@ -7,6 +7,7 @@ import "fmt" import "net/url" import "udi/mqtt" import "udi/config" +import "udi/counter" import "udi/handlers/handler" import "udi/handlers/ttn" import "udi/handlers/iot" @@ -21,7 +22,7 @@ var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100) func InitDispatcher() { - log.Printf("Initializing dispatcher") + log.Printf("Dispatcher initializing") go archiver() for _, mapping := range config.Config.TopicMappings { @@ -30,40 +31,41 @@ func InitDispatcher() { var factory interface{} switch mapping.Handler { case "TTN": - factory = ttn.NewTTNHandler + factory = ttn.New case "IoT": - factory = iot.NewIoTHandler + factory = iot.New case "PV": - factory = pv.NewPvHandler + factory = pv.New case "MBGW3": - factory = mbgw3.NewMbgw3Handler + factory = mbgw3.New case "SVER": - factory = sver.NewSverHandler + factory = sver.New case "SVEJ": - factory = svej.NewSvejHandler + factory = svej.New case "DT1T": - factory = dt1t.NewDt1tHandler + factory = dt1t.New default: factory = nil log.Printf("No handler %s found, ignore mapping", mapping.Handler) } - fn, ok := factory.(func(config.HandlerConfigT) handler.Handler) + fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler) if ! ok { log.Println("Typ Assertion failed") break } - handler := fn(mapping.Config) + handler := fn(mapping.Id, mapping.Config) handlerMap[mapping.Id] = handler } - log.Printf("handlerMap: %s", handlerMap) + //log.Printf("handlerMap: %s", handlerMap) } func storeMessage(filename string, item handler.MessageT) { file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644) if err != nil { log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err) + counter.F("Archived") return } defer file.Close() @@ -71,9 +73,11 @@ func storeMessage(filename string, item handler.MessageT) { _, err = file.WriteString(string(archivingString) + "\n") if err != nil { log.Printf("Unable to write message, message is not archived: %s", err) + counter.F("Archived") return } - log.Println("Successfully archived message") + //log.Println("Successfully archived message") + counter.S("Archived") } func archiver() { @@ -89,9 +93,10 @@ func archiver() { err := os.MkdirAll(currentArchivingDir, 0755) if err != nil { log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err) + counter.F("Archived") } lastArchivingDir = currentArchivingDir - log.Printf("Archiving dir %s created", currentArchivingDir) + //log.Printf("Archiving dir %s created", currentArchivingDir) } archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic)) storeMessage(archivingFilename, message) @@ -103,7 +108,7 @@ func InputDispatcher() { for { select { case mqttMessage := <- mqtt.InputChannel: - log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic) + //log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic) message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } archiverChannel <- message handleMessage(message) @@ -117,16 +122,19 @@ func handleMessage(message handler.MessageT) { for _, subscribedTopic := range mapping.Topics { // log.Printf("Testing %s in %s", message.Topic, subscribedTopic) if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) { - log.Printf("Handle message in handler %s", mapping.Id) + //log.Printf("Handle message in handler %s", mapping.Id) handler, exists := handlerMap[mapping.Id] if exists { handler.Handle(message) + counter.S("Dispatched") return } else { log.Printf("Handler %s not found, message %s is lost", mapping.Id, message) + counter.F("Dispatched") } } } } log.Printf("No match for topic %s, message %s is lost", message.Topic, message) + counter.F("Dispatched") } diff --git a/src/udi/handlers/dt1t/dt1t.go b/src/udi/handlers/dt1t/dt1t.go index a578b3b..ba144f9 100644 --- a/src/udi/handlers/dt1t/dt1t.go +++ b/src/udi/handlers/dt1t/dt1t.go @@ -11,20 +11,18 @@ import ( ) -var idSeq int = 0 - type Dt1tHandler struct { - id int + handler.CommonHandler ready bool + label string dbh *database.DatabaseHandle application string device string } -func NewDt1tHandler(config config.HandlerConfigT) handler.Handler { +func New(id string, config config.HandlerConfigT) handler.Handler { t := &Dt1tHandler { - id: idSeq, } if config.Attributes["Application"] == "" { @@ -37,39 +35,29 @@ func NewDt1tHandler(config config.HandlerConfigT) handler.Handler { return t } t.device = config.Attributes["Device"] + t.Id = id - - idSeq += 1 t.dbh = database.NewDatabaseHandle() t.ready = true return t } -func (self *Dt1tHandler) GetId() string { - return fmt.Sprintf("DT1T%d", self.id) -} - -func lost(msg string, message handler.MessageT) { - log.Printf("Error: %s, message %s is lost", msg, message) -} - func (self *Dt1tHandler) Handle(message handler.MessageT) { if ! self.ready { - log.Println("Handler is not marked as ready, message %s is lost", message) + self.Lost("Handler is not marked as ready", nil, message) return } - log.Printf("Handler DT1T %d processing %s -> %s", self.id, message.Topic, message.Payload) + // log.Printf("Handler DT1T %d processing %s -> %s", self.id, message.Topic, message.Payload) temperature, err := strconv.Atoi(message.Payload) if err != nil { - lost(fmt.Sprintf("Invalid raw value: %s", err), message) + self.Lost("Invalid raw value", err, message) return } if temperature & 0x8000 != 0{ temperature = ((temperature - 1) ^ 0xffff) * -1 } temperatureF := float32(temperature) / 10.0 - log.Printf("TemperatureF: %f", temperatureF) var measurement database.Measurement measurement.Time = time.Now() @@ -84,8 +72,9 @@ func (self *Dt1tHandler) Handle(message handler.MessageT) { measurement.Values = make(map[string]database.VariableType) measurement.Values["Value"] = variable - log.Printf("Prepared measurement item: %s", measurement) + // log.Printf("Prepared measurement item: %s", measurement) self.dbh.StoreMeasurement(&measurement) + self.S() } diff --git a/src/udi/handlers/handler/handler.go b/src/udi/handlers/handler/handler.go index be52d00..1bec70b 100644 --- a/src/udi/handlers/handler/handler.go +++ b/src/udi/handlers/handler/handler.go @@ -1,6 +1,10 @@ package handler -import "time" +import ( + "time" + "log" + "udi/counter" +) type MessageT struct { Timestamp time.Time @@ -11,5 +15,32 @@ type MessageT struct { type Handler interface { GetId() string Handle(MessageT) + Lost(msg string, err error, message MessageT) + S() + F() +} + +type CommonHandler struct { + Id string +} + +func (self *CommonHandler) S() { + counter.SH(self.Id) +} +func (self *CommonHandler) F() { + counter.FH(self.Id) +} + +func (self *CommonHandler) GetId() string { + return self.Id +} + +func (self *CommonHandler) Lost(msg string, err error, message MessageT) { + if err != nil { + log.Printf("Error: %s, message %s is lost", msg, message) + } else { + log.Printf("Error: %s (%s), message %s is lost", msg, err, message) + } + self.F() } diff --git a/src/udi/handlers/iot/iot.go b/src/udi/handlers/iot/iot.go index e0957c4..b16a740 100644 --- a/src/udi/handlers/iot/iot.go +++ b/src/udi/handlers/iot/iot.go @@ -1,29 +1,23 @@ package iot import "log" -import "fmt" +import "udi/config" import "udi/handlers/handler" -var idSeq int = 0 type IoTHandler struct { - id int + handler.CommonHandler } -func NewIoTHandler() handler.Handler { +func New(id string, config config.HandlerConfigT) handler.Handler { t := &IoTHandler { - id: idSeq, } - idSeq += 1 + t.Id = id return t } -func (self *IoTHandler) GetId() string { - return fmt.Sprintf("IoT%d", self.id) -} - func (self *IoTHandler) Handle(message handler.MessageT) { - log.Printf("Handler IoT %d processing %s -> %s", self.id, message.Topic, message.Payload) + log.Printf("Handler IoT %d processing %s -> %s", self.Id, message.Topic, message.Payload) } diff --git a/src/udi/handlers/mbgw3/mbgw3.go b/src/udi/handlers/mbgw3/mbgw3.go index 6622239..a97cbb3 100644 --- a/src/udi/handlers/mbgw3/mbgw3.go +++ b/src/udi/handlers/mbgw3/mbgw3.go @@ -1,11 +1,8 @@ package mbgw3 import ( - "log" - //"reflect" "time" "strconv" - "fmt" "encoding/json" "udi/config" "udi/handlers/handler" @@ -13,10 +10,8 @@ import ( ) -var idSeq int = 0 - type Mbgw3Handler struct { - id int + handler.CommonHandler dbh *database.DatabaseHandle } @@ -31,26 +26,21 @@ type Observation struct { } -func NewMbgw3Handler(config config.HandlerConfigT) handler.Handler { +func New(id string, config config.HandlerConfigT) handler.Handler { t := &Mbgw3Handler { - id: idSeq, } - idSeq += 1 + t.Id = id t.dbh = database.NewDatabaseHandle() return t } -func (self *Mbgw3Handler) GetId() string { - return fmt.Sprintf("MBGW3%d", self.id) -} - func (self *Mbgw3Handler) Handle(message handler.MessageT) { -// log.Printf("Handler MBGW3 %d processing %s -> %s", self.id, message.Topic, message.Payload) + //log.Printf("Handler MBGW3 %d processing %s -> %s", self.Id, message.Topic, message.Payload) var observation Observation err := json.Unmarshal([]byte(message.Payload), &observation) if err != nil { - log.Printf("Unable to parse payload into Observation struct, message %s -> %s is lost, error ", message.Topic, message.Payload, err) + self.Lost("Unable to parse payload into Observation struct", err, message) return } @@ -95,9 +85,10 @@ func (self *Mbgw3Handler) Handle(message handler.MessageT) { } } - // log.Printf("Prepared measurement item: %s", measurement) + //log.Printf("Prepared measurement item: %s", measurement) self.dbh.StoreMeasurement(&measurement) + self.S() } diff --git a/src/udi/handlers/pv/pv.go b/src/udi/handlers/pv/pv.go index 0ca5437..807b03c 100644 --- a/src/udi/handlers/pv/pv.go +++ b/src/udi/handlers/pv/pv.go @@ -1,10 +1,8 @@ package pv import ( - "log" "reflect" "time" - "fmt" "encoding/json" "udi/config" "udi/handlers/handler" @@ -12,10 +10,8 @@ import ( ) -var idSeq int = 0 - type PvHandler struct { - id int + handler.CommonHandler dbh *database.DatabaseHandle } @@ -40,18 +36,14 @@ type PvValue struct { } -func NewPvHandler(config config.HandlerConfigT) handler.Handler { +func New(id string, config config.HandlerConfigT) handler.Handler { t := &PvHandler { - id: idSeq, } - idSeq += 1 + t.Id = id t.dbh = database.NewDatabaseHandle() return t } -func (self *PvHandler) GetId() string { - return fmt.Sprintf("PV%d", self.id) -} func (self *PvHandler) Handle(message handler.MessageT) { //log.Printf("Handler PV %d processing %s -> %s", self.id, message.Topic, message.Payload) @@ -59,7 +51,7 @@ func (self *PvHandler) Handle(message handler.MessageT) { var pvValue PvValue err := json.Unmarshal([]byte(message.Payload), &pvValue) if err != nil { - log.Printf("Unable to parse payload into pvValue struct, message %s -> %s is lost, error: %s", message.Topic, message.Payload, err) + self.Lost("Unable to parse payload into pvValue struct", err, message) return } @@ -85,6 +77,7 @@ func (self *PvHandler) Handle(message handler.MessageT) { } self.dbh.StoreMeasurement(&measurement) + self.S() } diff --git a/src/udi/handlers/svej/svej.go b/src/udi/handlers/svej/svej.go index f9a9115..986d3c5 100644 --- a/src/udi/handlers/svej/svej.go +++ b/src/udi/handlers/svej/svej.go @@ -13,10 +13,8 @@ import ( "udi/database" ) -var idSeq int = 0 - type SingleValueExtractorJsonpathHandler struct { - id int + handler.CommonHandler ready bool application string deviceSelector string @@ -36,12 +34,10 @@ C:ConstantValue */ -func NewSvejHandler(config config.HandlerConfigT) handler.Handler { +func New(id string, config config.HandlerConfigT) handler.Handler { t := &SingleValueExtractorJsonpathHandler { - id: idSeq, ready: false, } - idSeq += 1 if config.Attributes["application"] == "" { log.Println("Error: application not configured") @@ -77,19 +73,12 @@ func NewSvejHandler(config config.HandlerConfigT) handler.Handler { t.unitJsonpath = jp } + t.Id = id t.ready = true t.dbh = database.NewDatabaseHandle() return t } -func (self *SingleValueExtractorJsonpathHandler) GetId() string { - return fmt.Sprintf("SVE%d", self.id) -} - -func lost(msg string, message handler.MessageT) { - log.Printf("Error: %s, message %s is lost", msg, message) -} - func extractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) { var res string switch selector[:2] { @@ -119,10 +108,10 @@ func extractionHelper(subTopics []string, jPayload interface{}, selector string, func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT) { if ! self.ready { - log.Println("Handler is not marked as ready, message %s is lost", message) + self.Lost("Handler is not marked as ready", nil, message) return } - log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.id, message.Topic, message.Payload) + //log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload) var measurement database.Measurement measurement.Time = time.Now() @@ -133,23 +122,23 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT var jPayload interface{} err := json.Unmarshal([]byte(message.Payload), &jPayload) if err != nil { - lost(fmt.Sprintf("Unable to unmarshal payload: %s", err), message) + self.Lost("Unable to unmarshal payload", err, message) return } device, err1 := extractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath) if err1 != nil { - lost(fmt.Sprintf("Device extraction failed with %s", err1), message) + self.Lost("Device extraction failed", err1, message) return } value, err2 := extractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath) if err2 != nil { - lost(fmt.Sprintf("Value extraction failed with %s", err2), message) + self.Lost("Value extraction failed", err2, message) return } unit, err3 := extractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath) if err3 != nil { - lost(fmt.Sprintf("Unit extraction failed with %s", err3), message) + self.Lost("Unit extraction failed", err3, message) return } @@ -163,7 +152,8 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT measurement.Values = make(map[string]database.VariableType) measurement.Values["Value"] = variable - log.Printf("Prepared measurement item: %s", measurement) + //log.Printf("Prepared measurement item: %s", measurement) self.dbh.StoreMeasurement(&measurement) + self.S() } diff --git a/src/udi/handlers/sver/sver.go b/src/udi/handlers/sver/sver.go index 026c168..32bbbbe 100644 --- a/src/udi/handlers/sver/sver.go +++ b/src/udi/handlers/sver/sver.go @@ -1,28 +1,22 @@ package sver import ( - "log" "time" "strconv" "strings" "regexp" - "fmt" - "reflect" - "encoding/json" - "github.com/oliveagle/jsonpath" + "log" "udi/config" "udi/handlers/handler" "udi/database" ) -var idSeq int = 0 type SingleValueExtractorRegexHandler struct { - id int + handler.CommonHandler ready bool config localConfig payloadRegex *regexp.Regexp - payloadJsonpath *jsonpath.Compiled dbh *database.DatabaseHandle } @@ -44,12 +38,10 @@ type localConfig struct { } -func NewSverHandler(config config.HandlerConfigT) handler.Handler { +func New(id string, config config.HandlerConfigT) handler.Handler { t := &SingleValueExtractorRegexHandler { - id: idSeq, ready: false, } - idSeq += 1 var localConfig localConfig if config.Attributes["application"] == "" { @@ -64,18 +56,6 @@ func NewSverHandler(config config.HandlerConfigT) handler.Handler { } else { t.payloadRegex = nil } - payloadJsonpath := config.Attributes["payloadJsonpath"] - if payloadJsonpath != "" { - j, err := jsonpath.Compile(payloadJsonpath) - if err != nil { - log.Printf("Unable to compile jsonpath %s", payloadJsonpath) - t.payloadJsonpath = nil - } else { - t.payloadJsonpath = j - } - } else { - t.payloadJsonpath = nil - } if config.Attributes["deviceFrom"] != TOPIC_SEL && config.Attributes["deviceFrom"] != PAYLOAD_SEL && config.Attributes["deviceFrom"] != CONSTANT_SEL { log.Printf("Error: invalid value %s for deviceFrom", config.Attributes["deviceFrom"]) @@ -128,22 +108,15 @@ func NewSverHandler(config config.HandlerConfigT) handler.Handler { t.config = localConfig + t.Id = id t.ready = true t.dbh = database.NewDatabaseHandle() return t } -func (self *SingleValueExtractorRegexHandler) GetId() string { - return fmt.Sprintf("SVE%d", self.id) -} - -func lost(msg string, message handler.MessageT) { - log.Printf("Error: %s, message %s is lost", msg, message) -} - func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) { if ! self.ready { - log.Println("Handler is not marked as ready, message %s is lost", message) + self.Lost("Handler is not marked as ready", nil, message) return } //log.Printf("Handler SingleValueExtractor %d processing %s -> %s", self.id, message.Topic, message.Payload) @@ -160,31 +133,21 @@ func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) { payloadMatches = self.payloadRegex.FindStringSubmatch(message.Payload) //log.Printf("Matches: %s", strings.Join(payloadMatches, ", ")) } - if self.payloadJsonpath != nil { - var jsonData interface{} - json.Unmarshal([]byte(message.Payload), &jsonData) - p, err := self.payloadJsonpath.Lookup(jsonData) - if err != nil { - lost(fmt.Sprintf("jsonpath error: %s", err), message) - return - } - log.Printf("XXXX: %s", reflect.TypeOf(p)) - } switch self.config.deviceFrom { case TOPIC_SEL: if self.config.devicePart >= len(subTopics) { - lost("devicePart out of range", message) + self.Lost("devicePart out of range", nil, message) return } measurement.Device = subTopics[self.config.devicePart] case PAYLOAD_SEL: - if self.payloadRegex == nil && self.payloadJsonpath == nil { - lost("no payloadRegex or payloadJsonpath defined, devicePart can't be used", message) + if self.payloadRegex == nil { + self.Lost("no payloadRegex defined, devicePart can't be used", nil, message) return } if self.config.devicePart >= len(payloadMatches) { - lost("devicePart out of range", message) + self.Lost("devicePart out of range", nil, message) return } measurement.Device = payloadMatches[self.config.devicePart] @@ -199,12 +162,12 @@ func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) { switch self.config.valueFrom { case PAYLOAD_SEL: - if self.payloadRegex == nil && self.payloadJsonpath == nil { - lost("no payloadRegex or payloadJsonpath defined, valuePart can't be used", message) + if self.payloadRegex == nil { + self.Lost("no payloadRegex defined, valuePart can't be used", nil, message) return } if self.config.valuePart >= len(payloadMatches) { - lost("valuePart out of range", message) + self.Lost("valuePart out of range", nil, message) return } variable.Value = payloadMatches[self.config.valuePart] @@ -214,12 +177,12 @@ func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) { switch self.config.unitFrom { case PAYLOAD_SEL: - if self.payloadRegex == nil && self.payloadJsonpath == nil { - lost("no payloadRegex or payloadJsonpath defined, unitPart can't be used", message) + if self.payloadRegex == nil { + self.Lost("no payloadRegex defined, unitPart can't be used", nil, message) return } if self.config.unitPart >= len(payloadMatches) { - lost("unitPart out of range", message) + self.Lost("unitPart out of range", nil, message) return } variable.Unit = payloadMatches[self.config.unitPart] @@ -231,5 +194,6 @@ func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) { //log.Printf("Prepared measurement item: %s", measurement) self.dbh.StoreMeasurement(&measurement) + self.S() } diff --git a/src/udi/handlers/ttn/ttn.go b/src/udi/handlers/ttn/ttn.go index 58cf82f..b828690 100644 --- a/src/udi/handlers/ttn/ttn.go +++ b/src/udi/handlers/ttn/ttn.go @@ -1,7 +1,6 @@ package ttn import ( - "log" "fmt" "time" "encoding/json" @@ -13,10 +12,9 @@ import ( "udi/database" ) -var idSeq int = 0 type TTNHandler struct { - id int + handler.CommonHandler dbh *database.DatabaseHandle } @@ -76,23 +74,14 @@ func (self *DecodedPayloaderHolder) UnmarshalJSON(data []byte) error { return nil } -func NewTTNHandler(config config.HandlerConfigT) handler.Handler { +func New(id string, config config.HandlerConfigT) handler.Handler { t := &TTNHandler { - id: idSeq, } - idSeq += 1 + t.Id = id t.dbh = database.NewDatabaseHandle() return t } -func (self *TTNHandler) GetId() string { - return fmt.Sprintf("TTN%d", self.id) -} - -func lost(msg string, message handler.MessageT) { - log.Printf("Error: %s, message %s is lost", msg, message) -} - func (self *TTNHandler) Handle(message handler.MessageT) { // log.Printf("Handler TTN %d processing %s -> %s", self.id, message.Topic, message.Payload) @@ -102,7 +91,7 @@ func (self *TTNHandler) Handle(message handler.MessageT) { var uplinkMessage uplinkMessage err := json.Unmarshal([]byte(message.Payload), &uplinkMessage) if err != nil { - lost(fmt.Sprintf("Error when unmarshaling message: %s, ", err), message) + self.Lost("Error when unmarshaling message", err, message) return } //log.Printf("Parsed message: %s", uplinkMessage) @@ -115,7 +104,7 @@ func (self *TTNHandler) Handle(message handler.MessageT) { attributes.FrmPayload = uplinkMessage.UplinkMessage.FrmPayload attributes.ConsumedAirtime = uplinkMessage.UplinkMessage.ConsumedAirtime for _, rxm := range uplinkMessage.UplinkMessage.RxMetadata { - log.Printf("RXM: %s", rxm) + //log.Printf("RXM: %s", rxm) g := gatewayAttributes { GatewayId: rxm.GatewayIds.GatewayId, Rssi: rxm.Rssi, Snr: rxm.Snr } attributes.Gateways = append(attributes.Gateways, g) } @@ -133,7 +122,7 @@ func (self *TTNHandler) Handle(message handler.MessageT) { //log.Printf("ApplicationId: %s, DeviceId: %s", attributes.ApplicationId, attributes.DeviceId) device, err2 := self.dbh.GetDeviceByLabelAndApplication(attributes.ApplicationId, attributes.DeviceId) if err2 != nil { - lost(fmt.Sprintf("Error when loading device: %s, ", err2), message) + self.Lost("Error when loading device", err2, message) return } measurement.Application = attributes.ApplicationId @@ -151,18 +140,19 @@ func (self *TTNHandler) Handle(message handler.MessageT) { case "dragino-lmds200": parser = draginoLmds200.Parse default: - lost(fmt.Sprintf("No parser found for %s", device.DeviceType.ModelIdentifier), message) + self.Lost(fmt.Sprintf("No parser found for %s", device.DeviceType.ModelIdentifier), nil, message) return } measurement.Values = make(map[string]database.VariableType) err3 := parser(uplinkMessage.UplinkMessage.FPort, uplinkMessage.UplinkMessage.DecodedPayload.Payload, &(measurement.Values), device) if err3 != nil { - lost(fmt.Sprintf("Model parser failed: %s", err3), message) + self.Lost("Model parser failed", err3, message) return } - log.Printf("Prepared measurement item: %s", measurement) + //log.Printf("Prepared measurement item: %s", measurement) self.dbh.StoreMeasurement(&measurement) + self.S() } diff --git a/src/udi/main.go b/src/udi/main.go index 3719efb..e7a2760 100644 --- a/src/udi/main.go +++ b/src/udi/main.go @@ -5,6 +5,7 @@ import "os" import "os/signal" import "udi/mqtt" import "udi/config" +import "udi/counter" import "udi/dispatcher" @@ -23,6 +24,8 @@ func main() { mqtt.StartMqttClient() defer mqtt.StopMqttClient() + counter.InitCounter() + log.Println("UDI running") c := make(chan os.Signal, 1) diff --git a/src/udi/mqtt/mqtt.go b/src/udi/mqtt/mqtt.go index 320dcb4..15fa2f2 100644 --- a/src/udi/mqtt/mqtt.go +++ b/src/udi/mqtt/mqtt.go @@ -7,6 +7,7 @@ import MQTT "github.com/eclipse/paho.mqtt.golang" import "github.com/google/uuid" import "crypto/tls" import "udi/config" +import "udi/counter" type Message struct { Topic string @@ -26,10 +27,12 @@ func onMessageReceived(client MQTT.Client, message MQTT.Message) { } select { case InputChannel <- m: + counter.S("Received") {} //log.Println("Message sent to channel") default: log.Println("Channel full, message lost") + counter.F("Received") } } @@ -54,7 +57,7 @@ func onConnect(client MQTT.Client) { if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) } - log.Printf("Successfully subscribed to topic %s", topic) + log.Printf("Topic %s subscribed", topic) } } @@ -101,19 +104,19 @@ func StartMqttClient() { enableTls := config.Config.Mqtt.TlsEnable if enableTls == "true" { - log.Println("Enabling TLS connection") + //log.Println("Enabling TLS connection") tlsConfig := &tls.Config { InsecureSkipVerify: true, } opts.SetTLSConfig(tlsConfig) } - log.Println("Trying to connect to broker") + log.Println("Broker connecting") mqttClient = MQTT.NewClient(opts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) } - log.Printf("Successfully connected to broker %s", broker) + //log.Printf("Successfully connected to broker %s", broker) go outputDispatcher(mqttClient)