package main import "log" import "fmt" import "os" import "os/signal" import "strings" // import "time" import MQTT "github.com/eclipse/paho.mqtt.golang" import "github.com/google/uuid" import "crypto/tls" type Message struct { topic string payload []byte } var messageInputChannel chan Message = make(chan Message, 100) var messageOutputChannel chan Message = make(chan Message, 100) func onMessageReceived(client MQTT.Client, message MQTT.Message) { // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) m := Message { topic: message.Topic(), payload: message.Payload(), } select { case messageInputChannel <- m: {} //log.Println("Message sent to channel") default: log.Println("Channel full, message lost") } } func onConnectionLost(client MQTT.Client, error error) { log.Printf("Connection lost, error %s", error) } func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { log.Println("Oops, connection lost, already reconnecting ...") } func onConnect(client MQTT.Client) { topicsStr := os.Getenv("MQTT_SUBSCRIBE_TOPICS") if topicsStr == "" { log.Fatal("No topics given, set env var MQTT_SUBSCRIBE_TOPICS") } topics := strings.Split(topicsStr, ",") for _, topic := range topics { if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) } log.Printf("Successfully subscribed to topic %s", topic) } } func inputDispatcher() { for { select { case message := <- messageInputChannel: log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) } } } func outputDispatcher(client MQTT.Client) { for { select { case message := <- messageOutputChannel: log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) if token := client.Publish(message.topic, 0, false, message.payload); token.Wait() && token.Error() != nil { log.Printf("Unable to publish, error %s", token.Error()) } log.Println("Successfully published") } } } func startMqttClient() MQTT.Client { broker := os.Getenv("MQTT_BROKER") if broker == "" { log.Fatal("No broker given, set env var MQTT_BROKER") } prefix := "UDI" uuid := uuid.New() clientId := fmt.Sprintf("%s-%s", prefix, uuid) opts := MQTT.NewClientOptions(). AddBroker(broker). SetClientID(clientId). SetConnectionLostHandler(onConnectionLost). SetOnConnectHandler(onConnect). SetReconnectingHandler(onReconnecting). SetConnectRetry(true) username := os.Getenv("MQTT_USERNAME") if username != "" { opts.SetUsername(username) } password := os.Getenv("MQTT_PASSWORD") if password != "" { opts.SetPassword(password) } enableTls := os.Getenv("MQTT_ENABLE_TLS") if enableTls == "true" { log.Println("Enableing TLS connection") tlsConfig := &tls.Config { InsecureSkipVerify: true, } opts.SetTLSConfig(tlsConfig) } log.Println("Trying to connect to broker") client := MQTT.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) } log.Printf("Successfully connected to broker %s", broker) return client } func stopMqttClient(client MQTT.Client) { log.Println("Disconnecting from broker") client.Disconnect(250) } func main() { log.SetPrefix("UDI: ") log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) log.Println("UDI starting") go inputDispatcher() mqttClient := startMqttClient() defer stopMqttClient(mqttClient) go outputDispatcher(mqttClient) log.Println("UDI running") c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, os.Kill) <-c log.Println("Terminating UDI") }