package dispatcher import "log" import "time" import "os" import "fmt" import "net/url" import "udi/mqtt" import "udi/config" import "udi/counter" import "udi/handlers/handler" import "udi/handlers/ttn" import "udi/handlers/iot" import "udi/handlers/pv" import "udi/handlers/mbgw3" import "udi/handlers/sver" import "udi/handlers/svej" import "udi/handlers/dt1t" import "udi/handlers/locative" import "udi/handlers/prepared" import "udi/handlers/z2m" var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100) func InitDispatcher() { log.Printf("Dispatcher initializing") go archiver() for _, mapping := range config.Config.TopicMappings { // log.Printf("Trying to initialize %s", mapping) var factory interface{} switch mapping.Handler { case "TTN": factory = ttn.New case "IoT": factory = iot.New case "PV": factory = pv.New case "MBGW3": factory = mbgw3.New case "SVER": factory = sver.New case "SVEJ": factory = svej.New case "DT1T": factory = dt1t.New case "Locative": factory = locative.New case "PREP": factory = prepared.New case "Z2M": factory = z2m.New default: factory = nil log.Printf("No handler %s found, ignore mapping", mapping.Handler) } fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler) if ! ok { log.Println("Typ Assertion failed") break } handler := fn(mapping.Id, mapping.Config) handlerMap[mapping.Id] = handler } //log.Printf("handlerMap: %s", handlerMap) } func storeMessage(filename string, item handler.MessageT) { file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644) if err != nil { log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err) counter.F("Archived") return } defer file.Close() archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload) _, err = file.WriteString(string(archivingString) + "\n") if err != nil { log.Printf("Unable to write message, message is not archived: %s", err) counter.F("Archived") return } //log.Println("Successfully archived message") counter.S("Archived") } func archiver() { archivingRootDir := config.Config.Archiver.Dir var lastArchivingDir string for { select { case message := <- archiverChannel: currentDateStr := message.Timestamp.Format("2006/01/02/15") currentArchivingDir := archivingRootDir + "/" + currentDateStr if currentArchivingDir != lastArchivingDir { err := os.MkdirAll(currentArchivingDir, 0755) if err != nil { log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err) counter.F("Archived") } lastArchivingDir = currentArchivingDir //log.Printf("Archiving dir %s created", currentArchivingDir) } archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic)) storeMessage(archivingFilename, message) } } } func InputDispatcher() { for { select { case mqttMessage := <- mqtt.InputChannel: //log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic) message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } archiverChannel <- message handleMessage(message) } } } func handleMessage(message handler.MessageT) { for _, mapping := range config.Config.TopicMappings { // log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler) for _, subscribedTopic := range mapping.Topics { // log.Printf("Testing %s in %s", message.Topic, subscribedTopic) if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) { //log.Printf("Handle message in handler %s", mapping.Id) handler, exists := handlerMap[mapping.Id] if exists { handler.Handle(message) counter.S("Dispatched") return } else { log.Printf("Handler %s not found, message %s is lost", mapping.Id, message) counter.F("Dispatched") } } } } log.Printf("No match for topic %s, message %s is lost", message.Topic, message) counter.F("Dispatched") }