my own modifications, part 1

This commit is contained in:
2024-01-25 12:35:27 +01:00
parent b68e929c57
commit 5b77a6a059
20 changed files with 402 additions and 543 deletions

37
src/smq/config-test.json Normal file
View File

@ -0,0 +1,37 @@
{
"mqtt": {
"broker": "mqtt://172.23.1.102:1883",
"tlsEnable": "false",
"topic": "snmp"
},
"interval": 10,
"snmpEndpoints": [
{
"endpoint": "172.16.3.1",
"label": "router",
"community": "public",
"oidTopics": [
{
"oid": ".1.3.6.1.2.1.31.1.1.1.6.4",
"label": "wan-in",
"diff": "true"
},
{
"oid": ".1.3.6.1.2.1.31.1.1.1.10.4",
"label": "wan-out",
"diff": "true"
},
{
"oid": ".1.3.6.1.2.1.31.1.1.1.6.2",
"label": "lan-in",
"diff": "true"
},
{
"oid": ".1.3.6.1.2.1.31.1.1.1.10.2",
"label": "lan-out",
"diff": "true"
}
]
}
]
}

33
src/smq/config.json Normal file
View File

@ -0,0 +1,33 @@
{
"mqtt": {
"broker": "mqtt://emqx01-anonymous-cluster-internal.broker.svc.cluster.local:1883",
"tlsEnable": "false",
"topicPre": "snmp"
},
"interval": 60,
"snmpEndpoints": [
{
"endpoint": "172.16.3.1",
"label": "router",
"community": "public",
"oidTopics": [
{
"oid": ".1.3.6.1.2.1.31.1.1.1.6.4",
"label": "wan-in"
},
{
"oid": ".1.3.6.1.2.1.31.1.1.1.10.4",
"label": "wan-out"
},
{
"oid": ".1.3.6.1.2.1.31.1.1.1.6.2",
"label": "lan-in"
},
{
"oid": ".1.3.6.1.2.1.31.1.1.1.10.2",
"label": "lan-out"
}
]
}
]
}

47
src/smq/config/config.go Normal file
View File

@ -0,0 +1,47 @@
package config
import (
"encoding/json"
"os"
"log"
)
type OIDTopicObject struct {
OID string `json:"oid"`
Label string `json:"label"`
Diff string `json:"diff"`
}
// SNMPEndpointObject is the SNMP Endpoint definition
type SNMPEndpointObject struct {
Endpoint string `json:"endpoint"`
Label string `json:"label"`
Community string `json:"community"`
OIDTopics []OIDTopicObject `json:"oidTopics"`
}
type MQTTConfigObject struct {
Broker string `json:"broker"`
Username string `json:"username"`
Password string `json:"password"`
TlsEnable string `json:"tlsEnable"`
Topic string `json:"topic"`
}
type ConfigObject struct {
Mqtt MQTTConfigObject `json:"mqtt"`
Interval int `json:"interval"`
SNMPEndpoints []SNMPEndpointObject `json:"snmpEndpoints"`
}
var Config ConfigObject
func LoadConfiguration() {
cfg := os.Getenv("SNMP_MQTT_CONF")
log.Printf("cfg: %s", cfg)
err := json.Unmarshal([]byte(cfg), &Config)
if err != nil {
log.Fatalf("Unable to parse configuration: %v", err)
}
}

15
src/smq/go.mod Normal file
View File

@ -0,0 +1,15 @@
module smq
go 1.21.3
require (
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/google/uuid v1.6.0
github.com/gosnmp/gosnmp v1.37.0
)
require (
github.com/gorilla/websocket v1.5.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sync v0.1.0 // indirect
)

20
src/smq/go.sum Normal file
View File

@ -0,0 +1,20 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosnmp/gosnmp v1.37.0 h1:/Tf8D3b9wrnNuf/SfbvO+44mPrjVphBhRtcGg22V07Y=
github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099oZRCR+M=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

113
src/smq/mqtt/mqtt.go Normal file
View File

@ -0,0 +1,113 @@
package mqtt
import "log"
import "fmt"
import MQTT "github.com/eclipse/paho.mqtt.golang"
import "github.com/google/uuid"
import "crypto/tls"
import "smq/config"
// import "smq/counter"
type Message struct {
Topic string
Payload []byte
}
var OutputChannel chan Message = make(chan Message, 100)
var mqttClient MQTT.Client
func onConnectionLost(client MQTT.Client, error error) {
log.Printf("Connection lost, error %s", error)
}
func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) {
log.Println("Oops, connection lost, already reconnecting ...")
}
func onConnect(client MQTT.Client) {
log.Println("Connected")
}
func outputDispatcher(client MQTT.Client) {
for {
select {
case message := <- OutputChannel:
log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil {
log.Printf("Unable to publish, error %s", token.Error())
}
log.Println("Successfully published")
}
}
}
func Publish(msg []byte) {
message := Message {
Topic: config.Config.Mqtt.Topic,
Payload: msg,
}
select {
case OutputChannel <- message:
{}
default:
log.Printf("Channel full, message %s lost")
}
}
func Start() {
broker := config.Config.Mqtt.Broker
if broker == "" {
log.Fatal("No broker given")
}
prefix := "SMQ"
uuid := uuid.New()
clientId := fmt.Sprintf("%s-%s", prefix, uuid)
opts := MQTT.NewClientOptions().
AddBroker(broker).
SetClientID(clientId).
SetConnectionLostHandler(onConnectionLost).
SetOnConnectHandler(onConnect).
SetReconnectingHandler(onReconnecting).
SetConnectRetry(true)
username := config.Config.Mqtt.Username
if username != "" {
opts.SetUsername(username)
}
password := config.Config.Mqtt.Password
if password != "" {
opts.SetPassword(password)
}
enableTls := config.Config.Mqtt.TlsEnable
if enableTls == "true" {
//log.Println("Enabling TLS connection")
tlsConfig := &tls.Config {
InsecureSkipVerify: true,
}
opts.SetTLSConfig(tlsConfig)
}
log.Println("Broker connecting")
mqttClient = MQTT.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
}
//log.Printf("Successfully connected to broker %s", broker)
go outputDispatcher(mqttClient)
return
}
func Stop() {
log.Println("Disconnecting from broker")
mqttClient.Disconnect(250)
}

BIN
src/smq/smq Executable file

Binary file not shown.

33
src/smq/snmp-mqtt.go Normal file
View File

@ -0,0 +1,33 @@
package main
import (
"log"
"os"
"os/signal"
"smq/config"
"smq/mqtt"
"smq/snmp"
)
func main() {
log.SetPrefix("SMQ: ")
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
log.Println("starting")
config.LoadConfiguration()
mqtt.Start()
defer mqtt.Stop()
snmp.Start()
defer snmp.Stop()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
<- c
log.Println("terminating")
}

103
src/smq/snmp/snmp.go Normal file
View File

@ -0,0 +1,103 @@
package snmp
import (
"fmt"
"log"
"time"
"encoding/json"
"smq/config"
"smq/mqtt"
"github.com/gosnmp/gosnmp"
)
type variable_t struct {
Label string `json:"label"`
Variable string `json:"variable"`
Value string `json:"value"`
}
type message_t struct {
Device string `json:"device"`
Label string `json:"label"`
Variables map[string]variable_t `json:"variables"`
}
func Start() {
for {
for _, endpoint := range config.Config.SNMPEndpoints {
log.Println("Polling endpoint " + endpoint.Endpoint)
snmp := gosnmp.GoSNMP{}
snmp.Target = endpoint.Endpoint
snmp.Port = 161
snmp.Version = gosnmp.Version2c
snmp.Community = endpoint.Community
snmp.Timeout = time.Duration(5 * time.Second)
err := snmp.Connect()
if err != nil {
log.Fatal("SNMP Connect error\n")
}
oids := []string{}
for _, oidTopic := range endpoint.OIDTopics {
oids = append(oids, oidTopic.OID)
}
result, err := snmp.Get(oids)
if err != nil {
log.Printf("error in Get: %s", err)
} else {
message := message_t {
Device: endpoint.Endpoint,
Label: endpoint.Label,
Variables: make(map[string]variable_t),
}
for _, variable := range result.Variables {
for _, oidTopic := range endpoint.OIDTopics {
if oidTopic.OID == variable.Name {
convertedValue := ""
switch variable.Type {
case gosnmp.OctetString:
convertedValue = string(variable.Value.([]byte))
default:
convertedValue = fmt.Sprintf("%d", gosnmp.ToBigInt(variable.Value))
}
log.Printf("%s = %s", oidTopic.OID, convertedValue)
v := variable_t {
Label: oidTopic.Label,
Variable: oidTopic.OID,
Value: convertedValue,
}
message.Variables[oidTopic.Label] = v
// build topic and payload and push into the mqtt.OutputChannel
}
}
}
j, err := json.Marshal(message)
if err != nil {
log.Printf("Unable to marshal message, it is lost: %s, %v", message, err)
} else {
mqtt.Publish(j)
}
}
snmp.Conn.Close()
}
time.Sleep(time.Duration(config.Config.Interval) * time.Second)
}
}
func Stop() {
}