From 1297b4bd97b1bf4502be1bffd6fff20496eee407 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Sat, 25 Nov 2023 17:50:33 +0100 Subject: [PATCH] outputDispatcher --- .gitignore | 1 + main.go | 29 ++++++++++++++++++++++------- 2 files changed, 23 insertions(+), 7 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8cd93c3 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +udi diff --git a/main.go b/main.go index 8db0ad4..ac7e495 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,8 @@ type Message struct { payload []byte } -var messageChannel chan Message +var messageInputChannel chan Message = make(chan Message, 100) +var messageOutputChannel chan Message = make(chan Message, 100) func onMessageReceived(client MQTT.Client, message MQTT.Message) { @@ -26,7 +27,7 @@ func onMessageReceived(client MQTT.Client, message MQTT.Message) { payload: message.Payload(), } select { - case messageChannel <- m: + case messageInputChannel <- m: {} //log.Println("Message sent to channel") default: @@ -57,11 +58,24 @@ func onConnect(client MQTT.Client) { } } -func dispatcher() { +func inputDispatcher() { for { select { - case message := <- messageChannel: - log.Printf("Message arrived in dispatcher, topic: %s, payload: %s\n", message.topic, message.payload) + case message := <- messageInputChannel: + log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) + } + } +} + +func outputDispatcher(client MQTT.Client) { + for { + select { + case message := <- messageOutputChannel: + log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) + if token := client.Publish(message.topic, 0, false, message.payload); token.Wait() && token.Error() != nil { + log.Printf("Unable to publish, error %s", token.Error()) + } + log.Println("Successfully published") } } } @@ -124,12 +138,13 @@ func main() { log.Println("UDI starting") - messageChannel = make(chan Message, 100) - go dispatcher() + go inputDispatcher() mqttClient := startMqttClient() defer stopMqttClient(mqttClient) + go outputDispatcher(mqttClient) + log.Println("UDI running") c := make(chan os.Signal, 1)