From cdf6a6c44adb584fb9f2d18535add3988622da3a Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 12 Jan 2026 17:43:06 +0100 Subject: [PATCH] syslog writing works --- Dockerfile | 1 + config-test.json | 10 +- deployment/config.json | 10 +- src/ma/archiver/archiver.go | 9 +- src/ma/archiver/syslog.go | 115 ++++++++++++++++++++ src/ma/config/config.go | 46 ++++---- src/ma/mqtt/mqtt.go | 205 ++++++++++++++++++------------------ 7 files changed, 269 insertions(+), 127 deletions(-) create mode 100644 src/ma/archiver/syslog.go diff --git a/Dockerfile b/Dockerfile index 7d15a54..0f24fb4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,7 @@ RUN go build -a -installsuffix nocgo -o ma main.go FROM scratch ENV MA_CONF "" +ENV MQTT_PASSWORD "" COPY --from=builder /go/src/ma ./ ENTRYPOINT ["./ma"] diff --git a/config-test.json b/config-test.json index d53a383..551886c 100644 --- a/config-test.json +++ b/config-test.json @@ -12,5 +12,13 @@ "snmp", "MainsCnt/#", "cem/#" - ] + ], + "syslog": { + "enable": "true", + "network": "udp", + "server": "172.20.0.10:514", + "facility": "local0", + "severity": "info", + "tag": "mqtt-archiver" + } } diff --git a/deployment/config.json b/deployment/config.json index 268fc42..1bf2b64 100644 --- a/deployment/config.json +++ b/deployment/config.json @@ -12,5 +12,13 @@ "snmp", "MainsCnt/#", "cem/#" - ] + ], + "syslog": { + "enable": "false", + "network": "udp", + "server": "syslog-server:514", + "facility": "local0", + "severity": "info", + "tag": "mqtt-archiver" + } } diff --git a/src/ma/archiver/archiver.go b/src/ma/archiver/archiver.go index e53d77b..6ef7f01 100644 --- a/src/ma/archiver/archiver.go +++ b/src/ma/archiver/archiver.go @@ -7,13 +7,14 @@ import ( ) type Message struct { - Time time.Time - Topic string - Payload string + Time time.Time `json:"time"` + Topic string `json:"topic"` + Payload string `json:"payload"` } func InitArchiver() { log.Printf("Archiver initializing") + InitSyslog() } func InputArchiver() { @@ -25,4 +26,6 @@ func InputArchiver() { func handleMessage(message Message) { log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload) + + WriteSyslog(message) } diff --git a/src/ma/archiver/syslog.go b/src/ma/archiver/syslog.go new file mode 100644 index 0000000..dec2ae1 --- /dev/null +++ b/src/ma/archiver/syslog.go @@ -0,0 +1,115 @@ +package archiver + +import ( + "encoding/json" + "log" + "log/syslog" + "ma/config" +) + +var syslogWriter *syslog.Writer + +func InitSyslog() { + if config.Config.Syslog.Enable == "true" { + // Parse facility + facility := parseFacility(config.Config.Syslog.Facility) + + // Parse severity + severity := parseSeverity(config.Config.Syslog.Severity) + + // Combine to priority + priority := facility | severity + + var err error + syslogWriter, err = syslog.Dial( + config.Config.Syslog.Network, + config.Config.Syslog.Server, + priority, + config.Config.Syslog.Tag, + ) + if err != nil { + log.Fatalf("Failed to connect to syslog server: %v", err) + } + log.Printf("Syslog connection established: %s://%s", config.Config.Syslog.Network, config.Config.Syslog.Server) + } +} + +func WriteSyslog(message Message) { + if syslogWriter != nil { + jsonData, err := json.Marshal(message) + if err != nil { + log.Printf("Failed to marshal message to JSON: %v", err) + return + } + + // Send to syslog based on configured severity + switch config.Config.Syslog.Severity { + case "emerg": + syslogWriter.Emerg(string(jsonData)) + case "alert": + syslogWriter.Alert(string(jsonData)) + case "crit": + syslogWriter.Crit(string(jsonData)) + case "err": + syslogWriter.Err(string(jsonData)) + case "warning": + syslogWriter.Warning(string(jsonData)) + case "notice": + syslogWriter.Notice(string(jsonData)) + case "info": + syslogWriter.Info(string(jsonData)) + case "debug": + syslogWriter.Debug(string(jsonData)) + default: + syslogWriter.Info(string(jsonData)) + } + } +} + +func parseFacility(facility string) syslog.Priority { + facilities := map[string]syslog.Priority{ + "kern": syslog.LOG_KERN, + "user": syslog.LOG_USER, + "mail": syslog.LOG_MAIL, + "daemon": syslog.LOG_DAEMON, + "auth": syslog.LOG_AUTH, + "syslog": syslog.LOG_SYSLOG, + "lpr": syslog.LOG_LPR, + "news": syslog.LOG_NEWS, + "uucp": syslog.LOG_UUCP, + "cron": syslog.LOG_CRON, + "authpriv": syslog.LOG_AUTHPRIV, + "ftp": syslog.LOG_FTP, + "local0": syslog.LOG_LOCAL0, + "local1": syslog.LOG_LOCAL1, + "local2": syslog.LOG_LOCAL2, + "local3": syslog.LOG_LOCAL3, + "local4": syslog.LOG_LOCAL4, + "local5": syslog.LOG_LOCAL5, + "local6": syslog.LOG_LOCAL6, + "local7": syslog.LOG_LOCAL7, + } + + if f, ok := facilities[facility]; ok { + return f + } + return syslog.LOG_LOCAL0 // Default +} + +func parseSeverity(severity string) syslog.Priority { + severities := map[string]syslog.Priority{ + "emerg": syslog.LOG_EMERG, + "alert": syslog.LOG_ALERT, + "crit": syslog.LOG_CRIT, + "err": syslog.LOG_ERR, + "warning": syslog.LOG_WARNING, + "notice": syslog.LOG_NOTICE, + "info": syslog.LOG_INFO, + "debug": syslog.LOG_DEBUG, + } + + if s, ok := severities[severity]; ok { + return s + } + return syslog.LOG_INFO // Default +} diff --git a/src/ma/config/config.go b/src/ma/config/config.go index 04718f1..fcc4f99 100644 --- a/src/ma/config/config.go +++ b/src/ma/config/config.go @@ -1,33 +1,41 @@ package config -import "encoding/json" -import "log" -import "os" +import ( + "encoding/json" + "log" + "os" +) type HandlerConfigT struct { - Attributes map[string]string `json:"attributes"` + Attributes map[string]string `json:"attributes"` } type ConfigT struct { - Mqtt struct { - Broker string `json:"broker"` - Username string `json:"username"` - Password string - TlsEnable string `json:"tlsEnable"` - } `json:"mqtt"` - IncludeTopics []string `json:"includeTopics"` - ExcludeTopics []string `json:"excludeTopics"` + Mqtt struct { + Broker string `json:"broker"` + Username string `json:"username"` + Password string + TlsEnable string `json:"tlsEnable"` + } `json:"mqtt"` + IncludeTopics []string `json:"includeTopics"` + ExcludeTopics []string `json:"excludeTopics"` + Syslog struct { + Enable string `json:"enable"` + Network string `json:"network"` + Server string `json:"server"` + Facility string `json:"facility"` + Severity string `json:"severity"` + Tag string `json:"tag"` + } `json:"syslog"` } var Config ConfigT - func LoadConfiguration() { - err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config) - if err != nil { - log.Fatalf("Unable to parse configuration: %s", err) - } + err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config) + if err != nil { + log.Fatalf("Unable to parse configuration: %s", err) + } - Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD") + Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD") } - diff --git a/src/ma/mqtt/mqtt.go b/src/ma/mqtt/mqtt.go index cc11c17..fbc9d11 100644 --- a/src/ma/mqtt/mqtt.go +++ b/src/ma/mqtt/mqtt.go @@ -1,18 +1,22 @@ package mqtt -import "log" -import "strings" -import "fmt" -import MQTT "github.com/eclipse/paho.mqtt.golang" -import "github.com/google/uuid" -import "crypto/tls" -import "ma/config" -import "ma/counter" +import ( + "fmt" + "log" + "strings" + + "crypto/tls" + "ma/config" + "ma/counter" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/google/uuid" +) type Message struct { - Topic string - Payload []byte - Retained bool + Topic string + Payload []byte + Retained bool } var InputChannel chan Message = make(chan Message, 100) @@ -21,119 +25,115 @@ var OutputChannel chan Message = make(chan Message, 100) var mqttClient MQTT.Client func onMessageReceived(client MQTT.Client, message MQTT.Message) { - //log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) - m := Message { - Topic: message.Topic(), - Payload: message.Payload(), - Retained: message.Retained(), - } - if m.Retained { - counter.S("Skipped") - //log.Println("Retained message skipped") - return - } - for _, i := range config.Config.ExcludeTopics { - if TopicMatchesSubscription(m.Topic, i) { - counter.S("Skipped") - //log.Println("Message skipped") - return - } - } - select { - case InputChannel <- m: - counter.S("Received") - //log.Println("Message sent to channel") - default: - //log.Println("Channel full, message lost") - counter.F("Received") - } + //log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) + m := Message{ + Topic: message.Topic(), + Payload: message.Payload(), + Retained: message.Retained(), + } + if m.Retained { + counter.S("Skipped") + //log.Println("Retained message skipped") + return + } + for _, i := range config.Config.ExcludeTopics { + if TopicMatchesSubscription(m.Topic, i) { + counter.S("Skipped") + //log.Println("Message skipped") + return + } + } + select { + case InputChannel <- m: + counter.S("Received") + //log.Println("Message sent to channel") + default: + //log.Println("Channel full, message lost") + counter.F("Received") + } } func onConnectionLost(client MQTT.Client, error error) { - log.Printf("Connection lost, error %s", error) + log.Printf("Connection lost, error %s", error) } func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { - log.Println("Oops, connection lost, already reconnecting ...") + log.Println("Oops, connection lost, already reconnecting ...") } - func onConnect(client MQTT.Client) { - for _, topic := range config.Config.IncludeTopics { - if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { - log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) - } - log.Printf("Topic %s subscribed", topic) - } + for _, topic := range config.Config.IncludeTopics { + if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) + } + log.Printf("Topic %s subscribed", topic) + } } func outputDispatcher(client MQTT.Client) { - for { - select { - case message := <- OutputChannel: - log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) - if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil { - log.Printf("Unable to publish, error %s", token.Error()) - } - log.Println("Successfully published") - } - } + for message := range OutputChannel { + log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) + if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil { + log.Printf("Unable to publish, error %s", token.Error()) + } + log.Println("Successfully published") + } } func StartMqttClient() { - broker := config.Config.Mqtt.Broker - if broker == "" { - log.Fatal("No broker given, set env var MQTT_BROKER") - } - - prefix := "MA" - uuid := uuid.New() - clientId := fmt.Sprintf("%s-%s", prefix, uuid) - - opts := MQTT.NewClientOptions(). - AddBroker(broker). - SetClientID(clientId). - SetConnectionLostHandler(onConnectionLost). - SetOnConnectHandler(onConnect). - SetReconnectingHandler(onReconnecting). - SetConnectRetry(true) - - username := config.Config.Mqtt.Username - if username != "" { - opts.SetUsername(username) - } - - password := config.Config.Mqtt.Password - if password != "" { - opts.SetPassword(password) - } - - enableTls := config.Config.Mqtt.TlsEnable - if enableTls == "true" { - //log.Println("Enabling TLS connection") - tlsConfig := &tls.Config { - InsecureSkipVerify: true, - } - opts.SetTLSConfig(tlsConfig) + broker := config.Config.Mqtt.Broker + if broker == "" { + log.Fatal("No broker given, set env var MQTT_BROKER") } - log.Println("Broker connecting") - mqttClient = MQTT.NewClient(opts) - if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { - log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) - } - log.Printf("Successfully connected to broker %s", broker) - log.Printf("Include topics: %s", config.Config.IncludeTopics) - log.Printf("Exclude topics: %s", config.Config.ExcludeTopics) + prefix := "MA" + uuid := uuid.New() + clientId := fmt.Sprintf("%s-%s", prefix, uuid) - go outputDispatcher(mqttClient) + opts := MQTT.NewClientOptions(). + AddBroker(broker). + SetClientID(clientId). + SetConnectionLostHandler(onConnectionLost). + SetOnConnectHandler(onConnect). + SetReconnectingHandler(onReconnecting). + SetConnectRetry(true) - return + username := config.Config.Mqtt.Username + if username != "" { + opts.SetUsername(username) + } + + password := config.Config.Mqtt.Password + if password != "" { + opts.SetPassword(password) + } + + enableTls := config.Config.Mqtt.TlsEnable + if enableTls == "true" { + //log.Println("Enabling TLS connection") + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + } + opts.SetTLSConfig(tlsConfig) + } + + log.Println("Broker connecting") + mqttClient = MQTT.NewClient(opts) + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error()) + } + log.Printf("Successfully connected to broker %s", broker) + log.Printf("Include topics: %s", config.Config.IncludeTopics) + log.Printf("Exclude topics: %s", config.Config.ExcludeTopics) + + go outputDispatcher(mqttClient) + + return } func StopMqttClient() { - log.Println("Disconnecting from broker") - mqttClient.Disconnect(250) + log.Println("Disconnecting from broker") + mqttClient.Disconnect(250) } func TopicMatchesSubscription(topic, subscription string) bool { @@ -152,4 +152,3 @@ func TopicMatchesSubscription(topic, subscription string) bool { return true } -