add car powermeter

This commit is contained in:
2025-12-15 14:19:18 +01:00
parent 61509c0000
commit 95984157e8
4 changed files with 235 additions and 127 deletions

View File

@@ -51,12 +51,12 @@ steps:
settings:
repo: ${FORGE_NAME}/${CI_REPO}
registry:
from_secret: container_registry
from_secret: local_registry
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG}
username:
from_secret: container_registry_username
from_secret: local_username
password:
from_secret: container_registry_password
from_secret: local_password
dockerfile: Dockerfile
when:
- event: [push, tag]

View File

@@ -62,6 +62,16 @@
}
}
},
{
"topics": [ "IoT/Car/Values" ],
"handler": "Car",
"id": "Car",
"config": {
"databaseConnStr": "",
"attributes": {
}
}
},
{
"topics": [ "locative/event/#" ],
"handler": "Locative",

View File

@@ -1,149 +1,153 @@
package dispatcher
import "log"
import "time"
import "os"
import "fmt"
import "net/url"
import "udi/mqtt"
import "udi/config"
import "udi/counter"
import "udi/handlers/handler"
import "udi/handlers/ttn"
import "udi/handlers/iot"
import "udi/handlers/pv"
import "udi/handlers/mbgw3"
import "udi/handlers/sver"
import "udi/handlers/svej"
import "udi/handlers/dt1t"
import "udi/handlers/locative"
import "udi/handlers/prepared"
import "udi/handlers/z2m"
import (
"fmt"
"log"
"net/url"
"os"
"time"
"udi/config"
"udi/counter"
"udi/handlers/car"
"udi/handlers/dt1t"
"udi/handlers/handler"
"udi/handlers/iot"
"udi/handlers/locative"
"udi/handlers/mbgw3"
"udi/handlers/prepared"
"udi/handlers/pv"
"udi/handlers/svej"
"udi/handlers/sver"
"udi/handlers/ttn"
"udi/handlers/z2m"
"udi/mqtt"
)
var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100)
func InitDispatcher() {
log.Printf("Dispatcher initializing")
go archiver()
log.Printf("Dispatcher initializing")
go archiver()
for _, mapping := range config.Config.TopicMappings {
// log.Printf("Trying to initialize %s", mapping)
for _, mapping := range config.Config.TopicMappings {
// log.Printf("Trying to initialize %s", mapping)
var factory interface{}
switch mapping.Handler {
case "TTN":
factory = ttn.New
case "IoT":
factory = iot.New
case "PV":
factory = pv.New
case "MBGW3":
factory = mbgw3.New
case "SVER":
factory = sver.New
case "SVEJ":
factory = svej.New
case "DT1T":
factory = dt1t.New
case "Locative":
factory = locative.New
case "PREP":
factory = prepared.New
case "Z2M":
factory = z2m.New
default:
factory = nil
log.Printf("No handler %s found, ignore mapping", mapping.Handler)
}
var factory interface{}
switch mapping.Handler {
case "TTN":
factory = ttn.New
case "IoT":
factory = iot.New
case "PV":
factory = pv.New
case "MBGW3":
factory = mbgw3.New
case "SVER":
factory = sver.New
case "SVEJ":
factory = svej.New
case "DT1T":
factory = dt1t.New
case "Locative":
factory = locative.New
case "PREP":
factory = prepared.New
case "Z2M":
factory = z2m.New
case "Car":
factory = car.New
default:
factory = nil
log.Printf("No handler %s found, ignore mapping", mapping.Handler)
}
fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler)
if ! ok {
log.Println("Typ Assertion failed")
break
}
handler := fn(mapping.Id, mapping.Config)
handlerMap[mapping.Id] = handler
}
fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler)
if !ok {
log.Println("Typ Assertion failed")
break
}
handler := fn(mapping.Id, mapping.Config)
handlerMap[mapping.Id] = handler
}
//log.Printf("handlerMap: %s", handlerMap)
//log.Printf("handlerMap: %s", handlerMap)
}
func storeMessage(filename string, item handler.MessageT) {
file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644)
if err != nil {
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
counter.F("Archived")
return
}
defer file.Close()
archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload)
_, err = file.WriteString(string(archivingString) + "\n")
if err != nil {
log.Printf("Unable to write message, message is not archived: %s", err)
counter.F("Archived")
return
}
//log.Println("Successfully archived message")
counter.S("Archived")
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
counter.F("Archived")
return
}
defer file.Close()
archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload)
_, err = file.WriteString(string(archivingString) + "\n")
if err != nil {
log.Printf("Unable to write message, message is not archived: %s", err)
counter.F("Archived")
return
}
//log.Println("Successfully archived message")
counter.S("Archived")
}
func archiver() {
archivingRootDir := config.Config.Archiver.Dir
var lastArchivingDir string
archivingRootDir := config.Config.Archiver.Dir
var lastArchivingDir string
for {
select {
case message := <- archiverChannel:
currentDateStr := message.Timestamp.Format("2006/01/02/15")
currentArchivingDir := archivingRootDir + "/" + currentDateStr
if currentArchivingDir != lastArchivingDir {
err := os.MkdirAll(currentArchivingDir, 0755)
if err != nil {
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
counter.F("Archived")
}
lastArchivingDir = currentArchivingDir
//log.Printf("Archiving dir %s created", currentArchivingDir)
}
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
storeMessage(archivingFilename, message)
}
}
for {
select {
case message := <-archiverChannel:
currentDateStr := message.Timestamp.Format("2006/01/02/15")
currentArchivingDir := archivingRootDir + "/" + currentDateStr
if currentArchivingDir != lastArchivingDir {
err := os.MkdirAll(currentArchivingDir, 0755)
if err != nil {
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
counter.F("Archived")
}
lastArchivingDir = currentArchivingDir
//log.Printf("Archiving dir %s created", currentArchivingDir)
}
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
storeMessage(archivingFilename, message)
}
}
}
func InputDispatcher() {
for {
select {
case mqttMessage := <- mqtt.InputChannel:
//log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) }
archiverChannel <- message
handleMessage(message)
}
}
for {
select {
case mqttMessage := <-mqtt.InputChannel:
//log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
message := handler.MessageT{time.Now(), mqttMessage.Topic, string(mqttMessage.Payload)}
archiverChannel <- message
handleMessage(message)
}
}
}
func handleMessage(message handler.MessageT) {
for _, mapping := range config.Config.TopicMappings {
// log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler)
for _, subscribedTopic := range mapping.Topics {
// log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
//log.Printf("Handle message in handler %s", mapping.Id)
handler, exists := handlerMap[mapping.Id]
if exists {
handler.Handle(message)
counter.S("Dispatched")
return
} else {
log.Printf("Handler %s not found, message %s is lost", mapping.Id, message)
counter.F("Dispatched")
}
}
}
}
log.Printf("No match for topic %s, message %s is lost", message.Topic, message)
counter.F("Dispatched")
for _, mapping := range config.Config.TopicMappings {
// log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler)
for _, subscribedTopic := range mapping.Topics {
// log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
//log.Printf("Handle message in handler %s", mapping.Id)
handler, exists := handlerMap[mapping.Id]
if exists {
handler.Handle(message)
counter.S("Dispatched")
return
} else {
log.Printf("Handler %s not found, message %s is lost", mapping.Id, message)
counter.F("Dispatched")
}
}
}
}
log.Printf("No match for topic %s, message %s is lost", message.Topic, message)
counter.F("Dispatched")
}

View File

@@ -0,0 +1,94 @@
package car
import (
"encoding/json"
"log"
"reflect"
"time"
"udi/config"
"udi/database"
"udi/handlers/handler"
)
type CarHandler struct {
handler.CommonHandler
dbh *database.DatabaseHandle
}
/*
{
"status": "Ok",
"timestamp": "2025-12-15T13:11:15.648243",
"voltageL1": 228.68,
"voltageL2": 227.69,
"voltageL3": 228.53,
"currentL1": 0.0,
"currentL2": 0.0,
"currentL3": 0.0,
"powerL1": 0.0,
"powerL2": 0.0,
"powerL3": 0.0,
"totalImportEnergy": 0.0,
"totalExportEnergy": 0.0,
"cnt": 399300}
*/
type CarValue struct {
Status string `unit:"" json:"status"`
Timestamp string `unit:"" json:"timestamp"`
VoltageL1 float32 `unit:"V" json:"voltageL1"`
VoltageL2 float32 `unit:"V" json:"voltageL2"`
VoltageL3 float32 `unit:"V" json:"voltageL3"`
CurrentL1 float32 `unit:"A" json:"currentL1"`
CurrentL2 float32 `unit:"A" json:"currentL2"`
CurrentL3 float32 `unit:"A" json:"currentL3"`
PowerL1 float32 `unit:"W" json:"powerL1"`
PowerL2 float32 `unit:"W" json:"powerL2"`
PowerL3 float32 `unit:"W" json:"powerL3"`
TotalImportEnergy float32 `unit:"Wh" json:"totalImportEnergy"`
TotalExportEnergy float32 `unit:"Wh" json:"totalExportEnergy"`
Cnt int `unit:"" json:"cnt"`
}
func New(id string, config config.HandlerConfigT) handler.Handler {
t := &CarHandler{}
t.Id = id
t.dbh = database.NewDatabaseHandle()
log.Printf("Handler Car %d initialized", id)
return t
}
func (self *CarHandler) Handle(message handler.MessageT) {
//log.Printf("Handler Car %d processing %s -> %s", self.id, message.Topic, message.Payload)
var carValue CarValue
err := json.Unmarshal([]byte(message.Payload), &carValue)
if err != nil {
self.Lost("Unable to parse payload into carValue struct", err, message)
return
}
variables := make(map[string]database.VariableType)
carValueStructValue := reflect.ValueOf(carValue)
for i := 0; i < carValueStructValue.NumField(); i++ {
field := carValueStructValue.Type().Field(i)
fieldValue := carValueStructValue.Field(i)
v := database.VariableType{
Label: "",
Variable: field.Name,
Unit: field.Tag.Get("unit"),
Value: fieldValue.Interface(),
}
variables[field.Name] = v
}
measurement := database.Measurement{
Time: time.Now(),
Application: "Car",
Device: "Powermeter",
Values: variables,
}
self.dbh.StoreMeasurement(&measurement)
self.S()
}