11 Commits
0.0.9 ... 0.1.2

Author SHA1 Message Date
3d09af50ec adjust logging format
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-01-13 11:35:40 +01:00
070decc151 drop database relicts
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2026-01-13 11:31:23 +01:00
06b217a990 fix config
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
2026-01-12 22:47:22 +01:00
d5c0bd3b01 fix ci script 2
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-01-12 22:35:15 +01:00
29a40638bd fix ci script
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2026-01-12 22:34:21 +01:00
a93c0e124e changes 2026-01-12 22:32:18 +01:00
cdf6a6c44a syslog writing works 2026-01-12 17:43:06 +01:00
306535c933 syslog started 2026-01-12 16:03:57 +01:00
5a25204f2f new database
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2025-01-09 14:31:39 +01:00
52b414f60f use matest in local env
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-12-02 13:11:54 +01:00
3fc83eefe6 logging
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2024-12-02 11:48:24 +01:00
18 changed files with 290 additions and 367 deletions

View File

@@ -5,20 +5,20 @@ steps:
repo: ${FORGE_NAME}/${CI_REPO} repo: ${FORGE_NAME}/${CI_REPO}
registry: registry:
from_secret: container_registry from_secret: container_registry
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG} tags: latest,${CI_COMMIT_TAG}
username: username:
from_secret: container_registry_username from_secret: container_registry_username
password: password:
from_secret: container_registry_password from_secret: container_registry_password
dockerfile: Dockerfile dockerfile: Dockerfile
when: when:
- event: [push, tag] - event: tag
deploy: deploy:
image: portainer/kubectl-shell:latest image: quay.io/wollud1969/k8s-admin-helper:0.3.4
secrets: environment:
- source: kube_config KUBE_CONFIG_CONTENT:
target: KUBE_CONFIG_CONTENT from_secret: kube_config
commands: commands:
- export IMAGE_TAG=$CI_COMMIT_TAG - export IMAGE_TAG=$CI_COMMIT_TAG
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig - printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig

View File

@@ -9,6 +9,7 @@ RUN go build -a -installsuffix nocgo -o ma main.go
FROM scratch FROM scratch
ENV MA_CONF "" ENV MA_CONF ""
ENV MQTT_PASSWORD ""
COPY --from=builder /go/src/ma ./ COPY --from=builder /go/src/ma ./
ENTRYPOINT ["./ma"] ENTRYPOINT ["./ma"]

View File

@@ -1,11 +0,0 @@
N=homea
PGHOST=`kubectl get services traefik -n system -o jsonpath="{.status.loadBalancer.ingress[0].ip}"`
PGPASSWORD=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGPASSWORD}" | base64 --decode`
PGUSER=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGUSER}" | base64 --decode`
PGSSLMODE=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGSSLMODE}" | base64 --decode`
PGDATABASE="ma"
export PGUSER PGHOST PGPASSWORD PGSSLMODE PGDATABASE
MA_CONF=`cat config-test.json`
export MA_CONF

View File

@@ -1,6 +1,6 @@
{ {
"mqtt": { "mqtt": {
"broker": "mqtt://172.23.1.102:1883", "broker": "mqtt://172.23.1.101:8883",
"username": "archiver", "username": "archiver",
"tlsEnable": "true" "tlsEnable": "true"
}, },
@@ -12,5 +12,13 @@
"snmp", "snmp",
"MainsCnt/#", "MainsCnt/#",
"cem/#" "cem/#"
] ],
"syslog": {
"enable": "true",
"network": "udp",
"server": "172.20.0.10:514",
"facility": "local0",
"severity": "info",
"tag": "mqtt-archiver"
}
} }

View File

@@ -8,9 +8,14 @@
"#" "#"
], ],
"excludeTopics": [ "excludeTopics": [
"IoT/Watchdog", "IoT/Watchdog"
"snmp", ],
"MainsCnt/#", "syslog": {
"cem/#" "enable": "true",
] "network": "udp",
"server": "172.20.0.10:514",
"facility": "local0",
"severity": "info",
"tag": "mqtt-archiver"
}
} }

View File

@@ -4,8 +4,6 @@ metadata:
name: ma name: ma
labels: labels:
app: ma app: ma
annotations:
secret.reloader.stakater.com/reload: ma-db-cred
spec: spec:
replicas: 1 replicas: 1
selector: selector:
@@ -21,8 +19,6 @@ spec:
image: %IMAGE% image: %IMAGE%
imagePullPolicy: Always imagePullPolicy: Always
envFrom: envFrom:
- secretRef:
name: ma-db-cred
- secretRef: - secretRef:
name: ma-mqtt-cred name: ma-mqtt-cred
- configMapRef: - configMapRef:

View File

@@ -1,39 +0,0 @@
#!/bin/bash
export PGUSER=`kubectl get secret -n database timescaledb -o jsonpath="{.data.superuser-username}" | base64 --decode`
export PGHOST=`kubectl get services traefik -n system -o jsonpath="{.status.loadBalancer.ingress[0].ip}"`
export PGPASSWORD=`kubectl get secret -n database timescaledb -o jsonpath="{.data.superuser-password}" | base64 --decode`
export PGSSLMODE=require
DATABASE=ma
LOGIN=ma
PASSWORD=`openssl rand -base64 24`
NAMESPACE=`cat namespace`
psql <<EOF
do
\$\$
begin
ALTER USER $LOGIN WITH PASSWORD '$PASSWORD';
GRANT ALL PRIVILEGES ON DATABASE $DATABASE TO $LOGIN;
end
\$\$
;
commit;
EOF
kubectl create secret generic ma-db-cred \
--dry-run=client \
-o yaml \
--save-config \
--from-literal=PGUSER="$LOGIN" \
--from-literal=PGPASSWORD="$PASSWORD" \
--from-literal=PGDATABASE="$DATABASE" \
--from-literal=PGHOST="timescaledb.database.svc.cluster.local" \
--from-literal=PGSSLMODE="require" | \
kubectl apply -f - -n $NAMESPACE

View File

@@ -1,35 +1,32 @@
package archiver package archiver
import "log" import (
import "time" "fmt"
//import "os" "log"
//import "fmt" "ma/mqtt"
//import "net/url" "time"
import "ma/mqtt" )
//import "ma/config"
//import "ma/counter"
import "ma/database"
var dbh *database.DatabaseHandle
type Message struct {
Time time.Time `json:"time"`
Topic string `json:"topic"`
Payload string `json:"payload"`
}
func InitArchiver() { func InitArchiver() {
log.Printf("Archiver initializing") log.Printf("Archiver initializing")
dbh = database.NewDatabaseHandle() InitSyslog()
} }
func InputArchiver() { func InputArchiver() {
for { for mqttMessage := range mqtt.InputChannel {
select { message := Message{time.Now(), mqttMessage.Topic, string(mqttMessage.Payload)}
case mqttMessage := <- mqtt.InputChannel: handleMessage(message)
message := database.Message { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } }
handleMessage(message)
}
}
} }
func handleMessage(message database.Message) { func handleMessage(message Message) {
// log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload) log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload)
dbh.StoreMessage(&message)
}
WriteSyslog(fmt.Sprintf("| TS: %s | TOPIC: %s | PAYLOAD: %s |", message.Time.String(), message.Topic, message.Payload))
}

110
src/ma/archiver/syslog.go Normal file
View File

@@ -0,0 +1,110 @@
package archiver
import (
"log"
"log/syslog"
"ma/config"
)
var syslogWriter *syslog.Writer
func InitSyslog() {
if config.Config.Syslog.Enable == "true" {
// Parse facility
facility := parseFacility(config.Config.Syslog.Facility)
// Parse severity
severity := parseSeverity(config.Config.Syslog.Severity)
// Combine to priority
priority := facility | severity
var err error
// Connect to remote syslog server
syslogWriter, err = syslog.Dial(
config.Config.Syslog.Network,
config.Config.Syslog.Server,
priority,
config.Config.Syslog.Tag,
)
if err != nil {
log.Fatalf("Failed to connect to syslog server: %v", err)
}
log.Printf("Syslog connection established: %s://%s", config.Config.Syslog.Network, config.Config.Syslog.Server)
}
}
func WriteSyslog(message string) {
if syslogWriter != nil {
// Send to syslog based on configured severity
switch config.Config.Syslog.Severity {
case "emerg":
syslogWriter.Emerg(message)
case "alert":
syslogWriter.Alert(message)
case "crit":
syslogWriter.Crit(message)
case "err":
syslogWriter.Err(message)
case "warning":
syslogWriter.Warning(message)
case "notice":
syslogWriter.Notice(message)
case "info":
syslogWriter.Info(message)
case "debug":
syslogWriter.Debug(message)
default:
syslogWriter.Info(message)
}
}
}
func parseFacility(facility string) syslog.Priority {
facilities := map[string]syslog.Priority{
"kern": syslog.LOG_KERN,
"user": syslog.LOG_USER,
"mail": syslog.LOG_MAIL,
"daemon": syslog.LOG_DAEMON,
"auth": syslog.LOG_AUTH,
"syslog": syslog.LOG_SYSLOG,
"lpr": syslog.LOG_LPR,
"news": syslog.LOG_NEWS,
"uucp": syslog.LOG_UUCP,
"cron": syslog.LOG_CRON,
"authpriv": syslog.LOG_AUTHPRIV,
"ftp": syslog.LOG_FTP,
"local0": syslog.LOG_LOCAL0,
"local1": syslog.LOG_LOCAL1,
"local2": syslog.LOG_LOCAL2,
"local3": syslog.LOG_LOCAL3,
"local4": syslog.LOG_LOCAL4,
"local5": syslog.LOG_LOCAL5,
"local6": syslog.LOG_LOCAL6,
"local7": syslog.LOG_LOCAL7,
}
if f, ok := facilities[facility]; ok {
return f
}
return syslog.LOG_LOCAL0 // Default
}
func parseSeverity(severity string) syslog.Priority {
severities := map[string]syslog.Priority{
"emerg": syslog.LOG_EMERG,
"alert": syslog.LOG_ALERT,
"crit": syslog.LOG_CRIT,
"err": syslog.LOG_ERR,
"warning": syslog.LOG_WARNING,
"notice": syslog.LOG_NOTICE,
"info": syslog.LOG_INFO,
"debug": syslog.LOG_DEBUG,
}
if s, ok := severities[severity]; ok {
return s
}
return syslog.LOG_INFO // Default
}

View File

@@ -1,33 +1,44 @@
package config package config
import "encoding/json" import (
import "log" "encoding/json"
import "os" "log"
"os"
)
type HandlerConfigT struct { type HandlerConfigT struct {
Attributes map[string]string `json:"attributes"` Attributes map[string]string `json:"attributes"`
} }
type ConfigT struct { type ConfigT struct {
Mqtt struct { Mqtt struct {
Broker string `json:"broker"` Broker string `json:"broker"`
Username string `json:"username"` Username string `json:"username"`
Password string Password string `json:"password"`
TlsEnable string `json:"tlsEnable"` TlsEnable string `json:"tlsEnable"`
} `json:"mqtt"` } `json:"mqtt"`
IncludeTopics []string `json:"includeTopics"` IncludeTopics []string `json:"includeTopics"`
ExcludeTopics []string `json:"excludeTopics"` ExcludeTopics []string `json:"excludeTopics"`
Syslog struct {
Enable string `json:"enable"`
Network string `json:"network"`
Server string `json:"server"`
Facility string `json:"facility"`
Severity string `json:"severity"`
Tag string `json:"tag"`
} `json:"syslog"`
} }
var Config ConfigT var Config ConfigT
func LoadConfiguration() { func LoadConfiguration() {
err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config) err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config)
if err != nil { if err != nil {
log.Fatalf("Unable to parse configuration: %s", err) log.Fatalf("Unable to parse configuration: %s", err)
} }
Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD") // Load password from environment variable only if not set in config
if Config.Mqtt.Password == "" {
Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD")
}
} }

View File

@@ -1,12 +0,0 @@
package database
import "time"
// import "gorm.io/gorm"
type Message struct {
Time time.Time `gorm:"not null;primary_key"`
Topic string `gorm:"not null"`
Payload string `gorm:"not null"`
}

View File

@@ -1,53 +0,0 @@
package database
import (
"log"
//"time"
//"fmt"
"ma/counter"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
type DatabaseHandle struct {
initialized bool
dbh *gorm.DB
}
func NewDatabaseHandle() *DatabaseHandle {
var db DatabaseHandle
// inject the whole database configuration via the well-known PG* env variables
conn, err := gorm.Open(postgres.Open(""))
if err != nil {
log.Printf("Unable to open database connection: %s", err)
db.initialized = false
} else {
db.dbh = conn
db.initialized = true
//log.Println("Database connection opened")
}
return &db
}
func (self *DatabaseHandle) StoreMessage(message *Message) {
if ! self.initialized {
log.Printf("Database connection not initialized, can not store, message %s lost", message)
counter.F("Stored")
return
}
result := self.dbh.Create(message)
if result.Error != nil {
log.Printf("Unable to insert, message %s lost, error: %s", message, result.Error)
counter.F("Stored")
return
}
//log.Println("Successfully stored message")
counter.S("Stored")
}

View File

@@ -1,22 +0,0 @@
package database
import (
"log"
//"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
func Migrate() {
dsn := ""
db, err := gorm.Open(postgres.Open(dsn))
if err != nil {
log.Fatalf("Unable to open database connection: %s", err)
}
db.AutoMigrate(&Message{})
log.Println("Message created")
log.Println("Remember to call create_hypertable on message, sowhat I can't do that for you.")
}

View File

@@ -1,7 +0,0 @@
create extension if not exists timescaledb;
select create_hypertable('message', 'time');

View File

@@ -5,20 +5,10 @@ go 1.21.3
require ( require (
github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
gorm.io/driver/postgres v1.5.10
gorm.io/gorm v1.25.12
) )
require ( require (
github.com/gorilla/websocket v1.5.3 // indirect github.com/gorilla/websocket v1.5.3 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.27.0 // indirect golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect golang.org/x/sync v0.7.0 // indirect
golang.org/x/text v0.16.0 // indirect
) )

View File

@@ -1,44 +1,10 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.10 h1:7Lggqempgy496c0WfHXsYWxk3Th+ZcW66/21QhVFdeE=
gorm.io/driver/postgres v1.5.10/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=

View File

@@ -1,18 +0,0 @@
package main
import "log"
import "ma/database"
func main() {
log.SetPrefix("UDI Migrate Schema: ")
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
log.Println("Starting")
database.Migrate()
log.Println("Done")
}

View File

@@ -1,18 +1,22 @@
package mqtt package mqtt
import "log" import (
import "strings" "fmt"
import "fmt" "log"
import MQTT "github.com/eclipse/paho.mqtt.golang" "strings"
import "github.com/google/uuid"
import "crypto/tls" "crypto/tls"
import "ma/config" "ma/config"
import "ma/counter" "ma/counter"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
)
type Message struct { type Message struct {
Topic string Topic string
Payload []byte Payload []byte
Retained bool Retained bool
} }
var InputChannel chan Message = make(chan Message, 100) var InputChannel chan Message = make(chan Message, 100)
@@ -21,117 +25,115 @@ var OutputChannel chan Message = make(chan Message, 100)
var mqttClient MQTT.Client var mqttClient MQTT.Client
func onMessageReceived(client MQTT.Client, message MQTT.Message) { func onMessageReceived(client MQTT.Client, message MQTT.Message) {
//log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) //log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
m := Message { m := Message{
Topic: message.Topic(), Topic: message.Topic(),
Payload: message.Payload(), Payload: message.Payload(),
Retained: message.Retained(), Retained: message.Retained(),
} }
if m.Retained { if m.Retained {
counter.S("Skipped") counter.S("Skipped")
//log.Println("Retained message skipped") //log.Println("Retained message skipped")
return return
} }
for _, i := range config.Config.ExcludeTopics { for _, i := range config.Config.ExcludeTopics {
if TopicMatchesSubscription(m.Topic, i) { if TopicMatchesSubscription(m.Topic, i) {
counter.S("Skipped") counter.S("Skipped")
//log.Println("Message skipped") //log.Println("Message skipped")
return return
} }
} }
select { select {
case InputChannel <- m: case InputChannel <- m:
counter.S("Received") counter.S("Received")
//log.Println("Message sent to channel") //log.Println("Message sent to channel")
default: default:
//log.Println("Channel full, message lost") //log.Println("Channel full, message lost")
counter.F("Received") counter.F("Received")
} }
} }
func onConnectionLost(client MQTT.Client, error error) { func onConnectionLost(client MQTT.Client, error error) {
log.Printf("Connection lost, error %s", error) log.Printf("Connection lost, error %s", error)
} }
func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) { func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) {
log.Println("Oops, connection lost, already reconnecting ...") log.Println("Oops, connection lost, already reconnecting ...")
} }
func onConnect(client MQTT.Client) { func onConnect(client MQTT.Client) {
for _, topic := range config.Config.IncludeTopics { for _, topic := range config.Config.IncludeTopics {
if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil { if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error()) log.Fatalf("Unable to subscribe to topic %s, error %s", topic, token.Error())
} }
log.Printf("Topic %s subscribed", topic) log.Printf("Topic %s subscribed", topic)
} }
} }
func outputDispatcher(client MQTT.Client) { func outputDispatcher(client MQTT.Client) {
for { for message := range OutputChannel {
select { log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
case message := <- OutputChannel: if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil {
log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload) log.Printf("Unable to publish, error %s", token.Error())
if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil { }
log.Printf("Unable to publish, error %s", token.Error()) log.Println("Successfully published")
} }
log.Println("Successfully published")
}
}
} }
func StartMqttClient() { func StartMqttClient() {
broker := config.Config.Mqtt.Broker broker := config.Config.Mqtt.Broker
if broker == "" { if broker == "" {
log.Fatal("No broker given, set env var MQTT_BROKER") log.Fatal("No broker given, set env var MQTT_BROKER")
}
prefix := "MA"
uuid := uuid.New()
clientId := fmt.Sprintf("%s-%s", prefix, uuid)
opts := MQTT.NewClientOptions().
AddBroker(broker).
SetClientID(clientId).
SetConnectionLostHandler(onConnectionLost).
SetOnConnectHandler(onConnect).
SetReconnectingHandler(onReconnecting).
SetConnectRetry(true)
username := config.Config.Mqtt.Username
if username != "" {
opts.SetUsername(username)
}
password := config.Config.Mqtt.Password
if password != "" {
opts.SetPassword(password)
}
enableTls := config.Config.Mqtt.TlsEnable
if enableTls == "true" {
//log.Println("Enabling TLS connection")
tlsConfig := &tls.Config {
InsecureSkipVerify: true,
}
opts.SetTLSConfig(tlsConfig)
} }
log.Println("Broker connecting") prefix := "MA"
mqttClient = MQTT.NewClient(opts) uuid := uuid.New()
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { clientId := fmt.Sprintf("%s-%s", prefix, uuid)
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
}
//log.Printf("Successfully connected to broker %s", broker)
go outputDispatcher(mqttClient) opts := MQTT.NewClientOptions().
AddBroker(broker).
SetClientID(clientId).
SetConnectionLostHandler(onConnectionLost).
SetOnConnectHandler(onConnect).
SetReconnectingHandler(onReconnecting).
SetConnectRetry(true)
return username := config.Config.Mqtt.Username
if username != "" {
opts.SetUsername(username)
}
password := config.Config.Mqtt.Password
if password != "" {
opts.SetPassword(password)
}
enableTls := config.Config.Mqtt.TlsEnable
if enableTls == "true" {
//log.Println("Enabling TLS connection")
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
opts.SetTLSConfig(tlsConfig)
}
log.Println("Broker connecting")
mqttClient = MQTT.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
}
log.Printf("Successfully connected to broker %s", broker)
log.Printf("Include topics: %s", config.Config.IncludeTopics)
log.Printf("Exclude topics: %s", config.Config.ExcludeTopics)
go outputDispatcher(mqttClient)
return
} }
func StopMqttClient() { func StopMqttClient() {
log.Println("Disconnecting from broker") log.Println("Disconnecting from broker")
mqttClient.Disconnect(250) mqttClient.Disconnect(250)
} }
func TopicMatchesSubscription(topic, subscription string) bool { func TopicMatchesSubscription(topic, subscription string) bool {
@@ -150,4 +152,3 @@ func TopicMatchesSubscription(topic, subscription string) bool {
return true return true
} }