This commit is contained in:
2023-12-01 09:26:17 +01:00
parent 6faac5aeba
commit afbc3d3454
8 changed files with 80 additions and 29 deletions

View File

@ -11,14 +11,9 @@ import "udi/handlers/handler"
import "udi/handlers/ttn"
import "udi/handlers/iot"
type archivingStruct struct {
timestamp string `json:"timestamp"`
topic string `json:"topic"`
payload string `json:"payload"`
}
var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
var archiverChannel chan mqtt.Message = make(chan mqtt.Message, 100)
var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100)
func InitDispatcher() {
log.Printf("Initializing dispatcher")
@ -38,14 +33,14 @@ func InitDispatcher() {
}
}
func storeMessage(filename string, item archivingStruct) {
func storeMessage(filename string, item handler.MessageT) {
file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644)
if err != nil {
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
return
}
defer file.Close()
archivingString := fmt.Sprintf("%s - %s - %s\n", item.timestamp, item.topic, item.payload)
archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload)
_, err = file.WriteString(string(archivingString) + "\n")
if err != nil {
log.Printf("Unable to write message, message is not archived: %s", err)
@ -61,8 +56,7 @@ func archiver() {
for {
select {
case message := <- archiverChannel:
currentTime := time.Now()
currentDateStr := currentTime.Format("2006/01/02/15")
currentDateStr := message.Timestamp.Format("2006/01/02/15")
currentArchivingDir := archivingRootDir + "/" + currentDateStr
if currentArchivingDir != lastArchivingDir {
err := os.MkdirAll(currentArchivingDir, 0755)
@ -70,11 +64,10 @@ func archiver() {
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
}
lastArchivingDir = currentArchivingDir
log.Printf("Archiving dir %s created", currentArchivingDir)
log.Printf("Archiving dir %s created", currentArchivingDir)
}
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
archivingItem := archivingStruct { currentTime.Format("2006-01-02 15:04:05"), message.Topic, string(message.Payload) }
storeMessage(archivingFilename, archivingItem)
storeMessage(archivingFilename, message)
}
}
}
@ -82,8 +75,9 @@ func archiver() {
func InputDispatcher() {
for {
select {
case message := <- mqtt.InputChannel:
log.Printf("Message arrived in inputDispatcher, topic: %s\n", message.Topic)
case mqttMessage := <- mqtt.InputChannel:
log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) }
archiverChannel <- message
for _, mapping := range config.Config.TopicMappings {
@ -94,7 +88,7 @@ func InputDispatcher() {
log.Printf("Handle message in handler %s", mapping.Handler)
handler, exists := handlerMap[mapping.Handler]
if exists {
handler.Handle(message.Topic, string(message.Payload))
handler.Handle(message)
} else {
log.Printf("Handler not found, message is lost")
}