diff --git a/.gitignore b/.gitignore index 8cd93c3..e69de29 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +0,0 @@ -udi diff --git a/src/udi/go.mod b/src/udi/go.mod new file mode 100644 index 0000000..a1ca57d --- /dev/null +++ b/src/udi/go.mod @@ -0,0 +1,14 @@ +module udi + +go 1.21.3 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/google/uuid v1.4.0 +) + +require ( + github.com/gorilla/websocket v1.5.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect +) diff --git a/src/udi/go.sum b/src/udi/go.sum new file mode 100644 index 0000000..d4629d8 --- /dev/null +++ b/src/udi/go.sum @@ -0,0 +1,10 @@ +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/src/udi/main.go b/src/udi/main.go new file mode 100644 index 0000000..6636bb1 --- /dev/null +++ b/src/udi/main.go @@ -0,0 +1,157 @@ +package main + +import "log" +import "fmt" +import "os" +import "os/signal" +import "strings" +// import "time" +import MQTT "github.com/eclipse/paho.mqtt.golang" +import "github.com/google/uuid" +import "crypto/tls" +import um "udi/mqtt" + +type Message struct { + topic string + payload []byte +} + +var messageInputChannel chan Message = make(chan Message, 100) +var messageOutputChannel chan Message = make(chan Message, 100) + + +func onMessageReceived(client MQTT.Client, message MQTT.Message) { + // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) + m := Message { + topic: message.Topic(), + payload: message.Payload(), + } + select { + case messageInputChannel <- m: + {} + //log.Println("Message sent to channel") + default: + log.Println("Channel full, message lost") + } +} + +func onConnectionLost(client MQTT.Client, error error) { + log.Printf("Connection lost, error %s", error) +} + +func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { + log.Println("Oops, connection lost, already reconnecting ...") +} + + +func onConnect(client MQTT.Client) { + topicsStr := os.Getenv("MQTT_SUBSCRIBE_TOPICS") + if topicsStr == "" { + log.Fatal("No topics given, set env var MQTT_SUBSCRIBE_TOPICS") + } + topics := strings.Split(topicsStr, ",") + for _, topic := range topics { + if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) + } + log.Printf("Successfully subscribed to topic %s", topic) + } +} + +func inputDispatcher() { + for { + select { + case message := <- messageInputChannel: + log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) + } + } +} + +func outputDispatcher(client MQTT.Client) { + for { + select { + case message := <- messageOutputChannel: + log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) + if token := client.Publish(message.topic, 0, false, message.payload); token.Wait() && token.Error() != nil { + log.Printf("Unable to publish, error %s", token.Error()) + } + log.Println("Successfully published") + } + } +} + +func startMqttClient() MQTT.Client { + broker := os.Getenv("MQTT_BROKER") + if broker == "" { + log.Fatal("No broker given, set env var MQTT_BROKER") + } + + prefix := "UDI" + uuid := uuid.New() + clientId := fmt.Sprintf("%s-%s", prefix, uuid) + + opts := MQTT.NewClientOptions(). + AddBroker(broker). + SetClientID(clientId). + SetConnectionLostHandler(onConnectionLost). + SetOnConnectHandler(onConnect). + SetReconnectingHandler(onReconnecting). + SetConnectRetry(true) + + username := os.Getenv("MQTT_USERNAME") + if username != "" { + opts.SetUsername(username) + } + + password := os.Getenv("MQTT_PASSWORD") + if password != "" { + opts.SetPassword(password) + } + + enableTls := os.Getenv("MQTT_ENABLE_TLS") + if enableTls == "true" { + log.Println("Enableing TLS connection") + tlsConfig := &tls.Config { + InsecureSkipVerify: true, + } + opts.SetTLSConfig(tlsConfig) + } + + log.Println("Trying to connect to broker") + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) + } + log.Printf("Successfully connected to broker %s", broker) + + return client +} + +func stopMqttClient(client MQTT.Client) { + log.Println("Disconnecting from broker") + client.Disconnect(250) +} + +func main() { + log.SetPrefix("UDI: ") + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + log.Println("UDI starting") + um.TestMqtt() + + go inputDispatcher() + + mqttClient := startMqttClient() + defer stopMqttClient(mqttClient) + + go outputDispatcher(mqttClient) + + log.Println("UDI running") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + <-c + + log.Println("Terminating UDI") +} + diff --git a/src/udi/mqtt/mqtt.go b/src/udi/mqtt/mqtt.go new file mode 100644 index 0000000..4cefc69 --- /dev/null +++ b/src/udi/mqtt/mqtt.go @@ -0,0 +1,9 @@ +package mqtt + +import "log" + + +func TestMqtt() { + log.Println("test mqtt") +} + diff --git a/src/udi/udi b/src/udi/udi new file mode 100755 index 0000000..ec9b370 Binary files /dev/null and b/src/udi/udi differ