package main import "log" import "fmt" import "os" import "os/signal" import "strings" import MQTT "github.com/eclipse/paho.mqtt.golang" import "github.com/google/uuid" func onMessageReceived(client MQTT.Client, message MQTT.Message) { log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) } func onConnectionLost(client MQTT.Client, error error) { log.Printf("Connection lost, error %s", error) } func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { log.Println("Oops, connection lost, already reconnecting ...") } func onConnect(client MQTT.Client) { topicsStr := os.Getenv("MQTT_SUBSCRIBE_TOPICS") if topicsStr == "" { log.Fatal("No topics given, set env var MQTT_SUBSCRIBE_TOPICS") } topics := strings.Split(topicsStr, ",") for _, topic := range topics { if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) } log.Printf("Successfully subscribed to topic %s", topic) } } func main() { log.SetPrefix("UDI: ") log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) log.Println("UDI starting") broker := os.Getenv("MQTT_BROKER") if broker == "" { log.Fatal("No broker given, set env var MQTT_BROKER") } prefix := "UDI" uuid := uuid.New() clientId := fmt.Sprintf("%s-%s", prefix, uuid) mqttOpts := MQTT.NewClientOptions(). AddBroker(broker). SetClientID(clientId). SetConnectionLostHandler(onConnectionLost). SetOnConnectHandler(onConnect). SetReconnectingHandler(onReconnecting). SetConnectRetry(true) client := MQTT.NewClient(mqttOpts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) } log.Printf("Successfully connected to broker %s", broker) log.Println("UDI running") c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, os.Kill) <-c log.Println("Terminating UDI") client.Disconnect(250) }