From a1fbdb7677bacb49ba1595f636e5c5420e0f1d58 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 27 Nov 2023 13:09:41 +0100 Subject: [PATCH] dispatcher --- src/udi/config/config.go | 24 ++++++++++++------------ src/udi/dispatcher/dispatcher.go | 27 +++++++++++++++++++++++++++ src/udi/main.go | 17 +++++------------ src/udi/mqtt/mqtt.go | 18 ++++++++++++++++++ 4 files changed, 62 insertions(+), 24 deletions(-) create mode 100644 src/udi/dispatcher/dispatcher.go diff --git a/src/udi/config/config.go b/src/udi/config/config.go index efbd31c..419db6d 100644 --- a/src/udi/config/config.go +++ b/src/udi/config/config.go @@ -7,20 +7,20 @@ import "os" type ConfigT struct { Mqtt struct { - Broker string - Username string - Password string - TlsEnable string - } + Broker string `json:"broker"` + Username string `json:"username"` + Password string `json:"password"` + TlsEnable string `json:"tlsEnable"` + } `json:"mqtt"` TopicMappings []struct { - Topics []string - Plugin string - } + Topics []string `json:topics` + Plugin string `json:plugin` + } `json:"TopicMappings"` Plugins []struct { - Name string - DatabaseConnStr string - Attributes map[string]interface{} - } + Name string `json:"name"` + DatabaseConnStr string `json:"databaseConnStr"` + Attributes map[string]string `json:"attributes"` + } `json:"plugins"` } var Config ConfigT diff --git a/src/udi/dispatcher/dispatcher.go b/src/udi/dispatcher/dispatcher.go new file mode 100644 index 0000000..a2ac2db --- /dev/null +++ b/src/udi/dispatcher/dispatcher.go @@ -0,0 +1,27 @@ +package dispatcher + +import "log" +import "udi/mqtt" +import "udi/config" + + +func InputDispatcher() { + for { + select { + case message := <- mqtt.InputChannel: + log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) + + for _, mapping := range config.Config.TopicMappings { + log.Printf("Testing %s -> %s", mapping.Topics, mapping.Plugin) + for _, subscribedTopic := range mapping.Topics { + log.Printf("Testing %s in %s", message.Topic, subscribedTopic) + if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) { + log.Printf("Handle message in plugin %s", mapping.Plugin) + } else { + log.Printf("no match") + } + } + } + } + } +} diff --git a/src/udi/main.go b/src/udi/main.go index a9f18fc..88e4861 100644 --- a/src/udi/main.go +++ b/src/udi/main.go @@ -3,18 +3,11 @@ package main import "log" import "os" import "os/signal" -import um "udi/mqtt" +import "udi/mqtt" import "udi/config" +import "udi/dispatcher" -func inputDispatcher() { - for { - select { - case message := <- um.InputChannel: - log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) - } - } -} func main() { log.SetPrefix("UDI: ") @@ -24,10 +17,10 @@ func main() { config.LoadConfiguration() - go inputDispatcher() + go dispatcher.InputDispatcher() - um.StartMqttClient() - defer um.StopMqttClient() + mqtt.StartMqttClient() + defer mqtt.StopMqttClient() log.Println("UDI running") diff --git a/src/udi/mqtt/mqtt.go b/src/udi/mqtt/mqtt.go index cf0cc4e..320dcb4 100644 --- a/src/udi/mqtt/mqtt.go +++ b/src/udi/mqtt/mqtt.go @@ -1,6 +1,7 @@ package mqtt import "log" +import "strings" import "fmt" import MQTT "github.com/eclipse/paho.mqtt.golang" import "github.com/google/uuid" @@ -124,3 +125,20 @@ func StopMqttClient() { mqttClient.Disconnect(250) } +func TopicMatchesSubscription(topic, subscription string) bool { + topicSegments := strings.Split(topic, "/") + subscriptionSegments := strings.Split(subscription, "/") + + for i, subSegment := range subscriptionSegments { + if subSegment == "+" { + continue + } else if subSegment == "#" { + return true + } else if i < len(topicSegments) && subSegment != topicSegments[i] { + return false + } + } + + return true +} +