dt1t and counter and refactoring using embedded interfaces
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful

This commit is contained in:
2023-12-21 13:05:00 +01:00
parent 99d678b4b1
commit 8e6bea3f19
15 changed files with 276 additions and 174 deletions

View File

@ -7,6 +7,7 @@ import "fmt"
import "net/url"
import "udi/mqtt"
import "udi/config"
import "udi/counter"
import "udi/handlers/handler"
import "udi/handlers/ttn"
import "udi/handlers/iot"
@ -21,7 +22,7 @@ var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100)
func InitDispatcher() {
log.Printf("Initializing dispatcher")
log.Printf("Dispatcher initializing")
go archiver()
for _, mapping := range config.Config.TopicMappings {
@ -30,40 +31,41 @@ func InitDispatcher() {
var factory interface{}
switch mapping.Handler {
case "TTN":
factory = ttn.NewTTNHandler
factory = ttn.New
case "IoT":
factory = iot.NewIoTHandler
factory = iot.New
case "PV":
factory = pv.NewPvHandler
factory = pv.New
case "MBGW3":
factory = mbgw3.NewMbgw3Handler
factory = mbgw3.New
case "SVER":
factory = sver.NewSverHandler
factory = sver.New
case "SVEJ":
factory = svej.NewSvejHandler
factory = svej.New
case "DT1T":
factory = dt1t.NewDt1tHandler
factory = dt1t.New
default:
factory = nil
log.Printf("No handler %s found, ignore mapping", mapping.Handler)
}
fn, ok := factory.(func(config.HandlerConfigT) handler.Handler)
fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler)
if ! ok {
log.Println("Typ Assertion failed")
break
}
handler := fn(mapping.Config)
handler := fn(mapping.Id, mapping.Config)
handlerMap[mapping.Id] = handler
}
log.Printf("handlerMap: %s", handlerMap)
//log.Printf("handlerMap: %s", handlerMap)
}
func storeMessage(filename string, item handler.MessageT) {
file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644)
if err != nil {
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
counter.F("Archived")
return
}
defer file.Close()
@ -71,9 +73,11 @@ func storeMessage(filename string, item handler.MessageT) {
_, err = file.WriteString(string(archivingString) + "\n")
if err != nil {
log.Printf("Unable to write message, message is not archived: %s", err)
counter.F("Archived")
return
}
log.Println("Successfully archived message")
//log.Println("Successfully archived message")
counter.S("Archived")
}
func archiver() {
@ -89,9 +93,10 @@ func archiver() {
err := os.MkdirAll(currentArchivingDir, 0755)
if err != nil {
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
counter.F("Archived")
}
lastArchivingDir = currentArchivingDir
log.Printf("Archiving dir %s created", currentArchivingDir)
//log.Printf("Archiving dir %s created", currentArchivingDir)
}
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
storeMessage(archivingFilename, message)
@ -103,7 +108,7 @@ func InputDispatcher() {
for {
select {
case mqttMessage := <- mqtt.InputChannel:
log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
//log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) }
archiverChannel <- message
handleMessage(message)
@ -117,16 +122,19 @@ func handleMessage(message handler.MessageT) {
for _, subscribedTopic := range mapping.Topics {
// log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
log.Printf("Handle message in handler %s", mapping.Id)
//log.Printf("Handle message in handler %s", mapping.Id)
handler, exists := handlerMap[mapping.Id]
if exists {
handler.Handle(message)
counter.S("Dispatched")
return
} else {
log.Printf("Handler %s not found, message %s is lost", mapping.Id, message)
counter.F("Dispatched")
}
}
}
}
log.Printf("No match for topic %s, message %s is lost", message.Topic, message)
counter.F("Dispatched")
}