This commit is contained in:
parent
1f13f354e1
commit
6d4b7541ef
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,4 +1,3 @@
|
|||||||
build
|
build
|
||||||
.DS_Store
|
.DS_Store
|
||||||
tsm
|
|
||||||
src/tsm/tsm
|
src/tsm/tsm
|
||||||
|
14
src/tsm/config-test.json
Normal file
14
src/tsm/config-test.json
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
{
|
||||||
|
"mqtt": {
|
||||||
|
"broker": "mqtt://172.23.1.102:1883",
|
||||||
|
"tlsEnable": "false",
|
||||||
|
"topic": "tsm"
|
||||||
|
},
|
||||||
|
"interval": 10,
|
||||||
|
"servers": [
|
||||||
|
{
|
||||||
|
"name": "172.16.13.10",
|
||||||
|
"label": "david"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
38
src/tsm/config/config.go
Normal file
38
src/tsm/config/config.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Server struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Label string `json:"label"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type MQTTConfigObject struct {
|
||||||
|
Broker string `json:"broker"`
|
||||||
|
Username string `json:"username"`
|
||||||
|
Password string `json:"password"`
|
||||||
|
TlsEnable string `json:"tlsEnable"`
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConfigObject struct {
|
||||||
|
Mqtt MQTTConfigObject `json:"mqtt"`
|
||||||
|
Interval int `json:"interval"`
|
||||||
|
Servers []Server `json:"servers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var Config ConfigObject
|
||||||
|
|
||||||
|
func LoadConfiguration() {
|
||||||
|
cfg := os.Getenv("TSM_MQTT_CONF")
|
||||||
|
log.Printf("cfg: %s", cfg)
|
||||||
|
err := json.Unmarshal([]byte(cfg), &Config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to parse configuration: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
16
src/tsm/go.mod
Normal file
16
src/tsm/go.mod
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
module tsm
|
||||||
|
|
||||||
|
go 1.21.3
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/beevik/ntp v1.4.3
|
||||||
|
github.com/eclipse/paho.mqtt.golang v1.4.3
|
||||||
|
github.com/google/uuid v1.6.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/gorilla/websocket v1.5.0 // indirect
|
||||||
|
golang.org/x/net v0.25.0 // indirect
|
||||||
|
golang.org/x/sync v0.1.0 // indirect
|
||||||
|
golang.org/x/sys v0.20.0 // indirect
|
||||||
|
)
|
22
src/tsm/go.sum
Normal file
22
src/tsm/go.sum
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho=
|
||||||
|
github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
|
||||||
|
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||||
|
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
|
||||||
|
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
|
||||||
|
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||||
|
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
||||||
|
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
113
src/tsm/mqtt/mqtt.go
Normal file
113
src/tsm/mqtt/mqtt.go
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
package mqtt
|
||||||
|
|
||||||
|
import "log"
|
||||||
|
import "fmt"
|
||||||
|
import MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
import "github.com/google/uuid"
|
||||||
|
import "crypto/tls"
|
||||||
|
import "tsm/config"
|
||||||
|
// import "smq/counter"
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
Topic string
|
||||||
|
Payload []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
var OutputChannel chan Message = make(chan Message, 100)
|
||||||
|
|
||||||
|
var mqttClient MQTT.Client
|
||||||
|
|
||||||
|
func onConnectionLost(client MQTT.Client, error error) {
|
||||||
|
log.Printf("Connection lost, error %s", error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) {
|
||||||
|
log.Println("Oops, connection lost, already reconnecting ...")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func onConnect(client MQTT.Client) {
|
||||||
|
log.Println("Connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
func outputDispatcher(client MQTT.Client) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case message := <- OutputChannel:
|
||||||
|
log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
|
||||||
|
if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil {
|
||||||
|
log.Printf("Unable to publish, error %s", token.Error())
|
||||||
|
}
|
||||||
|
log.Println("Successfully published")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Publish(msg []byte) {
|
||||||
|
message := Message {
|
||||||
|
Topic: config.Config.Mqtt.Topic,
|
||||||
|
Payload: msg,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case OutputChannel <- message:
|
||||||
|
{}
|
||||||
|
default:
|
||||||
|
log.Printf("Channel full, message %s lost")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func Start() {
|
||||||
|
broker := config.Config.Mqtt.Broker
|
||||||
|
if broker == "" {
|
||||||
|
log.Fatal("No broker given")
|
||||||
|
}
|
||||||
|
|
||||||
|
prefix := "SMQ"
|
||||||
|
uuid := uuid.New()
|
||||||
|
clientId := fmt.Sprintf("%s-%s", prefix, uuid)
|
||||||
|
|
||||||
|
opts := MQTT.NewClientOptions().
|
||||||
|
AddBroker(broker).
|
||||||
|
SetClientID(clientId).
|
||||||
|
SetConnectionLostHandler(onConnectionLost).
|
||||||
|
SetOnConnectHandler(onConnect).
|
||||||
|
SetReconnectingHandler(onReconnecting).
|
||||||
|
SetConnectRetry(true)
|
||||||
|
|
||||||
|
username := config.Config.Mqtt.Username
|
||||||
|
if username != "" {
|
||||||
|
opts.SetUsername(username)
|
||||||
|
}
|
||||||
|
|
||||||
|
password := config.Config.Mqtt.Password
|
||||||
|
if password != "" {
|
||||||
|
opts.SetPassword(password)
|
||||||
|
}
|
||||||
|
|
||||||
|
enableTls := config.Config.Mqtt.TlsEnable
|
||||||
|
if enableTls == "true" {
|
||||||
|
//log.Println("Enabling TLS connection")
|
||||||
|
tlsConfig := &tls.Config {
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
}
|
||||||
|
opts.SetTLSConfig(tlsConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Broker connecting")
|
||||||
|
mqttClient = MQTT.NewClient(opts)
|
||||||
|
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
||||||
|
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
|
||||||
|
}
|
||||||
|
//log.Printf("Successfully connected to broker %s", broker)
|
||||||
|
|
||||||
|
go outputDispatcher(mqttClient)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func Stop() {
|
||||||
|
log.Println("Disconnecting from broker")
|
||||||
|
mqttClient.Disconnect(250)
|
||||||
|
}
|
||||||
|
|
33
src/tsm/tsm.go
Normal file
33
src/tsm/tsm.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
|
||||||
|
"tsm/config"
|
||||||
|
"tsm/mqtt"
|
||||||
|
"tsm/tsmq"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
log.SetPrefix("TSM: ")
|
||||||
|
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
||||||
|
|
||||||
|
log.Println("starting")
|
||||||
|
|
||||||
|
config.LoadConfiguration()
|
||||||
|
|
||||||
|
mqtt.Start()
|
||||||
|
defer mqtt.Stop()
|
||||||
|
|
||||||
|
tsmq.Start()
|
||||||
|
defer tsmq.Stop()
|
||||||
|
|
||||||
|
c := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(c, os.Interrupt, os.Kill)
|
||||||
|
<- c
|
||||||
|
|
||||||
|
log.Println("terminating")
|
||||||
|
}
|
72
src/tsm/tsmq/tsmq.go
Normal file
72
src/tsm/tsmq/tsmq.go
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
package tsmq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"tsm/config"
|
||||||
|
"tsm/mqtt"
|
||||||
|
|
||||||
|
"github.com/beevik/ntp"
|
||||||
|
)
|
||||||
|
|
||||||
|
type variable_t struct {
|
||||||
|
Name string `json:"variable"`
|
||||||
|
Value float64 `json:"value"`
|
||||||
|
Unit string `json:"unit"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type message_t struct {
|
||||||
|
Server string `json:"server"`
|
||||||
|
Label string `json:"label"`
|
||||||
|
Variables map[string]variable_t `json:"variables"`
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
func Start() {
|
||||||
|
for {
|
||||||
|
for _, server := range config.Config.Servers {
|
||||||
|
log.Println("Polling server " + server.Name)
|
||||||
|
|
||||||
|
message := message_t {
|
||||||
|
Server: server.Name,
|
||||||
|
Label: server.Label,
|
||||||
|
Variables: make(map[string]variable_t),
|
||||||
|
}
|
||||||
|
|
||||||
|
label := "rootdisp"
|
||||||
|
status := "Ok"
|
||||||
|
value := 0.0
|
||||||
|
resp, err := ntp.Query(server.Name)
|
||||||
|
if err != nil {
|
||||||
|
status = "Error"
|
||||||
|
} else {
|
||||||
|
value = resp.RootDispersion.Seconds() * 1000
|
||||||
|
}
|
||||||
|
|
||||||
|
v := variable_t {
|
||||||
|
Name: label,
|
||||||
|
Value: value,
|
||||||
|
Unit: "ms",
|
||||||
|
Status: status,
|
||||||
|
}
|
||||||
|
message.Variables[label] = v
|
||||||
|
|
||||||
|
|
||||||
|
j, err := json.Marshal(message)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Unable to marshal message, it is lost: %s, %v", message, err)
|
||||||
|
} else {
|
||||||
|
mqtt.Publish(j)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Duration(config.Config.Interval) * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Stop() {
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user