diff --git a/src/go.mod b/src/go.mod deleted file mode 100644 index cb7bd79..0000000 --- a/src/go.mod +++ /dev/null @@ -1,14 +0,0 @@ -module hottis.de/universal-data-ingest - -go 1.21.3 - -require ( - github.com/eclipse/paho.mqtt.golang v1.4.3 - github.com/google/uuid v1.4.0 -) - -require ( - github.com/gorilla/websocket v1.5.0 // indirect - golang.org/x/net v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect -) diff --git a/src/go.sum b/src/go.sum deleted file mode 100644 index d4629d8..0000000 --- a/src/go.sum +++ /dev/null @@ -1,10 +0,0 @@ -github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= -github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/src/main.go b/src/main.go deleted file mode 100644 index ac7e495..0000000 --- a/src/main.go +++ /dev/null @@ -1,156 +0,0 @@ -package main - -import "log" -import "fmt" -import "os" -import "os/signal" -import "strings" -// import "time" -import MQTT "github.com/eclipse/paho.mqtt.golang" -import "github.com/google/uuid" -import "crypto/tls" - - -type Message struct { - topic string - payload []byte -} - -var messageInputChannel chan Message = make(chan Message, 100) -var messageOutputChannel chan Message = make(chan Message, 100) - - -func onMessageReceived(client MQTT.Client, message MQTT.Message) { - // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) - m := Message { - topic: message.Topic(), - payload: message.Payload(), - } - select { - case messageInputChannel <- m: - {} - //log.Println("Message sent to channel") - default: - log.Println("Channel full, message lost") - } -} - -func onConnectionLost(client MQTT.Client, error error) { - log.Printf("Connection lost, error %s", error) -} - -func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { - log.Println("Oops, connection lost, already reconnecting ...") -} - - -func onConnect(client MQTT.Client) { - topicsStr := os.Getenv("MQTT_SUBSCRIBE_TOPICS") - if topicsStr == "" { - log.Fatal("No topics given, set env var MQTT_SUBSCRIBE_TOPICS") - } - topics := strings.Split(topicsStr, ",") - for _, topic := range topics { - if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { - log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) - } - log.Printf("Successfully subscribed to topic %s", topic) - } -} - -func inputDispatcher() { - for { - select { - case message := <- messageInputChannel: - log.Printf("Message arrived in inputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) - } - } -} - -func outputDispatcher(client MQTT.Client) { - for { - select { - case message := <- messageOutputChannel: - log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.topic, message.payload) - if token := client.Publish(message.topic, 0, false, message.payload); token.Wait() && token.Error() != nil { - log.Printf("Unable to publish, error %s", token.Error()) - } - log.Println("Successfully published") - } - } -} - -func startMqttClient() MQTT.Client { - broker := os.Getenv("MQTT_BROKER") - if broker == "" { - log.Fatal("No broker given, set env var MQTT_BROKER") - } - - prefix := "UDI" - uuid := uuid.New() - clientId := fmt.Sprintf("%s-%s", prefix, uuid) - - opts := MQTT.NewClientOptions(). - AddBroker(broker). - SetClientID(clientId). - SetConnectionLostHandler(onConnectionLost). - SetOnConnectHandler(onConnect). - SetReconnectingHandler(onReconnecting). - SetConnectRetry(true) - - username := os.Getenv("MQTT_USERNAME") - if username != "" { - opts.SetUsername(username) - } - - password := os.Getenv("MQTT_PASSWORD") - if password != "" { - opts.SetPassword(password) - } - - enableTls := os.Getenv("MQTT_ENABLE_TLS") - if enableTls == "true" { - log.Println("Enableing TLS connection") - tlsConfig := &tls.Config { - InsecureSkipVerify: true, - } - opts.SetTLSConfig(tlsConfig) - } - - log.Println("Trying to connect to broker") - client := MQTT.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) - } - log.Printf("Successfully connected to broker %s", broker) - - return client -} - -func stopMqttClient(client MQTT.Client) { - log.Println("Disconnecting from broker") - client.Disconnect(250) -} - -func main() { - log.SetPrefix("UDI: ") - log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) - - log.Println("UDI starting") - - go inputDispatcher() - - mqttClient := startMqttClient() - defer stopMqttClient(mqttClient) - - go outputDispatcher(mqttClient) - - log.Println("UDI running") - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) - <-c - - log.Println("Terminating UDI") -} -