commit e241319e6eef8b660d7ead5cb590c97e5d4b5dc0 Author: Wolfgang Hottgenroth Date: Sun Dec 1 15:56:24 2024 +0100 initial diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bd0cd5f --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +src/udi/udi +src/udi/main +src/udi/migrate_schema +tmp/ +ENVDB +ENVDB.cluster +deployment/secrets.txt +deployment/secrets diff --git a/src/ma/archiver/archiver.go b/src/ma/archiver/archiver.go new file mode 100644 index 0000000..e402257 --- /dev/null +++ b/src/ma/archiver/archiver.go @@ -0,0 +1,33 @@ +package archiver + +import "log" +import "time" +import "os" +import "fmt" +import "net/url" +import "ma/mqtt" +import "ma/config" +import "ma/counter" +import "ma/database" + + +func InitArchiver() { + log.Printf("Archiver initializing") +} + +func InputArchiver() { + for { + select { + case mqttMessage := <- mqtt.InputChannel: + message := database.Message { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } + archiverChannel <- message + handleMessage(message) + } + } +} + +func handleMessage(message database.MessageT) { + log.Printf("Archiving %", message) + counter.S("Stored") +} + diff --git a/src/ma/config/config.go b/src/ma/config/config.go new file mode 100644 index 0000000..04718f1 --- /dev/null +++ b/src/ma/config/config.go @@ -0,0 +1,33 @@ +package config + +import "encoding/json" +import "log" +import "os" + +type HandlerConfigT struct { + Attributes map[string]string `json:"attributes"` +} + +type ConfigT struct { + Mqtt struct { + Broker string `json:"broker"` + Username string `json:"username"` + Password string + TlsEnable string `json:"tlsEnable"` + } `json:"mqtt"` + IncludeTopics []string `json:"includeTopics"` + ExcludeTopics []string `json:"excludeTopics"` +} + +var Config ConfigT + + +func LoadConfiguration() { + err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config) + if err != nil { + log.Fatalf("Unable to parse configuration: %s", err) + } + + Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD") +} + diff --git a/src/ma/counter/counter.go b/src/ma/counter/counter.go new file mode 100644 index 0000000..a898b13 --- /dev/null +++ b/src/ma/counter/counter.go @@ -0,0 +1,87 @@ +package counter + +import ( + "log" + "time" + "encoding/json" +) + +type statsTuple_t struct { + Successful int `json:"good"` + Failed int `json:"bad"` +} + +type stats_t struct { + Received statsTuple_t `json:"received"` + Stored statsTuple_t `json:"stored"` + Skipped statsTuple_t `json:"skipped"` +} + +var stats stats_t + +func S(id string) { + switch id { + case "Received": + stats.Received.Successful = stats.Received.Successful + 1 + case "Stored": + stats.Stored.Successful += 1 + case "Skipped": + stats.Skipped.Successful += 1 + default: + log.Printf("Unknown stats id %s", id) + } +} + +func F(id string) { + switch id { + case "Received": + stats.Received.Failed += 1 + case "Stored": + stats.Stored.Failed += 1 + case "Skipped": + stats.Skipped.Failed += 1 + default: + log.Printf("Unknown stats id %s", id) + } +} + +func SH(id string) { + if _, ok := stats.Handled[id]; ok { + tuple := stats.Handled[id] + tuple.Successful += 1 + stats.Handled[id] = tuple + } else { + stats.Handled[id] = statsTuple_t { Successful:1, Failed:0, } + } +} + +func FH(id string) { + if _, ok := stats.Handled[id]; ok { + tuple := stats.Handled[id] + tuple.Failed += 1 + stats.Handled[id] = tuple + } else { + stats.Handled[id] = statsTuple_t { Successful:0, Failed:1, } + } +} + +func InitCounter() { + stats = stats_t { + Received: statsTuple_t {Successful:0,Failed:0,}, + Stored: statsTuple_t {Successful:0,Failed:0,}, + Skipped: statsTuple_t {Successful:0,Failed:0,}, + Handled: make(map[string]statsTuple_t), + } + + go func() { + for { + sj, err := json.Marshal(stats) + if err != nil { + log.Printf("Unable to marshal stats object: %s", err) + } + log.Println(string(sj)) + time.Sleep(time.Second * 60) + } + }() +} + diff --git a/src/ma/database/abstract_database.go b/src/ma/database/abstract_database.go new file mode 100644 index 0000000..fec2d7d --- /dev/null +++ b/src/ma/database/abstract_database.go @@ -0,0 +1,12 @@ +package database + +import "time" +import "gorm.io/gorm" + + +type Message struct { + Time time.Time `gorm:"not null;primary_key"` + Topic string `gorm:"not null"` + Payload string `gorm:"not null"` +} + diff --git a/src/ma/database/database.go b/src/ma/database/database.go new file mode 100644 index 0000000..323c665 --- /dev/null +++ b/src/ma/database/database.go @@ -0,0 +1,53 @@ +package database + + +import ( + "log" + //"time" + "fmt" + "ma/counter" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +type DatabaseHandle struct { + initialized bool + dbh *gorm.DB +} + +func NewDatabaseHandle() *DatabaseHandle { + var db DatabaseHandle + // inject the whole database configuration via the well-known PG* env variables + conn, err := gorm.Open(postgres.Open("")) + if err != nil { + log.Printf("Unable to open database connection: %s", err) + db.initialized = false + } else { + db.dbh = conn + db.initialized = true + //log.Println("Database connection opened") + } + return &db +} + +func (self *DatabaseHandle) StoreMessage(message *Message) { + if ! self.initialized { + log.Printf("Database connection not initialized, can not store, message %s lost", message) + counter.F("Stored") + return + } + + result := self.dbh.Create(message) + if result.Error != nil { + log.Printf("Unable to insert, message %s lost, error: %s", message, result.Error) + counter.F("Stored") + return + } + + //log.Println("Successfully stored message") + counter.S("Stored") +} + + + + diff --git a/src/ma/database/migrate_database.go b/src/ma/database/migrate_database.go new file mode 100644 index 0000000..97050fc --- /dev/null +++ b/src/ma/database/migrate_database.go @@ -0,0 +1,22 @@ +package database + +import ( + "log" + //"time" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func Migrate() { + dsn := "" + db, err := gorm.Open(postgres.Open(dsn)) + if err != nil { + log.Fatalf("Unable to open database connection: %s", err) + } + + db.AutoMigrate(&Message{}) + log.Println("Message created") + + log.Println("Remember to call create_hypertable on message, sowhat I can't do that for you.") +} + diff --git a/src/ma/database/schema.sql b/src/ma/database/schema.sql new file mode 100644 index 0000000..1284842 --- /dev/null +++ b/src/ma/database/schema.sql @@ -0,0 +1,7 @@ +create extension if not exists timescaledb; + + +select create_hypertable('message', 'time'); + + + diff --git a/src/ma/go.mod b/src/ma/go.mod new file mode 100644 index 0000000..17205bc --- /dev/null +++ b/src/ma/go.mod @@ -0,0 +1,24 @@ +module ma + +go 1.21.3 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/google/uuid v1.6.0 + gorm.io/driver/postgres v1.5.10 + gorm.io/gorm v1.25.12 +) + +require ( + github.com/gorilla/websocket v1.5.3 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.5.5 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + golang.org/x/crypto v0.25.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/text v0.16.0 // indirect +) diff --git a/src/ma/go.sum b/src/ma/go.sum new file mode 100644 index 0000000..47a8c4d --- /dev/null +++ b/src/ma/go.sum @@ -0,0 +1,44 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.10 h1:7Lggqempgy496c0WfHXsYWxk3Th+ZcW66/21QhVFdeE= +gorm.io/driver/postgres v1.5.10/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= +gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= +gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= diff --git a/src/ma/main.go b/src/ma/main.go new file mode 100644 index 0000000..45496b9 --- /dev/null +++ b/src/ma/main.go @@ -0,0 +1,37 @@ +package main + +import "log" +import "os" +import "os/signal" +import "ma/mqtt" +import "ma/config" +import "ma/counter" +import "ma/archiver" + + + +func main() { + log.SetPrefix("MA: ") + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + log.Println("MA starting") + + config.LoadConfiguration() + + archiver.InitArchiver() + go archiver.InputArchiver() + + mqtt.StartMqttClient() + defer mqtt.StopMqttClient() + + counter.InitCounter() + + log.Println("MA running") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + <-c + + log.Println("Terminating MA") +} + diff --git a/src/ma/migrate_schema.go b/src/ma/migrate_schema.go new file mode 100644 index 0000000..b487da6 --- /dev/null +++ b/src/ma/migrate_schema.go @@ -0,0 +1,18 @@ +package main + +import "log" +import "ma/database" + + + +func main() { + log.SetPrefix("UDI Migrate Schema: ") + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + log.Println("Starting") + + database.Migrate() + + log.Println("Done") +} + diff --git a/src/ma/mqtt/mqtt.go b/src/ma/mqtt/mqtt.go new file mode 100644 index 0000000..70a55cc --- /dev/null +++ b/src/ma/mqtt/mqtt.go @@ -0,0 +1,147 @@ +package mqtt + +import "log" +import "strings" +import "fmt" +import MQTT "github.com/eclipse/paho.mqtt.golang" +import "github.com/google/uuid" +import "crypto/tls" +import "udi/config" +import "udi/counter" + +type Message struct { + Topic string + Payload []byte +} + +var InputChannel chan Message = make(chan Message, 100) +var OutputChannel chan Message = make(chan Message, 100) + +var mqttClient MQTT.Client + +func onMessageReceived(client MQTT.Client, message MQTT.Message) { + // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) + for _, i := range config.Config.ExcludeTopics { + if i == message.Topic { + counter.S("Skipped") + return + } + } + + m := Message { + Topic: message.Topic(), + Payload: message.Payload(), + } + select { + case InputChannel <- m: + counter.S("Received") + {} + //log.Println("Message sent to channel") + default: + log.Println("Channel full, message lost") + counter.F("Received") + } +} + +func onConnectionLost(client MQTT.Client, error error) { + log.Printf("Connection lost, error %s", error) +} + +func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { + log.Println("Oops, connection lost, already reconnecting ...") +} + + +func onConnect(client MQTT.Client) { + for _, topic := range config.Config.IncludeTopics { + if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) + } + log.Printf("Topic %s subscribed", topic) + } +} + +func outputDispatcher(client MQTT.Client) { + for { + select { + case message := <- OutputChannel: + log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) + if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil { + log.Printf("Unable to publish, error %s", token.Error()) + } + log.Println("Successfully published") + } + } +} + +func StartMqttClient() { + broker := config.Config.Mqtt.Broker + if broker == "" { + log.Fatal("No broker given, set env var MQTT_BROKER") + } + + prefix := "MA" + uuid := uuid.New() + clientId := fmt.Sprintf("%s-%s", prefix, uuid) + + opts := MQTT.NewClientOptions(). + AddBroker(broker). + SetClientID(clientId). + SetConnectionLostHandler(onConnectionLost). + SetOnConnectHandler(onConnect). + SetReconnectingHandler(onReconnecting). + SetConnectRetry(true) + + username := config.Config.Mqtt.Username + if username != "" { + opts.SetUsername(username) + } + + password := config.Config.Mqtt.Password + if password != "" { + opts.SetPassword(password) + } + + enableTls := config.Config.Mqtt.TlsEnable + if enableTls == "true" { + //log.Println("Enabling TLS connection") + tlsConfig := &tls.Config { + InsecureSkipVerify: true, + } + opts.SetTLSConfig(tlsConfig) + } + + log.Println("Broker connecting") + mqttClient = MQTT.NewClient(opts) + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) + } + //log.Printf("Successfully connected to broker %s", broker) + + go outputDispatcher(mqttClient) + + return +} + +func StopMqttClient() { + log.Println("Disconnecting from broker") + mqttClient.Disconnect(250) +} + +func TopicMatchesSubscription(topic, subscription string) bool { + topicSegments := strings.Split(topic, "/") + subscriptionSegments := strings.Split(subscription, "/") + + for i, subSegment := range subscriptionSegments { + if subSegment == "+" { + continue + } else if subSegment == "#" { + return true + } else if i < len(topicSegments) && subSegment != topicSegments[i] { + return false + } + } + + return true +} +