dispatcher

This commit is contained in:
2023-11-27 13:09:41 +01:00
parent 7262592576
commit a1fbdb7677
4 changed files with 62 additions and 24 deletions

View File

@ -7,20 +7,20 @@ import "os"
type ConfigT struct { type ConfigT struct {
Mqtt struct { Mqtt struct {
Broker string Broker string `json:"broker"`
Username string Username string `json:"username"`
Password string Password string `json:"password"`
TlsEnable string TlsEnable string `json:"tlsEnable"`
} } `json:"mqtt"`
TopicMappings []struct { TopicMappings []struct {
Topics []string Topics []string `json:topics`
Plugin string Plugin string `json:plugin`
} } `json:"TopicMappings"`
Plugins []struct { Plugins []struct {
Name string Name string `json:"name"`
DatabaseConnStr string DatabaseConnStr string `json:"databaseConnStr"`
Attributes map[string]interface{} Attributes map[string]string `json:"attributes"`
} } `json:"plugins"`
} }
var Config ConfigT var Config ConfigT

View File

@ -0,0 +1,27 @@
package dispatcher
import "log"
import "udi/mqtt"
import "udi/config"
func InputDispatcher() {
for {
select {
case message := <- mqtt.InputChannel:
log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
for _, mapping := range config.Config.TopicMappings {
log.Printf("Testing %s -> %s", mapping.Topics, mapping.Plugin)
for _, subscribedTopic := range mapping.Topics {
log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
log.Printf("Handle message in plugin %s", mapping.Plugin)
} else {
log.Printf("no match")
}
}
}
}
}
}

View File

@ -3,18 +3,11 @@ package main
import "log" import "log"
import "os" import "os"
import "os/signal" import "os/signal"
import um "udi/mqtt" import "udi/mqtt"
import "udi/config" import "udi/config"
import "udi/dispatcher"
func inputDispatcher() {
for {
select {
case message := <- um.InputChannel:
log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
}
}
}
func main() { func main() {
log.SetPrefix("UDI: ") log.SetPrefix("UDI: ")
@ -24,10 +17,10 @@ func main() {
config.LoadConfiguration() config.LoadConfiguration()
go inputDispatcher() go dispatcher.InputDispatcher()
um.StartMqttClient() mqtt.StartMqttClient()
defer um.StopMqttClient() defer mqtt.StopMqttClient()
log.Println("UDI running") log.Println("UDI running")

View File

@ -1,6 +1,7 @@
package mqtt package mqtt
import "log" import "log"
import "strings"
import "fmt" import "fmt"
import MQTT "github.com/eclipse/paho.mqtt.golang" import MQTT "github.com/eclipse/paho.mqtt.golang"
import "github.com/google/uuid" import "github.com/google/uuid"
@ -124,3 +125,20 @@ func StopMqttClient() {
mqttClient.Disconnect(250) mqttClient.Disconnect(250)
} }
func TopicMatchesSubscription(topic, subscription string) bool {
topicSegments := strings.Split(topic, "/")
subscriptionSegments := strings.Split(subscription, "/")
for i, subSegment := range subscriptionSegments {
if subSegment == "+" {
continue
} else if subSegment == "#" {
return true
} else if i < len(topicSegments) && subSegment != topicSegments[i] {
return false
}
}
return true
}