This commit is contained in:
Wolfgang Hottgenroth 2024-12-01 16:04:59 +01:00
parent e241319e6e
commit c659a89ec3
Signed by: wn
GPG Key ID: 836E9E1192A6B132
6 changed files with 20 additions and 43 deletions

6
.gitignore vendored
View File

@ -1,6 +1,6 @@
src/udi/udi src/ma/ma
src/udi/main src/ma/main
src/udi/migrate_schema src/ma/migrate_schema
tmp/ tmp/
ENVDB ENVDB
ENVDB.cluster ENVDB.cluster

View File

@ -2,11 +2,11 @@ package archiver
import "log" import "log"
import "time" import "time"
import "os" //import "os"
import "fmt" //import "fmt"
import "net/url" //import "net/url"
import "ma/mqtt" import "ma/mqtt"
import "ma/config" //import "ma/config"
import "ma/counter" import "ma/counter"
import "ma/database" import "ma/database"
@ -20,13 +20,12 @@ func InputArchiver() {
select { select {
case mqttMessage := <- mqtt.InputChannel: case mqttMessage := <- mqtt.InputChannel:
message := database.Message { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } message := database.Message { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) }
archiverChannel <- message
handleMessage(message) handleMessage(message)
} }
} }
} }
func handleMessage(message database.MessageT) { func handleMessage(message database.Message) {
log.Printf("Archiving %", message) log.Printf("Archiving %", message)
counter.S("Stored") counter.S("Stored")
} }

View File

@ -45,32 +45,11 @@ func F(id string) {
} }
} }
func SH(id string) {
if _, ok := stats.Handled[id]; ok {
tuple := stats.Handled[id]
tuple.Successful += 1
stats.Handled[id] = tuple
} else {
stats.Handled[id] = statsTuple_t { Successful:1, Failed:0, }
}
}
func FH(id string) {
if _, ok := stats.Handled[id]; ok {
tuple := stats.Handled[id]
tuple.Failed += 1
stats.Handled[id] = tuple
} else {
stats.Handled[id] = statsTuple_t { Successful:0, Failed:1, }
}
}
func InitCounter() { func InitCounter() {
stats = stats_t { stats = stats_t {
Received: statsTuple_t {Successful:0,Failed:0,}, Received: statsTuple_t {Successful:0,Failed:0,},
Stored: statsTuple_t {Successful:0,Failed:0,}, Stored: statsTuple_t {Successful:0,Failed:0,},
Skipped: statsTuple_t {Successful:0,Failed:0,}, Skipped: statsTuple_t {Successful:0,Failed:0,},
Handled: make(map[string]statsTuple_t),
} }
go func() { go func() {

View File

@ -1,7 +1,7 @@
package database package database
import "time" import "time"
import "gorm.io/gorm" // import "gorm.io/gorm"
type Message struct { type Message struct {

View File

@ -4,7 +4,7 @@ package database
import ( import (
"log" "log"
//"time" //"time"
"fmt" //"fmt"
"ma/counter" "ma/counter"
"gorm.io/driver/postgres" "gorm.io/driver/postgres"
"gorm.io/gorm" "gorm.io/gorm"

View File

@ -6,8 +6,8 @@ import "fmt"
import MQTT "github.com/eclipse/paho.mqtt.golang" import MQTT "github.com/eclipse/paho.mqtt.golang"
import "github.com/google/uuid" import "github.com/google/uuid"
import "crypto/tls" import "crypto/tls"
import "udi/config" import "ma/config"
import "udi/counter" import "ma/counter"
type Message struct { type Message struct {
Topic string Topic string
@ -21,22 +21,21 @@ var mqttClient MQTT.Client
func onMessageReceived(client MQTT.Client, message MQTT.Message) { func onMessageReceived(client MQTT.Client, message MQTT.Message) {
// log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload()) // log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
for _, i := range config.Config.ExcludeTopics {
if i == message.Topic {
counter.S("Skipped")
return
}
}
m := Message { m := Message {
Topic: message.Topic(), Topic: message.Topic(),
Payload: message.Payload(), Payload: message.Payload(),
} }
for _, i := range config.Config.ExcludeTopics {
if i == m.Topic {
counter.S("Skipped")
log.Println("Message skipped")
return
}
}
select { select {
case InputChannel <- m: case InputChannel <- m:
counter.S("Received") counter.S("Received")
{} log.Println("Message sent to channel")
//log.Println("Message sent to channel")
default: default:
log.Println("Channel full, message lost") log.Println("Channel full, message lost")
counter.F("Received") counter.F("Received")