From bf0452fa6d1d3e948fa26bfef76ffede2c18e5ca Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Sat, 25 Nov 2023 12:21:36 +0100 Subject: [PATCH] initial --- go.mod | 14 +++++++++++ go.sum | 10 ++++++++ main.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cb7bd79 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module hottis.de/universal-data-ingest + +go 1.21.3 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/google/uuid v1.4.0 +) + +require ( + github.com/gorilla/websocket v1.5.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d4629d8 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..05fdc41 --- /dev/null +++ b/main.go @@ -0,0 +1,74 @@ +package main + +import "log" +import "fmt" +import "os" +import "os/signal" +import "strings" +import MQTT "github.com/eclipse/paho.mqtt.golang" +import "github.com/google/uuid" + + + +func onMessageReceived(client MQTT.Client, message MQTT.Message) { + log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) +} + +func onConnectionLost(client MQTT.Client, error error) { + log.Printf("Connection lost, error %s", error) +} + +func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { + log.Println("Oops, connection lost, already reconnecting ...") +} + + +func onConnect(client MQTT.Client) { + topicsStr := os.Getenv("MQTT_SUBSCRIBE_TOPICS") + topics := strings.Split(topicsStr, ",") + for _, topic := range topics { + if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) + } + log.Printf("Successfully subscribed to topic %s", topic) + } +} + + +func main() { + log.SetPrefix("UDI: ") + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + log.Println("UDI starting") + + broker := os.Getenv("MQTT_BROKER") + + prefix := "UDI" + uuid := uuid.New() + clientId := fmt.Sprintf("%s-%s", prefix, uuid) + + mqttOpts := MQTT.NewClientOptions(). + AddBroker(broker). + SetClientID(clientId). + SetConnectionLostHandler(onConnectionLost). + SetOnConnectHandler(onConnect). + SetReconnectingHandler(onReconnecting). + SetConnectRetry(true) + + client := MQTT.NewClient(mqttOpts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) + } + log.Printf("Successfully connected to broker %s", broker) + + + log.Println("UDI running") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + <-c + + log.Println("Terminating UDI") + client.Disconnect(250) +} +