Compare commits
11 Commits
0.0.7
...
archiving_
| Author | SHA1 | Date | |
|---|---|---|---|
|
06b217a990
|
|||
|
d5c0bd3b01
|
|||
|
29a40638bd
|
|||
|
a93c0e124e
|
|||
|
cdf6a6c44a
|
|||
|
306535c933
|
|||
|
5a25204f2f
|
|||
|
52b414f60f
|
|||
|
3fc83eefe6
|
|||
|
a58959a316
|
|||
|
2c21ab00a0
|
@@ -5,20 +5,20 @@ steps:
|
|||||||
repo: ${FORGE_NAME}/${CI_REPO}
|
repo: ${FORGE_NAME}/${CI_REPO}
|
||||||
registry:
|
registry:
|
||||||
from_secret: container_registry
|
from_secret: container_registry
|
||||||
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG}
|
tags: latest,${CI_COMMIT_TAG}
|
||||||
username:
|
username:
|
||||||
from_secret: container_registry_username
|
from_secret: container_registry_username
|
||||||
password:
|
password:
|
||||||
from_secret: container_registry_password
|
from_secret: container_registry_password
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
when:
|
when:
|
||||||
- event: [push, tag]
|
- event: tag
|
||||||
|
|
||||||
deploy:
|
deploy:
|
||||||
image: portainer/kubectl-shell:latest
|
image: quay.io/wollud1969/k8s-admin-helper:0.3.4
|
||||||
secrets:
|
environment:
|
||||||
- source: kube_config
|
KUBE_CONFIG_CONTENT:
|
||||||
target: KUBE_CONFIG_CONTENT
|
from_secret: kube_config
|
||||||
commands:
|
commands:
|
||||||
- export IMAGE_TAG=$CI_COMMIT_TAG
|
- export IMAGE_TAG=$CI_COMMIT_TAG
|
||||||
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
|
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ RUN go build -a -installsuffix nocgo -o ma main.go
|
|||||||
FROM scratch
|
FROM scratch
|
||||||
|
|
||||||
ENV MA_CONF ""
|
ENV MA_CONF ""
|
||||||
|
ENV MQTT_PASSWORD ""
|
||||||
|
|
||||||
COPY --from=builder /go/src/ma ./
|
COPY --from=builder /go/src/ma ./
|
||||||
ENTRYPOINT ["./ma"]
|
ENTRYPOINT ["./ma"]
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ PGHOST=`kubectl get services traefik -n system -o jsonpath="{.status.loadBalance
|
|||||||
PGPASSWORD=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGPASSWORD}" | base64 --decode`
|
PGPASSWORD=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGPASSWORD}" | base64 --decode`
|
||||||
PGUSER=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGUSER}" | base64 --decode`
|
PGUSER=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGUSER}" | base64 --decode`
|
||||||
PGSSLMODE=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGSSLMODE}" | base64 --decode`
|
PGSSLMODE=`kubectl get secrets ma-db-cred -n $N -o jsonpath="{.data.PGSSLMODE}" | base64 --decode`
|
||||||
PGDATABASE="ma"
|
PGDATABASE="matest"
|
||||||
export PGUSER PGHOST PGPASSWORD PGSSLMODE PGDATABASE
|
export PGUSER PGHOST PGPASSWORD PGSSLMODE PGDATABASE
|
||||||
|
|
||||||
MA_CONF=`cat config-test.json`
|
MA_CONF=`cat config-test.json`
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"mqtt": {
|
"mqtt": {
|
||||||
"broker": "mqtt://172.23.1.102:1883",
|
"broker": "mqtt://172.23.1.101:8883",
|
||||||
"username": "archiver",
|
"username": "archiver",
|
||||||
"tlsEnable": "true"
|
"tlsEnable": "true"
|
||||||
},
|
},
|
||||||
@@ -12,5 +12,13 @@
|
|||||||
"snmp",
|
"snmp",
|
||||||
"MainsCnt/#",
|
"MainsCnt/#",
|
||||||
"cem/#"
|
"cem/#"
|
||||||
]
|
],
|
||||||
|
"syslog": {
|
||||||
|
"enable": "true",
|
||||||
|
"network": "udp",
|
||||||
|
"server": "172.20.0.10:514",
|
||||||
|
"facility": "local0",
|
||||||
|
"severity": "info",
|
||||||
|
"tag": "mqtt-archiver"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,9 +8,14 @@
|
|||||||
"#"
|
"#"
|
||||||
],
|
],
|
||||||
"excludeTopics": [
|
"excludeTopics": [
|
||||||
"IoT/Watchdog",
|
"IoT/Watchdog"
|
||||||
"snmp",
|
],
|
||||||
"MainsCnt/#",
|
"syslog": {
|
||||||
"cem/#"
|
"enable": "true",
|
||||||
]
|
"network": "udp",
|
||||||
|
"server": "172.20.0.10:514",
|
||||||
|
"facility": "local0",
|
||||||
|
"severity": "info",
|
||||||
|
"tag": "mqtt-archiver"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,8 +4,6 @@ metadata:
|
|||||||
name: ma
|
name: ma
|
||||||
labels:
|
labels:
|
||||||
app: ma
|
app: ma
|
||||||
annotations:
|
|
||||||
secret.reloader.stakater.com/reload: ma-db-cred
|
|
||||||
spec:
|
spec:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
selector:
|
selector:
|
||||||
@@ -21,8 +19,6 @@ spec:
|
|||||||
image: %IMAGE%
|
image: %IMAGE%
|
||||||
imagePullPolicy: Always
|
imagePullPolicy: Always
|
||||||
envFrom:
|
envFrom:
|
||||||
- secretRef:
|
|
||||||
name: ma-db-cred
|
|
||||||
- secretRef:
|
- secretRef:
|
||||||
name: ma-mqtt-cred
|
name: ma-mqtt-cred
|
||||||
- configMapRef:
|
- configMapRef:
|
||||||
|
|||||||
@@ -1,10 +1,6 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
export PGUSER=`kubectl get secret -n database timescaledb -o jsonpath="{.data.superuser-username}" | base64 --decode`
|
. ~/Workspace/mykubernetesenv/ENVDB1
|
||||||
export PGHOST=`kubectl get services traefik -n system -o jsonpath="{.status.loadBalancer.ingress[0].ip}"`
|
|
||||||
export PGPASSWORD=`kubectl get secret -n database timescaledb -o jsonpath="{.data.superuser-password}" | base64 --decode`
|
|
||||||
export PGSSLMODE=require
|
|
||||||
|
|
||||||
|
|
||||||
DATABASE=ma
|
DATABASE=ma
|
||||||
LOGIN=ma
|
LOGIN=ma
|
||||||
@@ -33,7 +29,7 @@ kubectl create secret generic ma-db-cred \
|
|||||||
--from-literal=PGUSER="$LOGIN" \
|
--from-literal=PGUSER="$LOGIN" \
|
||||||
--from-literal=PGPASSWORD="$PASSWORD" \
|
--from-literal=PGPASSWORD="$PASSWORD" \
|
||||||
--from-literal=PGDATABASE="$DATABASE" \
|
--from-literal=PGDATABASE="$DATABASE" \
|
||||||
--from-literal=PGHOST="timescaledb.database.svc.cluster.local" \
|
--from-literal=PGHOST="database.database1.svc.cluster.local" \
|
||||||
--from-literal=PGSSLMODE="require" | \
|
--from-literal=PGSSLMODE="require" | \
|
||||||
kubectl apply -f - -n $NAMESPACE
|
kubectl apply -f - -n $NAMESPACE
|
||||||
|
|
||||||
|
|||||||
@@ -1,36 +1,31 @@
|
|||||||
package archiver
|
package archiver
|
||||||
|
|
||||||
import "log"
|
import (
|
||||||
import "time"
|
"log"
|
||||||
//import "os"
|
"ma/mqtt"
|
||||||
//import "fmt"
|
"time"
|
||||||
//import "net/url"
|
)
|
||||||
import "ma/mqtt"
|
|
||||||
//import "ma/config"
|
|
||||||
import "ma/counter"
|
|
||||||
import "ma/database"
|
|
||||||
|
|
||||||
var dbh *database.DatabaseHandle
|
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
Payload string `json:"payload"`
|
||||||
|
}
|
||||||
|
|
||||||
func InitArchiver() {
|
func InitArchiver() {
|
||||||
log.Printf("Archiver initializing")
|
log.Printf("Archiver initializing")
|
||||||
dbh = database.NewDatabaseHandle()
|
InitSyslog()
|
||||||
}
|
}
|
||||||
|
|
||||||
func InputArchiver() {
|
func InputArchiver() {
|
||||||
for {
|
for mqttMessage := range mqtt.InputChannel {
|
||||||
select {
|
message := Message{time.Now(), mqttMessage.Topic, string(mqttMessage.Payload)}
|
||||||
case mqttMessage := <- mqtt.InputChannel:
|
|
||||||
message := database.Message { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) }
|
|
||||||
handleMessage(message)
|
handleMessage(message)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleMessage(message database.Message) {
|
func handleMessage(message Message) {
|
||||||
// log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload)
|
log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload)
|
||||||
dbh.StoreMessage(&message)
|
|
||||||
counter.S("Stored")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
WriteSyslog(message)
|
||||||
|
}
|
||||||
|
|||||||
117
src/ma/archiver/syslog.go
Normal file
117
src/ma/archiver/syslog.go
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
package archiver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"log/syslog"
|
||||||
|
"ma/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
var syslogWriter *syslog.Writer
|
||||||
|
|
||||||
|
func InitSyslog() {
|
||||||
|
if config.Config.Syslog.Enable == "true" {
|
||||||
|
// Parse facility
|
||||||
|
facility := parseFacility(config.Config.Syslog.Facility)
|
||||||
|
|
||||||
|
// Parse severity
|
||||||
|
severity := parseSeverity(config.Config.Syslog.Severity)
|
||||||
|
|
||||||
|
// Combine to priority
|
||||||
|
priority := facility | severity
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Connect to remote syslog server
|
||||||
|
syslogWriter, err = syslog.Dial(
|
||||||
|
config.Config.Syslog.Network,
|
||||||
|
config.Config.Syslog.Server,
|
||||||
|
priority,
|
||||||
|
config.Config.Syslog.Tag,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to syslog server: %v", err)
|
||||||
|
}
|
||||||
|
log.Printf("Syslog connection established: %s://%s", config.Config.Syslog.Network, config.Config.Syslog.Server)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WriteSyslog(message Message) {
|
||||||
|
if syslogWriter != nil {
|
||||||
|
jsonData, err := json.Marshal(message)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to marshal message to JSON: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send to syslog based on configured severity
|
||||||
|
switch config.Config.Syslog.Severity {
|
||||||
|
case "emerg":
|
||||||
|
syslogWriter.Emerg(string(jsonData))
|
||||||
|
case "alert":
|
||||||
|
syslogWriter.Alert(string(jsonData))
|
||||||
|
case "crit":
|
||||||
|
syslogWriter.Crit(string(jsonData))
|
||||||
|
case "err":
|
||||||
|
syslogWriter.Err(string(jsonData))
|
||||||
|
case "warning":
|
||||||
|
syslogWriter.Warning(string(jsonData))
|
||||||
|
case "notice":
|
||||||
|
syslogWriter.Notice(string(jsonData))
|
||||||
|
case "info":
|
||||||
|
syslogWriter.Info(string(jsonData))
|
||||||
|
case "debug":
|
||||||
|
syslogWriter.Debug(string(jsonData))
|
||||||
|
default:
|
||||||
|
syslogWriter.Info(string(jsonData))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseFacility(facility string) syslog.Priority {
|
||||||
|
facilities := map[string]syslog.Priority{
|
||||||
|
"kern": syslog.LOG_KERN,
|
||||||
|
"user": syslog.LOG_USER,
|
||||||
|
"mail": syslog.LOG_MAIL,
|
||||||
|
"daemon": syslog.LOG_DAEMON,
|
||||||
|
"auth": syslog.LOG_AUTH,
|
||||||
|
"syslog": syslog.LOG_SYSLOG,
|
||||||
|
"lpr": syslog.LOG_LPR,
|
||||||
|
"news": syslog.LOG_NEWS,
|
||||||
|
"uucp": syslog.LOG_UUCP,
|
||||||
|
"cron": syslog.LOG_CRON,
|
||||||
|
"authpriv": syslog.LOG_AUTHPRIV,
|
||||||
|
"ftp": syslog.LOG_FTP,
|
||||||
|
"local0": syslog.LOG_LOCAL0,
|
||||||
|
"local1": syslog.LOG_LOCAL1,
|
||||||
|
"local2": syslog.LOG_LOCAL2,
|
||||||
|
"local3": syslog.LOG_LOCAL3,
|
||||||
|
"local4": syslog.LOG_LOCAL4,
|
||||||
|
"local5": syslog.LOG_LOCAL5,
|
||||||
|
"local6": syslog.LOG_LOCAL6,
|
||||||
|
"local7": syslog.LOG_LOCAL7,
|
||||||
|
}
|
||||||
|
|
||||||
|
if f, ok := facilities[facility]; ok {
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
return syslog.LOG_LOCAL0 // Default
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseSeverity(severity string) syslog.Priority {
|
||||||
|
severities := map[string]syslog.Priority{
|
||||||
|
"emerg": syslog.LOG_EMERG,
|
||||||
|
"alert": syslog.LOG_ALERT,
|
||||||
|
"crit": syslog.LOG_CRIT,
|
||||||
|
"err": syslog.LOG_ERR,
|
||||||
|
"warning": syslog.LOG_WARNING,
|
||||||
|
"notice": syslog.LOG_NOTICE,
|
||||||
|
"info": syslog.LOG_INFO,
|
||||||
|
"debug": syslog.LOG_DEBUG,
|
||||||
|
}
|
||||||
|
|
||||||
|
if s, ok := severities[severity]; ok {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
return syslog.LOG_INFO // Default
|
||||||
|
}
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import "encoding/json"
|
import (
|
||||||
import "log"
|
"encoding/json"
|
||||||
import "os"
|
"log"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
type HandlerConfigT struct {
|
type HandlerConfigT struct {
|
||||||
Attributes map[string]string `json:"attributes"`
|
Attributes map[string]string `json:"attributes"`
|
||||||
@@ -12,22 +14,31 @@ type ConfigT struct {
|
|||||||
Mqtt struct {
|
Mqtt struct {
|
||||||
Broker string `json:"broker"`
|
Broker string `json:"broker"`
|
||||||
Username string `json:"username"`
|
Username string `json:"username"`
|
||||||
Password string
|
Password string `json:"password"`
|
||||||
TlsEnable string `json:"tlsEnable"`
|
TlsEnable string `json:"tlsEnable"`
|
||||||
} `json:"mqtt"`
|
} `json:"mqtt"`
|
||||||
IncludeTopics []string `json:"includeTopics"`
|
IncludeTopics []string `json:"includeTopics"`
|
||||||
ExcludeTopics []string `json:"excludeTopics"`
|
ExcludeTopics []string `json:"excludeTopics"`
|
||||||
|
Syslog struct {
|
||||||
|
Enable string `json:"enable"`
|
||||||
|
Network string `json:"network"`
|
||||||
|
Server string `json:"server"`
|
||||||
|
Facility string `json:"facility"`
|
||||||
|
Severity string `json:"severity"`
|
||||||
|
Tag string `json:"tag"`
|
||||||
|
} `json:"syslog"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var Config ConfigT
|
var Config ConfigT
|
||||||
|
|
||||||
|
|
||||||
func LoadConfiguration() {
|
func LoadConfiguration() {
|
||||||
err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config)
|
err := json.Unmarshal([]byte(os.Getenv("MA_CONF")), &Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Unable to parse configuration: %s", err)
|
log.Fatalf("Unable to parse configuration: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load password from environment variable only if not set in config
|
||||||
|
if Config.Mqtt.Password == "" {
|
||||||
Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD")
|
Config.Mqtt.Password = os.Getenv("MQTT_PASSWORD")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +0,0 @@
|
|||||||
package database
|
|
||||||
|
|
||||||
import "time"
|
|
||||||
// import "gorm.io/gorm"
|
|
||||||
|
|
||||||
|
|
||||||
type Message struct {
|
|
||||||
Time time.Time `gorm:"not null;primary_key"`
|
|
||||||
Topic string `gorm:"not null"`
|
|
||||||
Payload string `gorm:"not null"`
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -1,53 +0,0 @@
|
|||||||
package database
|
|
||||||
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
//"time"
|
|
||||||
//"fmt"
|
|
||||||
"ma/counter"
|
|
||||||
"gorm.io/driver/postgres"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
type DatabaseHandle struct {
|
|
||||||
initialized bool
|
|
||||||
dbh *gorm.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDatabaseHandle() *DatabaseHandle {
|
|
||||||
var db DatabaseHandle
|
|
||||||
// inject the whole database configuration via the well-known PG* env variables
|
|
||||||
conn, err := gorm.Open(postgres.Open(""))
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Unable to open database connection: %s", err)
|
|
||||||
db.initialized = false
|
|
||||||
} else {
|
|
||||||
db.dbh = conn
|
|
||||||
db.initialized = true
|
|
||||||
//log.Println("Database connection opened")
|
|
||||||
}
|
|
||||||
return &db
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *DatabaseHandle) StoreMessage(message *Message) {
|
|
||||||
if ! self.initialized {
|
|
||||||
log.Printf("Database connection not initialized, can not store, message %s lost", message)
|
|
||||||
counter.F("Stored")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result := self.dbh.Create(message)
|
|
||||||
if result.Error != nil {
|
|
||||||
log.Printf("Unable to insert, message %s lost, error: %s", message, result.Error)
|
|
||||||
counter.F("Stored")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//log.Println("Successfully stored message")
|
|
||||||
counter.S("Stored")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -1,22 +0,0 @@
|
|||||||
package database
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
//"time"
|
|
||||||
"gorm.io/driver/postgres"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Migrate() {
|
|
||||||
dsn := ""
|
|
||||||
db, err := gorm.Open(postgres.Open(dsn))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Unable to open database connection: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
db.AutoMigrate(&Message{})
|
|
||||||
log.Println("Message created")
|
|
||||||
|
|
||||||
log.Println("Remember to call create_hypertable on message, sowhat I can't do that for you.")
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -1,7 +0,0 @@
|
|||||||
create extension if not exists timescaledb;
|
|
||||||
|
|
||||||
|
|
||||||
select create_hypertable('message', 'time');
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -5,20 +5,10 @@ go 1.21.3
|
|||||||
require (
|
require (
|
||||||
github.com/eclipse/paho.mqtt.golang v1.5.0
|
github.com/eclipse/paho.mqtt.golang v1.5.0
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
gorm.io/driver/postgres v1.5.10
|
|
||||||
gorm.io/gorm v1.25.12
|
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/gorilla/websocket v1.5.3 // indirect
|
github.com/gorilla/websocket v1.5.3 // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
|
||||||
github.com/jackc/pgx/v5 v5.5.5 // indirect
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
|
||||||
github.com/jinzhu/now v1.1.5 // indirect
|
|
||||||
golang.org/x/crypto v0.25.0 // indirect
|
|
||||||
golang.org/x/net v0.27.0 // indirect
|
golang.org/x/net v0.27.0 // indirect
|
||||||
golang.org/x/sync v0.7.0 // indirect
|
golang.org/x/sync v0.7.0 // indirect
|
||||||
golang.org/x/text v0.16.0 // indirect
|
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,44 +1,10 @@
|
|||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
|
||||||
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
|
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
|
||||||
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
|
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
|
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
|
||||||
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
|
|
||||||
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
|
||||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
|
||||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
|
||||||
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
|
||||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
|
||||||
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
|
|
||||||
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
|
|
||||||
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
|
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
|
||||||
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
|
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
|
||||||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
||||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
|
|
||||||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
gorm.io/driver/postgres v1.5.10 h1:7Lggqempgy496c0WfHXsYWxk3Th+ZcW66/21QhVFdeE=
|
|
||||||
gorm.io/driver/postgres v1.5.10/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
|
|
||||||
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
|
|
||||||
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
|
|
||||||
|
|||||||
@@ -1,13 +1,17 @@
|
|||||||
package mqtt
|
package mqtt
|
||||||
|
|
||||||
import "log"
|
import (
|
||||||
import "strings"
|
"fmt"
|
||||||
import "fmt"
|
"log"
|
||||||
import MQTT "github.com/eclipse/paho.mqtt.golang"
|
"strings"
|
||||||
import "github.com/google/uuid"
|
|
||||||
import "crypto/tls"
|
"crypto/tls"
|
||||||
import "ma/config"
|
"ma/config"
|
||||||
import "ma/counter"
|
"ma/counter"
|
||||||
|
|
||||||
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
Topic string
|
Topic string
|
||||||
@@ -22,7 +26,7 @@ var mqttClient MQTT.Client
|
|||||||
|
|
||||||
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
|
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
|
||||||
//log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
|
//log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
|
||||||
m := Message {
|
m := Message{
|
||||||
Topic: message.Topic(),
|
Topic: message.Topic(),
|
||||||
Payload: message.Payload(),
|
Payload: message.Payload(),
|
||||||
Retained: message.Retained(),
|
Retained: message.Retained(),
|
||||||
@@ -57,7 +61,6 @@ func onReconnecting(client MQTT.Client, clientOpts *MQTT.ClientOptions) {
|
|||||||
log.Println("Oops, connection lost, already reconnecting ...")
|
log.Println("Oops, connection lost, already reconnecting ...")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func onConnect(client MQTT.Client) {
|
func onConnect(client MQTT.Client) {
|
||||||
for _, topic := range config.Config.IncludeTopics {
|
for _, topic := range config.Config.IncludeTopics {
|
||||||
if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
|
||||||
@@ -68,16 +71,13 @@ func onConnect(client MQTT.Client) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func outputDispatcher(client MQTT.Client) {
|
func outputDispatcher(client MQTT.Client) {
|
||||||
for {
|
for message := range OutputChannel {
|
||||||
select {
|
|
||||||
case message := <- OutputChannel:
|
|
||||||
log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
|
log.Printf("Message arrived in outputDispatcher, topic: %s, payload: %s\n", message.Topic, message.Payload)
|
||||||
if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil {
|
if token := client.Publish(message.Topic, 0, false, message.Payload); token.Wait() && token.Error() != nil {
|
||||||
log.Printf("Unable to publish, error %s", token.Error())
|
log.Printf("Unable to publish, error %s", token.Error())
|
||||||
}
|
}
|
||||||
log.Println("Successfully published")
|
log.Println("Successfully published")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func StartMqttClient() {
|
func StartMqttClient() {
|
||||||
@@ -111,7 +111,7 @@ func StartMqttClient() {
|
|||||||
enableTls := config.Config.Mqtt.TlsEnable
|
enableTls := config.Config.Mqtt.TlsEnable
|
||||||
if enableTls == "true" {
|
if enableTls == "true" {
|
||||||
//log.Println("Enabling TLS connection")
|
//log.Println("Enabling TLS connection")
|
||||||
tlsConfig := &tls.Config {
|
tlsConfig := &tls.Config{
|
||||||
InsecureSkipVerify: true,
|
InsecureSkipVerify: true,
|
||||||
}
|
}
|
||||||
opts.SetTLSConfig(tlsConfig)
|
opts.SetTLSConfig(tlsConfig)
|
||||||
@@ -122,7 +122,9 @@ func StartMqttClient() {
|
|||||||
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
||||||
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
|
log.Fatalf("Unable to connect to broker %s, error %s", broker, token.Error())
|
||||||
}
|
}
|
||||||
//log.Printf("Successfully connected to broker %s", broker)
|
log.Printf("Successfully connected to broker %s", broker)
|
||||||
|
log.Printf("Include topics: %s", config.Config.IncludeTopics)
|
||||||
|
log.Printf("Exclude topics: %s", config.Config.ExcludeTopics)
|
||||||
|
|
||||||
go outputDispatcher(mqttClient)
|
go outputDispatcher(mqttClient)
|
||||||
|
|
||||||
@@ -150,4 +152,3 @@ func TopicMatchesSubscription(topic, subscription string) bool {
|
|||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user