From 6d4b7541efe1c20d62139b68174efe54b0ee7a10 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 10 Feb 2025 12:48:42 +0100 Subject: [PATCH] fix, 2 --- .gitignore | 1 - src/tsm/config-test.json | 14 +++++ src/tsm/config/config.go | 38 +++++++++++++ src/tsm/go.mod | 16 ++++++ src/tsm/go.sum | 22 ++++++++ src/tsm/mqtt/mqtt.go | 113 +++++++++++++++++++++++++++++++++++++++ src/tsm/tsm.go | 33 ++++++++++++ src/tsm/tsmq/tsmq.go | 72 +++++++++++++++++++++++++ 8 files changed, 308 insertions(+), 1 deletion(-) create mode 100644 src/tsm/config-test.json create mode 100644 src/tsm/config/config.go create mode 100644 src/tsm/go.mod create mode 100644 src/tsm/go.sum create mode 100644 src/tsm/mqtt/mqtt.go create mode 100644 src/tsm/tsm.go create mode 100644 src/tsm/tsmq/tsmq.go diff --git a/.gitignore b/.gitignore index dad78d1..0c81346 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ build .DS_Store -tsm src/tsm/tsm diff --git a/src/tsm/config-test.json b/src/tsm/config-test.json new file mode 100644 index 0000000..dec16fa --- /dev/null +++ b/src/tsm/config-test.json @@ -0,0 +1,14 @@ +{ + "mqtt": { + "broker": "mqtt://172.23.1.102:1883", + "tlsEnable": "false", + "topic": "tsm" + }, + "interval": 10, + "servers": [ + { + "name": "172.16.13.10", + "label": "david" + } + ] +} diff --git a/src/tsm/config/config.go b/src/tsm/config/config.go new file mode 100644 index 0000000..c0d8304 --- /dev/null +++ b/src/tsm/config/config.go @@ -0,0 +1,38 @@ +package config + +import ( + "encoding/json" + "os" + "log" +) + +type Server struct { + Name string `json:"name"` + Label string `json:"label"` +} + +type MQTTConfigObject struct { + Broker string `json:"broker"` + Username string `json:"username"` + Password string `json:"password"` + TlsEnable string `json:"tlsEnable"` + Topic string `json:"topic"` +} + +type ConfigObject struct { + Mqtt MQTTConfigObject `json:"mqtt"` + Interval int `json:"interval"` + Servers []Server `json:"servers"` +} + +var Config ConfigObject + +func LoadConfiguration() { + cfg := os.Getenv("TSM_MQTT_CONF") + log.Printf("cfg: %s", cfg) + err := json.Unmarshal([]byte(cfg), &Config) + if err != nil { + log.Fatalf("Unable to parse configuration: %v", err) + } +} + diff --git a/src/tsm/go.mod b/src/tsm/go.mod new file mode 100644 index 0000000..8f65c79 --- /dev/null +++ b/src/tsm/go.mod @@ -0,0 +1,16 @@ +module tsm + +go 1.21.3 + +require ( + github.com/beevik/ntp v1.4.3 + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/google/uuid v1.6.0 +) + +require ( + github.com/gorilla/websocket v1.5.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.20.0 // indirect +) diff --git a/src/tsm/go.sum b/src/tsm/go.sum new file mode 100644 index 0000000..7e4707c --- /dev/null +++ b/src/tsm/go.sum @@ -0,0 +1,22 @@ +github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho= +github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/tsm/mqtt/mqtt.go b/src/tsm/mqtt/mqtt.go new file mode 100644 index 0000000..63b6d21 --- /dev/null +++ b/src/tsm/mqtt/mqtt.go @@ -0,0 +1,113 @@ +package mqtt + +import "log" +import "fmt" +import MQTT "github.com/eclipse/paho.mqtt.golang" +import "github.com/google/uuid" +import "crypto/tls" +import "tsm/config" +// import "smq/counter" + +type Message struct { + Topic string + Payload []byte +} + +var OutputChannel chan Message = make(chan Message, 100) + +var mqttClient MQTT.Client + +func onConnectionLost(client MQTT.Client, error error) { + log.Printf("Connection lost, error %s", error) +} + +func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { + log.Println("Oops, connection lost, already reconnecting ...") +} + + +func onConnect(client MQTT.Client) { + log.Println("Connected") +} + +func outputDispatcher(client MQTT.Client) { + for { + select { + case message := <- OutputChannel: + log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) + if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil { + log.Printf("Unable to publish, error %s", token.Error()) + } + log.Println("Successfully published") + } + } +} + +func Publish(msg []byte) { + message := Message { + Topic: config.Config.Mqtt.Topic, + Payload: msg, + } + select { + case OutputChannel <- message: + {} + default: + log.Printf("Channel full, message %s lost") + } +} + + +func Start() { + broker := config.Config.Mqtt.Broker + if broker == "" { + log.Fatal("No broker given") + } + + prefix := "SMQ" + uuid := uuid.New() + clientId := fmt.Sprintf("%s-%s", prefix, uuid) + + opts := MQTT.NewClientOptions(). + AddBroker(broker). + SetClientID(clientId). + SetConnectionLostHandler(onConnectionLost). + SetOnConnectHandler(onConnect). + SetReconnectingHandler(onReconnecting). + SetConnectRetry(true) + + username := config.Config.Mqtt.Username + if username != "" { + opts.SetUsername(username) + } + + password := config.Config.Mqtt.Password + if password != "" { + opts.SetPassword(password) + } + + enableTls := config.Config.Mqtt.TlsEnable + if enableTls == "true" { + //log.Println("Enabling TLS connection") + tlsConfig := &tls.Config { + InsecureSkipVerify: true, + } + opts.SetTLSConfig(tlsConfig) + } + + log.Println("Broker connecting") + mqttClient = MQTT.NewClient(opts) + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) + } + //log.Printf("Successfully connected to broker %s", broker) + + go outputDispatcher(mqttClient) + + return +} + +func Stop() { + log.Println("Disconnecting from broker") + mqttClient.Disconnect(250) +} + diff --git a/src/tsm/tsm.go b/src/tsm/tsm.go new file mode 100644 index 0000000..17ef26d --- /dev/null +++ b/src/tsm/tsm.go @@ -0,0 +1,33 @@ +package main + +import ( + "log" + "os" + "os/signal" + + "tsm/config" + "tsm/mqtt" + "tsm/tsmq" +) + + +func main() { + log.SetPrefix("TSM: ") + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + log.Println("starting") + + config.LoadConfiguration() + + mqtt.Start() + defer mqtt.Stop() + + tsmq.Start() + defer tsmq.Stop() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + <- c + + log.Println("terminating") +} diff --git a/src/tsm/tsmq/tsmq.go b/src/tsm/tsmq/tsmq.go new file mode 100644 index 0000000..abc6a81 --- /dev/null +++ b/src/tsm/tsmq/tsmq.go @@ -0,0 +1,72 @@ +package tsmq + +import ( + "log" + "time" + "encoding/json" + + "tsm/config" + "tsm/mqtt" + + "github.com/beevik/ntp" +) + +type variable_t struct { + Name string `json:"variable"` + Value float64 `json:"value"` + Unit string `json:"unit"` + Status string `json:"status"` +} + +type message_t struct { + Server string `json:"server"` + Label string `json:"label"` + Variables map[string]variable_t `json:"variables"` +} + + + +func Start() { + for { + for _, server := range config.Config.Servers { + log.Println("Polling server " + server.Name) + + message := message_t { + Server: server.Name, + Label: server.Label, + Variables: make(map[string]variable_t), + } + + label := "rootdisp" + status := "Ok" + value := 0.0 + resp, err := ntp.Query(server.Name) + if err != nil { + status = "Error" + } else { + value = resp.RootDispersion.Seconds() * 1000 + } + + v := variable_t { + Name: label, + Value: value, + Unit: "ms", + Status: status, + } + message.Variables[label] = v + + + j, err := json.Marshal(message) + if err != nil { + log.Printf("Unable to marshal message, it is lost: %s, %v", message, err) + } else { + mqtt.Publish(j) + } + } + + time.Sleep(time.Duration(config.Config.Interval) * time.Second) + } +} + +func Stop() { +}