package mqtt import "log" import "strings" import "fmt" import MQTT "github.com/eclipse/paho.mqtt.golang" import "github.com/google/uuid" import "crypto/tls" import "udi/config" import "udi/counter" type Message struct { Topic string Payload []byte } var InputChannel chan Message = make(chan Message, 100) var OutputChannel chan Message = make(chan Message, 100) var mqttClient MQTT.Client func onMessageReceived(client MQTT.Client, message MQTT.Message) { // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) for _, i := range config.Config.ExcludeTopics { if i == message.Topic { counter.S("Skipped") return } } m := Message { Topic: message.Topic(), Payload: message.Payload(), } select { case InputChannel <- m: counter.S("Received") {} //log.Println("Message sent to channel") default: log.Println("Channel full, message lost") counter.F("Received") } } func onConnectionLost(client MQTT.Client, error error) { log.Printf("Connection lost, error %s", error) } func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { log.Println("Oops, connection lost, already reconnecting ...") } func onConnect(client MQTT.Client) { for _, topic := range config.Config.IncludeTopics { if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) } log.Printf("Topic %s subscribed", topic) } } func outputDispatcher(client MQTT.Client) { for { select { case message := <- OutputChannel: log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil { log.Printf("Unable to publish, error %s", token.Error()) } log.Println("Successfully published") } } } func StartMqttClient() { broker := config.Config.Mqtt.Broker if broker == "" { log.Fatal("No broker given, set env var MQTT_BROKER") } prefix := "MA" uuid := uuid.New() clientId := fmt.Sprintf("%s-%s", prefix, uuid) opts := MQTT.NewClientOptions(). AddBroker(broker). SetClientID(clientId). SetConnectionLostHandler(onConnectionLost). SetOnConnectHandler(onConnect). SetReconnectingHandler(onReconnecting). SetConnectRetry(true) username := config.Config.Mqtt.Username if username != "" { opts.SetUsername(username) } password := config.Config.Mqtt.Password if password != "" { opts.SetPassword(password) } enableTls := config.Config.Mqtt.TlsEnable if enableTls == "true" { //log.Println("Enabling TLS connection") tlsConfig := &tls.Config { InsecureSkipVerify: true, } opts.SetTLSConfig(tlsConfig) } log.Println("Broker connecting") mqttClient = MQTT.NewClient(opts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) } //log.Printf("Successfully connected to broker %s", broker) go outputDispatcher(mqttClient) return } func StopMqttClient() { log.Println("Disconnecting from broker") mqttClient.Disconnect(250) } func TopicMatchesSubscription(topic, subscription string) bool { topicSegments := strings.Split(topic, "/") subscriptionSegments := strings.Split(subscription, "/") for i, subSegment := range subscriptionSegments { if subSegment == "+" { continue } else if subSegment == "#" { return true } else if i < len(topicSegments) && subSegment != topicSegments[i] { return false } } return true }