// Package mqtt publishes messages to an MQTT broker. Messages are queued on
// a buffered channel and sent by a background dispatcher goroutine that is
// started by Start.
package mqtt

import (
	"crypto/tls"
	"fmt"
	"log"

	MQTT "github.com/eclipse/paho.mqtt.golang"
	"github.com/google/uuid"

	"smq/config"
)

// import "smq/counter"

// Message is a single payload destined for a given MQTT topic.
type Message struct {
	Topic   string
	Payload []byte
}

// OutputChannel queues messages for the dispatcher goroutine. It is buffered
// (100 entries); Publish drops messages instead of blocking when it is full.
var OutputChannel = make(chan Message, 100)

// mqttClient is the broker connection created by Start and closed by Stop.
// It is nil until Start has been called.
var mqttClient MQTT.Client

// onConnectionLost is registered as the connection-lost handler and logs the
// error reported by the client library.
func onConnectionLost(client MQTT.Client, err error) {
	log.Printf("Connection lost, error %v", err)
}

// onReconnecting is registered as the reconnecting handler and logs that a
// reconnect attempt is in progress.
func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) {
	log.Println("Oops, connection lost, already reconnecting ...")
}

// onConnect is registered as the on-connect handler and logs the event.
func onConnect(client MQTT.Client) {
	log.Println("Connected")
}

// outputDispatcher drains OutputChannel and publishes every message through
// client (QoS 0, not retained), waiting for each publish token before taking
// the next message. It returns only if OutputChannel is closed.
func outputDispatcher(client MQTT.Client) {
	for message := range OutputChannel {
		log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)

		token := client.Publish(message.Topic, 0, false, message.Payload)
		if token.Wait() && token.Error() != nil {
			log.Printf("Unable to publish, error %s", token.Error())
			// Do not report success for a failed publish.
			continue
		}
		log.Println("Successfully published")
	}
}

// Publish queues msg for delivery on the topic configured in
// config.Config.Mqtt.Topic. It never blocks: if the queue is full the message
// is dropped and the loss is logged.
func Publish(msg []byte) {
	message := Message{
		Topic:   config.Config.Mqtt.Topic,
		Payload: msg,
	}

	select {
	case OutputChannel <- message:
	default:
		log.Printf("Channel full, message %s lost", msg)
	}
}

// Start connects to the broker named in config.Config.Mqtt.Broker and
// launches the dispatcher goroutine. Username, password and TLS are applied
// only when set in the configuration. The process is terminated via
// log.Fatal if no broker is configured or the connect token reports an error.
func Start() {
	broker := config.Config.Mqtt.Broker
	if broker == "" {
		log.Fatal("No broker given")
	}

	// A random suffix keeps the client ID distinct per Start call.
	const prefix = "SMQ"
	clientID := fmt.Sprintf("%s-%s", prefix, uuid.New())

	opts := MQTT.NewClientOptions().
		AddBroker(broker).
		SetClientID(clientID).
		SetConnectionLostHandler(onConnectionLost).
		SetOnConnectHandler(onConnect).
		SetReconnectingHandler(onReconnecting).
		SetConnectRetry(true)

	if username := config.Config.Mqtt.Username; username != "" {
		opts.SetUsername(username)
	}

	if password := config.Config.Mqtt.Password; password != "" {
		opts.SetPassword(password)
	}

	// The TLS switch is a string in the configuration; only the exact value
	// "true" enables it.
	if config.Config.Mqtt.TlsEnable == "true" {
		//log.Println("Enabling TLS connection")
		// NOTE(review): InsecureSkipVerify disables verification of the
		// broker certificate, so the connection is encrypted but not
		// authenticated. Kept as-is to preserve behavior; consider making
		// this configurable.
		tlsConfig := &tls.Config{
			InsecureSkipVerify: true,
		}
		opts.SetTLSConfig(tlsConfig)
	}

	log.Println("Broker connecting")
	mqttClient = MQTT.NewClient(opts)
	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
	}
	//log.Printf("Successfully connected to broker %s", broker)

	go outputDispatcher(mqttClient)
}

// Stop disconnects from the broker, passing 250 as the quiesce argument to
// Disconnect. It is a no-op when Start has not created a client yet.
func Stop() {
	if mqttClient == nil {
		return
	}
	log.Println("Disconnecting from broker")
	mqttClient.Disconnect(250)
}