diff --git a/src/udi/main.go b/src/udi/main.go index 6636bb1..1fa815f 100644 --- a/src/udi/main.go +++ b/src/udi/main.go @@ -1,150 +1,30 @@ package main import "log" -import "fmt" import "os" import "os/signal" -import "strings" -// import "time" -import MQTT "github.com/eclipse/paho.mqtt.golang" -import "github.com/google/uuid" -import "crypto/tls" import um "udi/mqtt" -type Message struct { - topic string - payload []byte -} - -var messageInputChannel chan Message = make(chan Message, 100) -var messageOutputChannel chan Message = make(chan Message, 100) - - -func onMessageReceived(client MQTT.Client, message MQTT.Message) { - // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) - m := Message { - topic: message.Topic(), - payload: message.Payload(), - } - select { - case messageInputChannel <- m: - {} - //log.Println("Message sent to channel") - default: - log.Println("Channel full, message lost") - } -} - -func onConnectionLost(client MQTT.Client, error error) { - log.Printf("Connection lost, error %s", error) -} - -func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { - log.Println("Oops, connection lost, already reconnecting ...") -} - - -func onConnect(client MQTT.Client) { - topicsStr := os.Getenv("MQTT_SUBSCRIBE_TOPICS") - if topicsStr == "" { - log.Fatal("No topics given, set env var MQTT_SUBSCRIBE_TOPICS") - } - topics := strings.Split(topicsStr, ",") - for _, topic := range topics { - if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { - log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) - } - log.Printf("Successfully subscribed to topic %s", topic) - } -} func inputDispatcher() { for { select { - case message := <- messageInputChannel: - log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) + case message := <- um.InputChannel: + log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) } } } -func outputDispatcher(client MQTT.Client) { - for { - select { - case message := <- messageOutputChannel: - log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) - if token := client.Publish(message.topic, 0, false, message.payload); token.Wait() && token.Error() != nil { - log.Printf("Unable to publish, error %s", token.Error()) - } - log.Println("Successfully published") - } - } -} - -func startMqttClient() MQTT.Client { - broker := os.Getenv("MQTT_BROKER") - if broker == "" { - log.Fatal("No broker given, set env var MQTT_BROKER") - } - - prefix := "UDI" - uuid := uuid.New() - clientId := fmt.Sprintf("%s-%s", prefix, uuid) - - opts := MQTT.NewClientOptions(). - AddBroker(broker). - SetClientID(clientId). - SetConnectionLostHandler(onConnectionLost). - SetOnConnectHandler(onConnect). - SetReconnectingHandler(onReconnecting). - SetConnectRetry(true) - - username := os.Getenv("MQTT_USERNAME") - if username != "" { - opts.SetUsername(username) - } - - password := os.Getenv("MQTT_PASSWORD") - if password != "" { - opts.SetPassword(password) - } - - enableTls := os.Getenv("MQTT_ENABLE_TLS") - if enableTls == "true" { - log.Println("Enableing TLS connection") - tlsConfig := &tls.Config { - InsecureSkipVerify: true, - } - opts.SetTLSConfig(tlsConfig) - } - - log.Println("Trying to connect to broker") - client := MQTT.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) - } - log.Printf("Successfully connected to broker %s", broker) - - return client -} - -func stopMqttClient(client MQTT.Client) { - log.Println("Disconnecting from broker") - client.Disconnect(250) -} - func main() { log.SetPrefix("UDI: ") log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) log.Println("UDI starting") - um.TestMqtt() go inputDispatcher() - mqttClient := startMqttClient() - defer stopMqttClient(mqttClient) - - go outputDispatcher(mqttClient) + um.StartMqttClient() + defer um.StopMqttClient() log.Println("UDI running") diff --git a/src/udi/mqtt/mqtt.go b/src/udi/mqtt/mqtt.go index 4cefc69..a6d21b1 100644 --- a/src/udi/mqtt/mqtt.go +++ b/src/udi/mqtt/mqtt.go @@ -1,9 +1,126 @@ package mqtt import "log" +import "fmt" +import "os" +import "strings" +// import "time" +import MQTT "github.com/eclipse/paho.mqtt.golang" +import "github.com/google/uuid" +import "crypto/tls" - -func TestMqtt() { - log.Println("test mqtt") +type Message struct { + Topic string + Payload []byte +} + +var InputChannel chan Message = make(chan Message, 100) +var OutputChannel chan Message = make(chan Message, 100) + +var mqttClient MQTT.Client + +func onMessageReceived(client MQTT.Client, message MQTT.Message) { + // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) + m := Message { + Topic: message.Topic(), + Payload: message.Payload(), + } + select { + case InputChannel <- m: + {} + //log.Println("Message sent to channel") + default: + log.Println("Channel full, message lost") + } +} + +func onConnectionLost(client MQTT.Client, error error) { + log.Printf("Connection lost, error %s", error) +} + +func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { + log.Println("Oops, connection lost, already reconnecting ...") +} + + +func onConnect(client MQTT.Client) { + topicsStr := os.Getenv("MQTT_SUBSCRIBE_TOPICS") + if topicsStr == "" { + log.Fatal("No topics given, set env var MQTT_SUBSCRIBE_TOPICS") + } + topics := strings.Split(topicsStr, ",") + for _, topic := range topics { + if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) + } + log.Printf("Successfully subscribed to topic %s", topic) + } +} + +func outputDispatcher(client MQTT.Client) { + for { + select { + case message := <- OutputChannel: + log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) + if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil { + log.Printf("Unable to publish, error %s", token.Error()) + } + log.Println("Successfully published") + } + } +} + +func StartMqttClient() { + broker := os.Getenv("MQTT_BROKER") + if broker == "" { + log.Fatal("No broker given, set env var MQTT_BROKER") + } + + prefix := "UDI" + uuid := uuid.New() + clientId := fmt.Sprintf("%s-%s", prefix, uuid) + + opts := MQTT.NewClientOptions(). + AddBroker(broker). + SetClientID(clientId). + SetConnectionLostHandler(onConnectionLost). + SetOnConnectHandler(onConnect). + SetReconnectingHandler(onReconnecting). + SetConnectRetry(true) + + username := os.Getenv("MQTT_USERNAME") + if username != "" { + opts.SetUsername(username) + } + + password := os.Getenv("MQTT_PASSWORD") + if password != "" { + opts.SetPassword(password) + } + + enableTls := os.Getenv("MQTT_ENABLE_TLS") + if enableTls == "true" { + log.Println("Enableing TLS connection") + tlsConfig := &tls.Config { + InsecureSkipVerify: true, + } + opts.SetTLSConfig(tlsConfig) + } + + log.Println("Trying to connect to broker") + mqttClient = MQTT.NewClient(opts) + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) + } + log.Printf("Successfully connected to broker %s", broker) + + go outputDispatcher(mqttClient) + + return +} + +func StopMqttClient() { + log.Println("Disconnecting from broker") + mqttClient.Disconnect(250) } diff --git a/src/udi/udi b/src/udi/udi index ec9b370..6d762d7 100755 Binary files a/src/udi/udi and b/src/udi/udi differ