diff --git a/.woodpecker.yml b/.woodpecker.yml index 8624b0f..edf6f8e 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -51,12 +51,12 @@ steps: settings: repo: ${FORGE_NAME}/${CI_REPO} registry: - from_secret: container_registry + from_secret: local_registry tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG} username: - from_secret: container_registry_username + from_secret: local_username password: - from_secret: container_registry_password + from_secret: local_password dockerfile: Dockerfile when: - event: [push, tag] diff --git a/deployment/instances/udi/default/config.json b/deployment/instances/udi/default/config.json index d824512..ef17a74 100644 --- a/deployment/instances/udi/default/config.json +++ b/deployment/instances/udi/default/config.json @@ -62,6 +62,16 @@ } } }, + { + "topics": [ "IoT/Car/Values" ], + "handler": "Car", + "id": "Car", + "config": { + "databaseConnStr": "", + "attributes": { + } + } + }, { "topics": [ "locative/event/#" ], "handler": "Locative", diff --git a/src/udi/dispatcher/dispatcher.go b/src/udi/dispatcher/dispatcher.go index 9db136a..c0a8caf 100644 --- a/src/udi/dispatcher/dispatcher.go +++ b/src/udi/dispatcher/dispatcher.go @@ -1,149 +1,153 @@ package dispatcher -import "log" -import "time" -import "os" -import "fmt" -import "net/url" -import "udi/mqtt" -import "udi/config" -import "udi/counter" -import "udi/handlers/handler" -import "udi/handlers/ttn" -import "udi/handlers/iot" -import "udi/handlers/pv" -import "udi/handlers/mbgw3" -import "udi/handlers/sver" -import "udi/handlers/svej" -import "udi/handlers/dt1t" -import "udi/handlers/locative" -import "udi/handlers/prepared" -import "udi/handlers/z2m" - +import ( + "fmt" + "log" + "net/url" + "os" + "time" + "udi/config" + "udi/counter" + "udi/handlers/car" + "udi/handlers/dt1t" + "udi/handlers/handler" + "udi/handlers/iot" + "udi/handlers/locative" + "udi/handlers/mbgw3" + "udi/handlers/prepared" + "udi/handlers/pv" + "udi/handlers/svej" + "udi/handlers/sver" + "udi/handlers/ttn" + "udi/handlers/z2m" + "udi/mqtt" +) var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100) func InitDispatcher() { - log.Printf("Dispatcher initializing") - go archiver() + log.Printf("Dispatcher initializing") + go archiver() - for _, mapping := range config.Config.TopicMappings { - // log.Printf("Trying to initialize %s", mapping) + for _, mapping := range config.Config.TopicMappings { + // log.Printf("Trying to initialize %s", mapping) - var factory interface{} - switch mapping.Handler { - case "TTN": - factory = ttn.New - case "IoT": - factory = iot.New - case "PV": - factory = pv.New - case "MBGW3": - factory = mbgw3.New - case "SVER": - factory = sver.New - case "SVEJ": - factory = svej.New - case "DT1T": - factory = dt1t.New - case "Locative": - factory = locative.New - case "PREP": - factory = prepared.New - case "Z2M": - factory = z2m.New - default: - factory = nil - log.Printf("No handler %s found, ignore mapping", mapping.Handler) - } + var factory interface{} + switch mapping.Handler { + case "TTN": + factory = ttn.New + case "IoT": + factory = iot.New + case "PV": + factory = pv.New + case "MBGW3": + factory = mbgw3.New + case "SVER": + factory = sver.New + case "SVEJ": + factory = svej.New + case "DT1T": + factory = dt1t.New + case "Locative": + factory = locative.New + case "PREP": + factory = prepared.New + case "Z2M": + factory = z2m.New + case "Car": + factory = car.New + default: + factory = nil + log.Printf("No handler %s found, ignore mapping", mapping.Handler) + } - fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler) - if ! ok { - log.Println("Typ Assertion failed") - break - } - handler := fn(mapping.Id, mapping.Config) - handlerMap[mapping.Id] = handler - } + fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler) + if !ok { + log.Println("Typ Assertion failed") + break + } + handler := fn(mapping.Id, mapping.Config) + handlerMap[mapping.Id] = handler + } - //log.Printf("handlerMap: %s", handlerMap) + //log.Printf("handlerMap: %s", handlerMap) } func storeMessage(filename string, item handler.MessageT) { - file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644) - if err != nil { - log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err) - counter.F("Archived") - return - } - defer file.Close() - archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload) - _, err = file.WriteString(string(archivingString) + "\n") - if err != nil { - log.Printf("Unable to write message, message is not archived: %s", err) - counter.F("Archived") - return - } - //log.Println("Successfully archived message") - counter.S("Archived") + file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err) + counter.F("Archived") + return + } + defer file.Close() + archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload) + _, err = file.WriteString(string(archivingString) + "\n") + if err != nil { + log.Printf("Unable to write message, message is not archived: %s", err) + counter.F("Archived") + return + } + //log.Println("Successfully archived message") + counter.S("Archived") } func archiver() { - archivingRootDir := config.Config.Archiver.Dir - var lastArchivingDir string + archivingRootDir := config.Config.Archiver.Dir + var lastArchivingDir string - for { - select { - case message := <- archiverChannel: - currentDateStr := message.Timestamp.Format("2006/01/02/15") - currentArchivingDir := archivingRootDir + "/" + currentDateStr - if currentArchivingDir != lastArchivingDir { - err := os.MkdirAll(currentArchivingDir, 0755) - if err != nil { - log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err) - counter.F("Archived") - } - lastArchivingDir = currentArchivingDir - //log.Printf("Archiving dir %s created", currentArchivingDir) - } - archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic)) - storeMessage(archivingFilename, message) - } - } + for { + select { + case message := <-archiverChannel: + currentDateStr := message.Timestamp.Format("2006/01/02/15") + currentArchivingDir := archivingRootDir + "/" + currentDateStr + if currentArchivingDir != lastArchivingDir { + err := os.MkdirAll(currentArchivingDir, 0755) + if err != nil { + log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err) + counter.F("Archived") + } + lastArchivingDir = currentArchivingDir + //log.Printf("Archiving dir %s created", currentArchivingDir) + } + archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic)) + storeMessage(archivingFilename, message) + } + } } func InputDispatcher() { - for { - select { - case mqttMessage := <- mqtt.InputChannel: - //log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic) - message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } - archiverChannel <- message - handleMessage(message) - } - } + for { + select { + case mqttMessage := <-mqtt.InputChannel: + //log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic) + message := handler.MessageT{time.Now(), mqttMessage.Topic, string(mqttMessage.Payload)} + archiverChannel <- message + handleMessage(message) + } + } } func handleMessage(message handler.MessageT) { - for _, mapping := range config.Config.TopicMappings { - // log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler) - for _, subscribedTopic := range mapping.Topics { - // log.Printf("Testing %s in %s", message.Topic, subscribedTopic) - if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) { - //log.Printf("Handle message in handler %s", mapping.Id) - handler, exists := handlerMap[mapping.Id] - if exists { - handler.Handle(message) - counter.S("Dispatched") - return - } else { - log.Printf("Handler %s not found, message %s is lost", mapping.Id, message) - counter.F("Dispatched") - } - } - } - } - log.Printf("No match for topic %s, message %s is lost", message.Topic, message) - counter.F("Dispatched") + for _, mapping := range config.Config.TopicMappings { + // log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler) + for _, subscribedTopic := range mapping.Topics { + // log.Printf("Testing %s in %s", message.Topic, subscribedTopic) + if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) { + //log.Printf("Handle message in handler %s", mapping.Id) + handler, exists := handlerMap[mapping.Id] + if exists { + handler.Handle(message) + counter.S("Dispatched") + return + } else { + log.Printf("Handler %s not found, message %s is lost", mapping.Id, message) + counter.F("Dispatched") + } + } + } + } + log.Printf("No match for topic %s, message %s is lost", message.Topic, message) + counter.F("Dispatched") } diff --git a/src/udi/handlers/car/car.go b/src/udi/handlers/car/car.go new file mode 100644 index 0000000..ba042a9 --- /dev/null +++ b/src/udi/handlers/car/car.go @@ -0,0 +1,94 @@ +package car + +import ( + "encoding/json" + "log" + "reflect" + "time" + "udi/config" + "udi/database" + "udi/handlers/handler" +) + +type CarHandler struct { + handler.CommonHandler + dbh *database.DatabaseHandle +} + +/* +{ + "status": "Ok", + "timestamp": "2025-12-15T13:11:15.648243", + "voltageL1": 228.68, + "voltageL2": 227.69, + "voltageL3": 228.53, + "currentL1": 0.0, + "currentL2": 0.0, + "currentL3": 0.0, + "powerL1": 0.0, + "powerL2": 0.0, + "powerL3": 0.0, + "totalImportEnergy": 0.0, + "totalExportEnergy": 0.0, + "cnt": 399300} +*/ + +type CarValue struct { + Status string `unit:"" json:"status"` + Timestamp string `unit:"" json:"timestamp"` + VoltageL1 float32 `unit:"V" json:"voltageL1"` + VoltageL2 float32 `unit:"V" json:"voltageL2"` + VoltageL3 float32 `unit:"V" json:"voltageL3"` + CurrentL1 float32 `unit:"A" json:"currentL1"` + CurrentL2 float32 `unit:"A" json:"currentL2"` + CurrentL3 float32 `unit:"A" json:"currentL3"` + PowerL1 float32 `unit:"W" json:"powerL1"` + PowerL2 float32 `unit:"W" json:"powerL2"` + PowerL3 float32 `unit:"W" json:"powerL3"` + TotalImportEnergy float32 `unit:"Wh" json:"totalImportEnergy"` + TotalExportEnergy float32 `unit:"Wh" json:"totalExportEnergy"` + Cnt int `unit:"" json:"cnt"` +} + +func New(id string, config config.HandlerConfigT) handler.Handler { + t := &CarHandler{} + t.Id = id + t.dbh = database.NewDatabaseHandle() + log.Printf("Handler Car %d initialized", id) + return t +} + +func (self *CarHandler) Handle(message handler.MessageT) { + //log.Printf("Handler Car %d processing %s -> %s", self.id, message.Topic, message.Payload) + + var carValue CarValue + err := json.Unmarshal([]byte(message.Payload), &carValue) + if err != nil { + self.Lost("Unable to parse payload into carValue struct", err, message) + return + } + + variables := make(map[string]database.VariableType) + carValueStructValue := reflect.ValueOf(carValue) + for i := 0; i < carValueStructValue.NumField(); i++ { + field := carValueStructValue.Type().Field(i) + fieldValue := carValueStructValue.Field(i) + v := database.VariableType{ + Label: "", + Variable: field.Name, + Unit: field.Tag.Get("unit"), + Value: fieldValue.Interface(), + } + variables[field.Name] = v + } + + measurement := database.Measurement{ + Time: time.Now(), + Application: "Car", + Device: "Powermeter", + Values: variables, + } + + self.dbh.StoreMeasurement(&measurement) + self.S() +}