109 lines
3.5 KiB
Go
Raw Normal View History

2023-11-27 13:09:41 +01:00
package dispatcher
import "log"
2023-11-28 10:23:21 +01:00
import "time"
import "os"
2023-11-28 16:53:51 +01:00
import "fmt"
import "net/url"
2023-11-27 13:09:41 +01:00
import "udi/mqtt"
import "udi/config"
2023-11-27 15:21:28 +01:00
import "udi/handlers/handler"
import "udi/handlers/ttn"
2023-11-28 16:53:51 +01:00
import "udi/handlers/iot"
2023-11-27 13:09:41 +01:00
2023-11-28 16:53:51 +01:00
type archivingStruct struct {
timestamp string `json:"timestamp"`
topic string `json:"topic"`
payload string `json:"payload"`
}
2023-11-27 13:09:41 +01:00
2023-11-27 15:21:28 +01:00
var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
2023-11-27 21:52:51 +01:00
var archiverChannel chan mqtt.Message = make(chan mqtt.Message, 100)
2023-11-27 15:21:28 +01:00
func InitDispatcher() {
log.Printf("Initializing dispatcher")
2023-11-27 21:52:51 +01:00
go archiver()
2023-11-27 15:21:28 +01:00
for _, handlerEntry := range config.Config.Handlers {
log.Printf("Trying %s", handlerEntry.Name)
switch handlerEntry.Name {
case "TTN":
handlerMap[handlerEntry.Name] = ttn.NewTTNHandler()
log.Printf("TTN initialized")
2023-11-28 16:53:51 +01:00
case "IoT":
handlerMap[handlerEntry.Name] = iot.NewIoTHandler()
log.Printf("IoT initialized")
2023-11-27 15:21:28 +01:00
default:
log.Fatalf("Handler %s not found", handlerEntry.Name)
}
}
}
2023-11-28 23:08:07 +01:00
func storeMessage(filename string, item archivingStruct) {
file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644)
2023-11-28 16:53:51 +01:00
if err != nil {
2023-11-28 23:08:07 +01:00
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
2023-11-28 16:53:51 +01:00
return
}
defer file.Close()
2023-11-28 23:08:07 +01:00
archivingString := fmt.Sprintf("%s - %s - %s\n", item.timestamp, item.topic, item.payload)
_, err = file.WriteString(string(archivingString) + "\n")
2023-11-28 16:53:51 +01:00
if err != nil {
log.Printf("Unable to write message, message is not archived: %s", err)
return
}
2023-11-28 23:08:07 +01:00
log.Printf("Archiving message in file %s", filename)
2023-11-28 16:53:51 +01:00
}
2023-11-27 21:52:51 +01:00
func archiver() {
2023-11-28 10:23:21 +01:00
archivingRootDir := config.Config.Archiver.Dir
2023-11-28 16:53:51 +01:00
var lastArchivingDir string
2023-11-28 10:23:21 +01:00
2023-11-27 21:52:51 +01:00
for {
select {
2023-11-28 16:53:51 +01:00
case message := <- archiverChannel:
2023-11-28 10:23:21 +01:00
currentTime := time.Now()
2023-11-28 16:53:51 +01:00
currentDateStr := currentTime.Format("2006/01/02/15")
2023-11-28 10:23:21 +01:00
currentArchivingDir := archivingRootDir + "/" + currentDateStr
if currentArchivingDir != lastArchivingDir {
err := os.MkdirAll(currentArchivingDir, 0755)
2023-11-28 16:53:51 +01:00
if err != nil {
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
2023-11-28 10:23:21 +01:00
}
lastArchivingDir = currentArchivingDir
2023-11-28 16:53:51 +01:00
log.Printf("Archiving dir %s created", currentArchivingDir)
2023-11-28 10:23:21 +01:00
}
2023-11-28 16:53:51 +01:00
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
archivingItem := archivingStruct { currentTime.Format("2006-01-02 15:04:05"), message.Topic, string(message.Payload) }
storeMessage(archivingFilename, archivingItem)
2023-11-27 21:52:51 +01:00
}
}
}
2023-11-27 13:09:41 +01:00
func InputDispatcher() {
for {
select {
case message := <- mqtt.InputChannel:
2023-11-27 15:41:27 +01:00
log.Printf("Message arrived in inputDispatcher, topic: %s\n", message.Topic)
2023-11-27 21:52:51 +01:00
archiverChannel <- message
2023-11-27 13:09:41 +01:00
for _, mapping := range config.Config.TopicMappings {
2023-11-27 15:21:28 +01:00
log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler)
2023-11-27 13:09:41 +01:00
for _, subscribedTopic := range mapping.Topics {
log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
2023-11-27 15:21:28 +01:00
log.Printf("Handle message in handler %s", mapping.Handler)
2023-11-27 15:41:27 +01:00
handler, exists := handlerMap[mapping.Handler]
if exists {
handler.Handle(message.Topic, string(message.Payload))
} else {
log.Printf("Handler not found, message is lost")
}
2023-11-27 13:09:41 +01:00
} else {
log.Printf("no match")
}
}
}
}
}
}