// Package dispatcher archives every incoming MQTT message to disk and forwards
// it to the handlers configured for its topic.
package dispatcher

import (
	"encoding/json"
	"fmt"
	"log"
	"net/url"
	"os"
	"time"

	"udi/config"
	"udi/handlers/handler"
	"udi/handlers/iot"
	"udi/handlers/ttn"
	"udi/mqtt"
)

// archivingStruct is the record written to the archive for each message.
// The fields are exported because encoding/json ignores unexported fields;
// the tags keep the lowercase keys in the emitted JSON.
type archivingStruct struct {
	Timestamp string `json:"timestamp"`
	Topic     string `json:"topic"`
	Payload   string `json:"payload"`
}

var (
	// handlerMap holds the instantiated handlers keyed by their configured name.
	handlerMap = make(map[string]handler.Handler)

	// archiverChannel passes messages from InputDispatcher to the archiver
	// goroutine. Once its 100-slot buffer is full, senders block.
	archiverChannel = make(chan mqtt.Message, 100)
)

// InitDispatcher starts the archiver goroutine and instantiates one handler for
// every entry in config.Config.Handlers. Supported handler names are "TTN" and
// "IoT"; any other name terminates the process via log.Fatalf.
func InitDispatcher() {
	log.Printf("Initializing dispatcher")
	go archiver()

	for _, handlerEntry := range config.Config.Handlers {
		log.Printf("Trying %s", handlerEntry.Name)
		switch handlerEntry.Name {
		case "TTN":
			handlerMap[handlerEntry.Name] = ttn.NewTTNHandler()
			log.Printf("TTN initialized")
		case "IoT":
			handlerMap[handlerEntry.Name] = iot.NewIoTHandler()
			log.Printf("IoT initialized")
		default:
			log.Fatalf("Handler %s not found", handlerEntry.Name)
		}
	}
}

// storeMessage appends item to the file named filename as a single line of
// JSON, creating the file if it does not exist. Failures are logged and the
// message is dropped; nothing is returned to the caller.
func storeMessage(filename string, item archivingStruct) {
	record, err := json.Marshal(item)
	if err != nil {
		log.Printf("Unable to convert to json, message is not archived: %s", err)
		return
	}

	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
		return
	}

	// Several messages share one file, so each record is terminated with a
	// newline to keep them separable.
	if _, err := file.Write(append(record, '\n')); err != nil {
		// The write error is the one worth reporting; the close result adds nothing.
		_ = file.Close()
		log.Printf("Unable to write message, message is not archived: %s", err)
		return
	}

	// Close is checked explicitly because a write may only fail at close time.
	if err := file.Close(); err != nil {
		log.Printf("Unable to close archiving file %s, message may not be archived: %s", filename, err)
		return
	}

	log.Printf("Archiving message in file %s", filename)
}

// archiver receives messages from archiverChannel and stores each one below
// config.Config.Archiver.Dir in a directory named after the current local time
// (year/month/day/hour), in a file named after the path-escaped topic.
// It runs until archiverChannel is closed.
func archiver() {
	archivingRootDir := config.Config.Archiver.Dir
	var lastArchivingDir string

	for message := range archiverChannel {
		currentTime := time.Now()
		currentDateStr := currentTime.Format("2006/01/02/15")
		currentArchivingDir := archivingRootDir + "/" + currentDateStr

		if currentArchivingDir != lastArchivingDir {
			if err := os.MkdirAll(currentArchivingDir, 0755); err != nil {
				// lastArchivingDir is deliberately left unchanged so that the
				// directory creation is retried for the next message.
				log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
				continue
			}
			lastArchivingDir = currentArchivingDir
			log.Printf("Archiving dir %s created", currentArchivingDir)
		}

		// The topic is path-escaped so that its "/" separators do not create
		// nested directories.
		archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
		archivingItem := archivingStruct{
			Timestamp: currentTime.Format("2006-01-02 15:04:05"),
			Topic:     message.Topic,
			Payload:   string(message.Payload),
		}
		storeMessage(archivingFilename, archivingItem)
	}
}

// InputDispatcher receives messages from mqtt.InputChannel, queues each one for
// archiving and passes it to the handler of every topic mapping in
// config.Config.TopicMappings that has a matching subscription. A message is
// handed to a handler once per matching subscription. The function blocks and
// returns only when mqtt.InputChannel is closed.
func InputDispatcher() {
	for message := range mqtt.InputChannel {
		log.Printf("Message arrived in inputDispatcher, topic: %s\n", message.Topic)
		archiverChannel <- message

		for _, mapping := range config.Config.TopicMappings {
			log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler)
			for _, subscribedTopic := range mapping.Topics {
				log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
				if !mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
					log.Printf("no match")
					continue
				}

				log.Printf("Handle message in handler %s", mapping.Handler)
				h, exists := handlerMap[mapping.Handler]
				if !exists {
					log.Printf("Handler not found, message is lost")
					continue
				}
				h.Handle(message.Topic, string(message.Payload))
			}
		}
	}
}