partly works

This commit is contained in:
Wolfgang Hottgenroth 2024-12-01 16:38:25 +01:00
parent c659a89ec3
commit eea7c5c95b
Signed by: wn
GPG Key ID: 836E9E1192A6B132
4 changed files with 42 additions and 6 deletions

16
config-test.json Normal file
View File

@ -0,0 +1,16 @@
{
"mqtt": {
"broker": "mqtt://172.23.1.102:1883",
"username": "archiver",
"tlsEnable": "true"
},
"includeTopics": [
"#"
],
"excludeTopics": [
"IoT/Watchdog",
"snmp",
"MainsCnt/#",
"cem/#"
]
}

13
deployment/config.json Normal file
View File

@ -0,0 +1,13 @@
{
"mqtt": {
"broker": "mqtt://emqx01-anonymous-cluster-internal.broker.svc.cluster.local:1883",
"tlsEnable": "false"
},
"includeTopics": [
"#"
],
"includeTopics": [
"mainscnt/#"
"cem/#"
]
}

View File

@ -26,7 +26,7 @@ func InputArchiver() {
} }
func handleMessage(message database.Message) { func handleMessage(message database.Message) {
log.Printf("Archiving %", message) log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload)
counter.S("Stored") counter.S("Stored")
} }

View File

@ -12,6 +12,7 @@ import "ma/counter"
type Message struct { type Message struct {
Topic string Topic string
Payload []byte Payload []byte
Retained bool
} }
var InputChannel chan Message = make(chan Message, 100) var InputChannel chan Message = make(chan Message, 100)
@ -20,24 +21,30 @@ var OutputChannel chan Message = make(chan Message, 100)
var mqttClient MQTT.Client var mqttClient MQTT.Client
func onMessageReceived(client MQTT.Client, message MQTT.Message) { func onMessageReceived(client MQTT.Client, message MQTT.Message) {
// log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) //log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
m := Message { m := Message {
Topic: message.Topic(), Topic: message.Topic(),
Payload: message.Payload(), Payload: message.Payload(),
Retained: message.Retained(),
}
if m.Retained {
counter.S("Skipped")
//log.Println("Retained message skipped")
return
} }
for _, i := range config.Config.ExcludeTopics { for _, i := range config.Config.ExcludeTopics {
if i == m.Topic { if TopicMatchesSubscription(m.Topic, i) {
counter.S("Skipped") counter.S("Skipped")
log.Println("Message skipped") //log.Println("Message skipped")
return return
} }
} }
select { select {
case InputChannel <- m: case InputChannel <- m:
counter.S("Received") counter.S("Received")
log.Println("Message sent to channel") //log.Println("Message sent to channel")
default: default:
log.Println("Channel full, message lost") //log.Println("Channel full, message lost")
counter.F("Received") counter.F("Received")
} }
} }