diff --git a/.gitignore b/.gitignore index 08a79db..84d8969 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ config.json src/udi/udi tmp/ +ENVDB diff --git a/src/udi/dispatcher/dispatcher.go b/src/udi/dispatcher/dispatcher.go index adcc76e..c3fd6a5 100644 --- a/src/udi/dispatcher/dispatcher.go +++ b/src/udi/dispatcher/dispatcher.go @@ -11,14 +11,9 @@ import "udi/handlers/handler" import "udi/handlers/ttn" import "udi/handlers/iot" -type archivingStruct struct { - timestamp string `json:"timestamp"` - topic string `json:"topic"` - payload string `json:"payload"` -} var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) -var archiverChannel chan mqtt.Message = make(chan mqtt.Message, 100) +var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100) func InitDispatcher() { log.Printf("Initializing dispatcher") @@ -38,14 +33,14 @@ func InitDispatcher() { } } -func storeMessage(filename string, item archivingStruct) { +func storeMessage(filename string, item handler.MessageT) { file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644) if err != nil { log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err) return } defer file.Close() - archivingString := fmt.Sprintf("%s - %s - %s\n", item.timestamp, item.topic, item.payload) + archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload) _, err = file.WriteString(string(archivingString) + "\n") if err != nil { log.Printf("Unable to write message, message is not archived: %s", err) @@ -61,8 +56,7 @@ func archiver() { for { select { case message := <- archiverChannel: - currentTime := time.Now() - currentDateStr := currentTime.Format("2006/01/02/15") + currentDateStr := message.Timestamp.Format("2006/01/02/15") currentArchivingDir := archivingRootDir + "/" + currentDateStr if currentArchivingDir != lastArchivingDir { err := os.MkdirAll(currentArchivingDir, 0755) @@ -70,11 +64,10 @@ func archiver() { log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err) } lastArchivingDir = currentArchivingDir - log.Printf("Archiving dir %s created", currentArchivingDir) + log.Printf("Archiving dir %s created", currentArchivingDir) } archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic)) - archivingItem := archivingStruct { currentTime.Format("2006-01-02 15:04:05"), message.Topic, string(message.Payload) } - storeMessage(archivingFilename, archivingItem) + storeMessage(archivingFilename, message) } } } @@ -82,8 +75,9 @@ func archiver() { func InputDispatcher() { for { select { - case message := <- mqtt.InputChannel: - log.Printf("Message arrived in inputDispatcher, topic: %s\n", message.Topic) + case mqttMessage := <- mqtt.InputChannel: + log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic) + message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } archiverChannel <- message for _, mapping := range config.Config.TopicMappings { @@ -94,7 +88,7 @@ func InputDispatcher() { log.Printf("Handle message in handler %s", mapping.Handler) handler, exists := handlerMap[mapping.Handler] if exists { - handler.Handle(message.Topic, string(message.Payload)) + handler.Handle(message) } else { log.Printf("Handler not found, message is lost") } diff --git a/src/udi/go.mod b/src/udi/go.mod index a1ca57d..c6b2572 100644 --- a/src/udi/go.mod +++ b/src/udi/go.mod @@ -5,10 +5,19 @@ go 1.21.3 require ( github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/google/uuid v1.4.0 + gorm.io/driver/postgres v1.5.4 + gorm.io/gorm v1.25.5 ) require ( github.com/gorilla/websocket v1.5.0 // indirect - golang.org/x/net v0.8.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.4.3 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.10.0 // indirect golang.org/x/sync v0.1.0 // indirect + golang.org/x/text v0.13.0 // indirect ) diff --git a/src/udi/go.sum b/src/udi/go.sum index d4629d8..0a0dc0d 100644 --- a/src/udi/go.sum +++ b/src/udi/go.sum @@ -1,10 +1,42 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY= +github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo= +gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0= +gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= +gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= diff --git a/src/udi/handlers/handler/handler.go b/src/udi/handlers/handler/handler.go index 0859d83..ed4a9f3 100644 --- a/src/udi/handlers/handler/handler.go +++ b/src/udi/handlers/handler/handler.go @@ -1,6 +1,14 @@ package handler -type Handler interface { - Handle(string, string) +import "time" + +type MessageT struct { + Timestamp time.Time + Topic string + Payload string +} + +type Handler interface { + Handle(MessageT) } diff --git a/src/udi/handlers/iot/iot.go b/src/udi/handlers/iot/iot.go index 4b0f2f2..267d10f 100644 --- a/src/udi/handlers/iot/iot.go +++ b/src/udi/handlers/iot/iot.go @@ -1,6 +1,7 @@ package iot import "log" +import "udi/handlers/handler" var idSeq int = 0 @@ -16,8 +17,8 @@ func NewIoTHandler() *IoTHandler { return t } -func (self *IoTHandler) Handle(topic, payload string) { - log.Printf("Handler IoT %d processing %s -> %s", self.id, topic, payload) +func (self *IoTHandler) Handle(message handler.MessageT) { + log.Printf("Handler IoT %d processing %s -> %s", self.id, message.Topic, message.Payload) } diff --git a/src/udi/handlers/ttn/ttn.go b/src/udi/handlers/ttn/ttn.go index 0cab916..aa3ec54 100644 --- a/src/udi/handlers/ttn/ttn.go +++ b/src/udi/handlers/ttn/ttn.go @@ -1,6 +1,7 @@ package ttn import "log" +import "udi/handlers/handler" var idSeq int = 0 @@ -16,8 +17,8 @@ func NewTTNHandler() *TTNHandler { return t } -func (self *TTNHandler) Handle(topic, payload string) { - log.Printf("Handler TTN %d processing %s -> %s", self.id, topic, payload) +func (self *TTNHandler) Handle(message handler.MessageT) { + log.Printf("Handler TTN %d processing %s -> %s", self.id, message.Topic, message.Payload) } diff --git a/src/udi/main.go b/src/udi/main.go index 3719efb..1dc2573 100644 --- a/src/udi/main.go +++ b/src/udi/main.go @@ -1,11 +1,12 @@ package main import "log" -import "os" -import "os/signal" -import "udi/mqtt" +//import "os" +//import "os/signal" +//import "udi/mqtt" import "udi/config" -import "udi/dispatcher" +//import "udi/dispatcher" +import "udi/database" @@ -17,6 +18,7 @@ func main() { config.LoadConfiguration() + /* dispatcher.InitDispatcher() go dispatcher.InputDispatcher() @@ -30,5 +32,8 @@ func main() { <-c log.Println("Terminating UDI") + */ + + database.Test() }