diff --git a/.gitignore b/.gitignore index 06a0374..08a79db 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ config.json src/udi/udi +tmp/ diff --git a/Dockerfile b/Dockerfile index e63dad8..41b59d5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,11 +8,7 @@ RUN go build -a -installsuffix nocgo -o udi . FROM scratch -ENV MQTT_BROKER "" -ENV MQTT_USERNAME "" -ENV MQTT_PASSWORD "" -ENV MQTT_ENABLE_TLS "" -ENV MQTT_SUBSCRIBE_TOPICS "" +ENV UDI_CONF "" COPY --from=builder /go/src/udi ./ ENTRYPOINT ["./udi"] diff --git a/src/udi/config/config.go b/src/udi/config/config.go index a82f09f..368837a 100644 --- a/src/udi/config/config.go +++ b/src/udi/config/config.go @@ -13,9 +13,9 @@ type ConfigT struct { TlsEnable string `json:"tlsEnable"` } `json:"mqtt"` TopicMappings []struct { - Topics []string `json:topics` - Handler string `json:handler` - } `json:"TopicMappings"` + Topics []string `json:"topics"` + Handler string `json:"handler"` + } `json:"topicMappings"` Handlers []struct { Name string `json:"name"` DatabaseConnStr string `json:"databaseConnStr"` diff --git a/src/udi/config/example.json b/src/udi/config/example.json new file mode 100644 index 0000000..c9638d4 --- /dev/null +++ b/src/udi/config/example.json @@ -0,0 +1,26 @@ +{ + "mqtt": { + "broker": "172.23.1.102:1883", + "username": "", + "password": "", + "tlsEnable": "false" + }, + "topicMappings": [ + { + "topics": ["IoT/MBGW3/Measurement"], + "handler": "IoT" + } + ], + "handlers": [ + { + "name": "IoT", + "databaseConnStr": "", + "attributes": { + } + } + ], + "archiver": { + "dir": "/mnt/udi/archive" + } +} + diff --git a/src/udi/dispatcher/dispatcher.go b/src/udi/dispatcher/dispatcher.go index 9fcc490..229621b 100644 --- a/src/udi/dispatcher/dispatcher.go +++ b/src/udi/dispatcher/dispatcher.go @@ -3,11 +3,20 @@ package dispatcher import "log" import "time" import "os" +import "fmt" +import "net/url" +import "encoding/json" import "udi/mqtt" import "udi/config" import "udi/handlers/handler" import "udi/handlers/ttn" +import "udi/handlers/iot" +type archivingStruct struct { + timestamp string `json:"timestamp"` + topic string `json:"topic"` + payload string `json:"payload"` +} var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) var archiverChannel chan mqtt.Message = make(chan mqtt.Message, 100) @@ -21,32 +30,56 @@ func InitDispatcher() { case "TTN": handlerMap[handlerEntry.Name] = ttn.NewTTNHandler() log.Printf("TTN initialized") + case "IoT": + handlerMap[handlerEntry.Name] = iot.NewIoTHandler() + log.Printf("IoT initialized") default: log.Fatalf("Handler %s not found", handlerEntry.Name) } } } +func storeMessage(filename, item string) { + file, err := os.OpenFile(archivingFilename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644) + if err != nil { + log.Printf("Unable to open archiving file %s, message is not archived: %s", archivingFilename, err) + return + } + defer file.Close() + archivingString, err := json.Marshal(archivingItem) + if err != nil { + log.Printf("Unable to convert to json, message is not archived: %s", err) + return + } + _, err := file.WriteString(string(message.Payload)) + if err != nil { + log.Printf("Unable to write message, message is not archived: %s", err) + return + } + log.Printf("Archiving message in file %s", archivingFilename) +} + func archiver() { archivingRootDir := config.Config.Archiver.Dir - currentArchivingDir := "" - lastArchivingDir := "" + var lastArchivingDir string for { select { - case _ = <- archiverChannel: + case message := <- archiverChannel: currentTime := time.Now() - currentDateStr := currentTime.Format("2006/01/02") + currentDateStr := currentTime.Format("2006/01/02/15") currentArchivingDir := archivingRootDir + "/" + currentDateStr if currentArchivingDir != lastArchivingDir { err := os.MkdirAll(currentArchivingDir, 0755) - if err := nil { - log.Printf("Unable to create archiving dir %s", currentArchivingDir) + if err != nil { + log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err) } lastArchivingDir = currentArchivingDir + log.Printf("Archiving dir %s created", currentArchivingDir) } - archivingFilename := currentArchivingDir + "/" + string(currentTime.Hour()) + "/" - log.Printf("Archiving message") + archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic)) + archivingItem := archivingStruct { currentTime.Format("2006-01-02 15:04:05"), message.Topic, string(message.Payload) } + storeMessage(archivingFilename, archivingItem) } } } diff --git a/src/udi/handlers/iot/iot.go b/src/udi/handlers/iot/iot.go new file mode 100644 index 0000000..4b0f2f2 --- /dev/null +++ b/src/udi/handlers/iot/iot.go @@ -0,0 +1,23 @@ +package iot + +import "log" + +var idSeq int = 0 + +type IoTHandler struct { + id int +} + +func NewIoTHandler() *IoTHandler { + t := &IoTHandler { + id: idSeq, + } + idSeq += 1 + return t +} + +func (self *IoTHandler) Handle(topic, payload string) { + log.Printf("Handler IoT %d processing %s -> %s", self.id, topic, payload) +} + +