syslog writing works

This commit is contained in:
2026-01-12 17:43:06 +01:00
parent 306535c933
commit cdf6a6c44a
7 changed files with 269 additions and 127 deletions

View File

@@ -9,6 +9,7 @@ RUN go build -a -installsuffix nocgo -o ma main.go
FROM scratch
ENV MA_CONF ""
ENV MQTT_PASSWORD ""
COPY --from=builder /go/src/ma ./
ENTRYPOINT ["./ma"]

View File

@@ -12,5 +12,13 @@
"snmp",
"MainsCnt/#",
"cem/#"
]
],
"syslog": {
"enable": "true",
"network": "udp",
"server": "172.20.0.10:514",
"facility": "local0",
"severity": "info",
"tag": "mqtt-archiver"
}
}

View File

@@ -12,5 +12,13 @@
"snmp",
"MainsCnt/#",
"cem/#"
]
],
"syslog": {
"enable": "false",
"network": "udp",
"server": "syslog-server:514",
"facility": "local0",
"severity": "info",
"tag": "mqtt-archiver"
}
}

View File

@@ -7,13 +7,14 @@ import (
)
type Message struct {
Time time.Time
Topic string
Payload string
Time time.Time `json:"time"`
Topic string `json:"topic"`
Payload string `json:"payload"`
}
func InitArchiver() {
log.Printf("Archiver initializing")
InitSyslog()
}
func InputArchiver() {
@@ -25,4 +26,6 @@ func InputArchiver() {
func handleMessage(message Message) {
log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload)
WriteSyslog(message)
}

115
src/ma/archiver/syslog.go Normal file
View File

@@ -0,0 +1,115 @@
package archiver
import (
"encoding/json"
"log"
"log/syslog"
"ma/config"
)
var syslogWriter *syslog.Writer
func InitSyslog() {
if config.Config.Syslog.Enable == "true" {
// Parse facility
facility := parseFacility(config.Config.Syslog.Facility)
// Parse severity
severity := parseSeverity(config.Config.Syslog.Severity)
// Combine to priority
priority := facility | severity
var err error
syslogWriter, err = syslog.Dial(
config.Config.Syslog.Network,
config.Config.Syslog.Server,
priority,
config.Config.Syslog.Tag,
)
if err != nil {
log.Fatalf("Failed to connect to syslog server: %v", err)
}
log.Printf("Syslog connection established: %s://%s", config.Config.Syslog.Network, config.Config.Syslog.Server)
}
}
func WriteSyslog(message Message) {
if syslogWriter != nil {
jsonData, err := json.Marshal(message)
if err != nil {
log.Printf("Failed to marshal message to JSON: %v", err)
return
}
// Send to syslog based on configured severity
switch config.Config.Syslog.Severity {
case "emerg":
syslogWriter.Emerg(string(jsonData))
case "alert":
syslogWriter.Alert(string(jsonData))
case "crit":
syslogWriter.Crit(string(jsonData))
case "err":
syslogWriter.Err(string(jsonData))
case "warning":
syslogWriter.Warning(string(jsonData))
case "notice":
syslogWriter.Notice(string(jsonData))
case "info":
syslogWriter.Info(string(jsonData))
case "debug":
syslogWriter.Debug(string(jsonData))
default:
syslogWriter.Info(string(jsonData))
}
}
}
func parseFacility(facility string) syslog.Priority {
facilities := map[string]syslog.Priority{
"kern": syslog.LOG_KERN,
"user": syslog.LOG_USER,
"mail": syslog.LOG_MAIL,
"daemon": syslog.LOG_DAEMON,
"auth": syslog.LOG_AUTH,
"syslog": syslog.LOG_SYSLOG,
"lpr": syslog.LOG_LPR,
"news": syslog.LOG_NEWS,
"uucp": syslog.LOG_UUCP,
"cron": syslog.LOG_CRON,
"authpriv": syslog.LOG_AUTHPRIV,
"ftp": syslog.LOG_FTP,
"local0": syslog.LOG_LOCAL0,
"local1": syslog.LOG_LOCAL1,
"local2": syslog.LOG_LOCAL2,
"local3": syslog.LOG_LOCAL3,
"local4": syslog.LOG_LOCAL4,
"local5": syslog.LOG_LOCAL5,
"local6": syslog.LOG_LOCAL6,
"local7": syslog.LOG_LOCAL7,
}
if f, ok := facilities[facility]; ok {
return f
}
return syslog.LOG_LOCAL0 // Default
}
func parseSeverity(severity string) syslog.Priority {
severities := map[string]syslog.Priority{
"emerg": syslog.LOG_EMERG,
"alert": syslog.LOG_ALERT,
"crit": syslog.LOG_CRIT,
"err": syslog.LOG_ERR,
"warning": syslog.LOG_WARNING,
"notice": syslog.LOG_NOTICE,
"info": syslog.LOG_INFO,
"debug": syslog.LOG_DEBUG,
}
if s, ok := severities[severity]; ok {
return s
}
return syslog.LOG_INFO // Default
}

View File

@@ -1,33 +1,41 @@
package config
import "encoding/json"
import "log"
import "os"
import (
"encoding/json"
"log"
"os"
)
type HandlerConfigT struct {
Attributes map[string]string `json:"attributes"`
Attributes map[string]string `json:"attributes"`
}
type ConfigT struct {
Mqtt struct {
Broker string `json:"broker"`
Username string `json:"username"`
Password string
TlsEnable string `json:"tlsEnable"`
} `json:"mqtt"`
IncludeTopics []string `json:"includeTopics"`
ExcludeTopics []string `json:"excludeTopics"`
Mqtt struct {
Broker string `json:"broker"`
Username string `json:"username"`
Password string
TlsEnable string `json:"tlsEnable"`
} `json:"mqtt"`
IncludeTopics []string `json:"includeTopics"`
ExcludeTopics []string `json:"excludeTopics"`
Syslog struct {
Enable string `json:"enable"`
Network string `json:"network"`
Server string `json:"server"`
Facility string `json:"facility"`
Severity string `json:"severity"`
Tag string `json:"tag"`
} `json:"syslog"`
}
var Config ConfigT
func LoadConfiguration() {
err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config)
if err != nil {
log.Fatalf("Unable to parse configuration: %s", err)
}
err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config)
if err != nil {
log.Fatalf("Unable to parse configuration: %s", err)
}
Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD")
Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD")
}

View File

@@ -1,18 +1,22 @@
package mqtt
import "log"
import "strings"
import "fmt"
import MQTT "github.com/eclipse/paho.mqtt.golang"
import "github.com/google/uuid"
import "crypto/tls"
import "ma/config"
import "ma/counter"
import (
"fmt"
"log"
"strings"
"crypto/tls"
"ma/config"
"ma/counter"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
)
type Message struct {
Topic string
Payload []byte
Retained bool
Topic string
Payload []byte
Retained bool
}
var InputChannel chan Message = make(chan Message, 100)
@@ -21,119 +25,115 @@ var OutputChannel chan Message = make(chan Message, 100)
var mqttClient MQTT.Client
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
//log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
m := Message {
Topic: message.Topic(),
Payload: message.Payload(),
Retained: message.Retained(),
}
if m.Retained {
counter.S("Skipped")
//log.Println("Retained message skipped")
return
}
for _, i := range config.Config.ExcludeTopics {
if TopicMatchesSubscription(m.Topic, i) {
counter.S("Skipped")
//log.Println("Message skipped")
return
}
}
select {
case InputChannel <- m:
counter.S("Received")
//log.Println("Message sent to channel")
default:
//log.Println("Channel full, message lost")
counter.F("Received")
}
//log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
m := Message{
Topic: message.Topic(),
Payload: message.Payload(),
Retained: message.Retained(),
}
if m.Retained {
counter.S("Skipped")
//log.Println("Retained message skipped")
return
}
for _, i := range config.Config.ExcludeTopics {
if TopicMatchesSubscription(m.Topic, i) {
counter.S("Skipped")
//log.Println("Message skipped")
return
}
}
select {
case InputChannel <- m:
counter.S("Received")
//log.Println("Message sent to channel")
default:
//log.Println("Channel full, message lost")
counter.F("Received")
}
}
func onConnectionLost(client MQTT.Client, error error) {
log.Printf("Connection lost, error %s", error)
log.Printf("Connection lost, error %s", error)
}
func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) {
log.Println("Oops, connection lost, already reconnecting ...")
log.Println("Oops, connection lost, already reconnecting ...")
}
func onConnect(client MQTT.Client) {
for _, topic := range config.Config.IncludeTopics {
if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error())
}
log.Printf("Topic %s subscribed", topic)
}
for _, topic := range config.Config.IncludeTopics {
if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error())
}
log.Printf("Topic %s subscribed", topic)
}
}
func outputDispatcher(client MQTT.Client) {
for {
select {
case message := <- OutputChannel:
log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil {
log.Printf("Unable to publish, error %s", token.Error())
}
log.Println("Successfully published")
}
}
for message := range OutputChannel {
log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil {
log.Printf("Unable to publish, error %s", token.Error())
}
log.Println("Successfully published")
}
}
func StartMqttClient() {
broker := config.Config.Mqtt.Broker
if broker == "" {
log.Fatal("No broker given, set env var MQTT_BROKER")
}
prefix := "MA"
uuid := uuid.New()
clientId := fmt.Sprintf("%s-%s", prefix, uuid)
opts := MQTT.NewClientOptions().
AddBroker(broker).
SetClientID(clientId).
SetConnectionLostHandler(onConnectionLost).
SetOnConnectHandler(onConnect).
SetReconnectingHandler(onReconnecting).
SetConnectRetry(true)
username := config.Config.Mqtt.Username
if username != "" {
opts.SetUsername(username)
}
password := config.Config.Mqtt.Password
if password != "" {
opts.SetPassword(password)
}
enableTls := config.Config.Mqtt.TlsEnable
if enableTls == "true" {
//log.Println("Enabling TLS connection")
tlsConfig := &tls.Config {
InsecureSkipVerify: true,
}
opts.SetTLSConfig(tlsConfig)
broker := config.Config.Mqtt.Broker
if broker == "" {
log.Fatal("No broker given, set env var MQTT_BROKER")
}
log.Println("Broker connecting")
mqttClient = MQTT.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
}
log.Printf("Successfully connected to broker %s", broker)
log.Printf("Include topics: %s", config.Config.IncludeTopics)
log.Printf("Exclude topics: %s", config.Config.ExcludeTopics)
prefix := "MA"
uuid := uuid.New()
clientId := fmt.Sprintf("%s-%s", prefix, uuid)
go outputDispatcher(mqttClient)
opts := MQTT.NewClientOptions().
AddBroker(broker).
SetClientID(clientId).
SetConnectionLostHandler(onConnectionLost).
SetOnConnectHandler(onConnect).
SetReconnectingHandler(onReconnecting).
SetConnectRetry(true)
return
username := config.Config.Mqtt.Username
if username != "" {
opts.SetUsername(username)
}
password := config.Config.Mqtt.Password
if password != "" {
opts.SetPassword(password)
}
enableTls := config.Config.Mqtt.TlsEnable
if enableTls == "true" {
//log.Println("Enabling TLS connection")
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
opts.SetTLSConfig(tlsConfig)
}
log.Println("Broker connecting")
mqttClient = MQTT.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
}
log.Printf("Successfully connected to broker %s", broker)
log.Printf("Include topics: %s", config.Config.IncludeTopics)
log.Printf("Exclude topics: %s", config.Config.ExcludeTopics)
go outputDispatcher(mqttClient)
return
}
func StopMqttClient() {
log.Println("Disconnecting from broker")
mqttClient.Disconnect(250)
log.Println("Disconnecting from broker")
mqttClient.Disconnect(250)
}
func TopicMatchesSubscription(topic, subscription string) bool {
@@ -152,4 +152,3 @@ func TopicMatchesSubscription(topic, subscription string) bool {
return true
}