diff --git a/.gitignore b/.gitignore index 84d8969..0eeb6e2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ -config.json +config-*.json src/udi/udi +src/udi/migrate_schema tmp/ ENVDB diff --git a/src/udi/database/abstract_database.go b/src/udi/database/abstract_database.go index 0bf347a..3afffb5 100644 --- a/src/udi/database/abstract_database.go +++ b/src/udi/database/abstract_database.go @@ -3,15 +3,20 @@ package database import "time" import "gorm.io/gorm" + +type VariableType struct { + Label string `json:"label"` + Variable string `json:"variable"` + Unit string `json:"unit"` + Value interface{} `json:"value,omitempty"` +} + type Measurement struct { Time time.Time `gorm:"not null;primary_key"` Application string `gorm:"not null"` - SensorType string `gorm:"not null"` - Sensor string `gorm:"not null"` - Variable string `gorm:"not null"` - Unit string - Value float32 `gorm:"not null"` + Device string Attributes map[string]string `gorm:"serializer:json;type:jsonb"` + Values map[string]VariableType `gorm:"serializer:json;type:jsonb"` } type Application struct { @@ -20,21 +25,21 @@ type Application struct { Attributes map[string]string `gorm:"serializer:json;type:jsonb"` } -type SensorType struct { + +type DeviceType struct { gorm.Model Label string `gorm:"not null"` - Variable string `gorm:"not null"` - Unit string + ModelIdentifier string Attributes map[string]string `gorm:"serializer:json;type:jsonb"` } -type Sensor struct { +type Device struct { gorm.Model Label string `gorm:"not null"` ApplicationID int `gorm:"not null"` Application Application - SensorTypeID int `gorm:"not null"` - SensorType SensorType + DeviceTypeID int `gorm:"not null"` + DeviceType DeviceType Attributes map[string]string `gorm:"serializer:json;type:jsonb"` } diff --git a/src/udi/database/database.go b/src/udi/database/database.go index e8cf7c5..3f647ca 100644 --- a/src/udi/database/database.go +++ b/src/udi/database/database.go @@ -1,28 +1,45 @@ package database + import ( "log" - "time" + //"time" "gorm.io/driver/postgres" "gorm.io/gorm" ) -func Test() { - dsn := "" - db, err := gorm.Open(postgres.Open(dsn)) - log.Printf("Database: %s, %s", db, err) - - db.AutoMigrate(&Application{}) - db.AutoMigrate(&SensorType{}) - db.AutoMigrate(&Sensor{}) - db.AutoMigrate(&Measurement{}) - - n := time.Now() - m := Measurement { Time: n, Application: "bla", SensorType: "bla", Sensor: "bla", Variable: "bla", Unit: "bla", Value: 1.0 } - db.Create(&m) - m = Measurement { Time: n, Application: "bla", SensorType: "bla", Sensor: "bla", Variable: "bla", Unit: "bla", Value: 1.0 } - db.Create(&m) - - +type DatabaseHandle struct { + initialized bool + dbh *gorm.DB } +func NewDatabaseHandle(dsn string) *DatabaseHandle { + var db DatabaseHandle + conn, err := gorm.Open(postgres.Open(dsn)) + if err != nil { + log.Printf("Unable to open database connection: %s", err) + db.initialized = false + } else { + db.dbh = conn + db.initialized = true + log.Println("Database connection opened") + } + return &db +} + +func (dbh *DatabaseHandle) StoreMeasurement(measurement *Measurement) { + if ! dbh.initialized { + log.Println("Database connection not initialized, can not store, measurement lost") + return + } + + result := dbh.dbh.Create(measurement) + if result.Error != nil { + log.Printf("Unable to insert measurement: %s", result.Error) + return + } + + log.Println("Successfully stored measurement") +} + + diff --git a/src/udi/database/migrate_database.go b/src/udi/database/migrate_database.go new file mode 100644 index 0000000..c93f4f5 --- /dev/null +++ b/src/udi/database/migrate_database.go @@ -0,0 +1,55 @@ +package database + +import ( + "log" + //"time" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func Migrate() { + dsn := "" + db, err := gorm.Open(postgres.Open(dsn)) + if err != nil { + log.Fatalf("Unable to open database connection: %s", err) + } + + db.AutoMigrate(&Application{}) + log.Println("Application created") + + db.AutoMigrate(&DeviceType{}) + log.Println("DeviceType created") + + db.AutoMigrate(&Device{}) + log.Println("Device created") + + db.AutoMigrate(&Measurement{}) + log.Println("Measurement created") + + log.Println("Remember to call create_hypertable on measurements, sowhat I can't do that for you.") + + /* + m := Measurement { + Time: time.Now(), + Application: "app", + Attributes: nil, + Values: []SensorType { + { Variable: "Temperature", Unit: "Degree Celsius", Value: 1.0 }, + { Variable: "Temperature", Unit: "Degree Celsius", Value: 3.0 }, + }, + } + db.Create(&m) + + m = Measurement { + Time: time.Now(), + Application: "app", + Attributes: nil, + Values: []SensorType { + { Variable: "Temperature", Unit: "Degree Celsius", Value: 10.0 }, + { Variable: "Temperature", Unit: "Degree Celsius", Value: 30.0 }, + }, + } + db.Create(&m) + */ +} + diff --git a/src/udi/dispatcher/dispatcher.go b/src/udi/dispatcher/dispatcher.go index c3fd6a5..da09814 100644 --- a/src/udi/dispatcher/dispatcher.go +++ b/src/udi/dispatcher/dispatcher.go @@ -10,6 +10,7 @@ import "udi/config" import "udi/handlers/handler" import "udi/handlers/ttn" import "udi/handlers/iot" +import "udi/handlers/pv" var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) @@ -27,6 +28,9 @@ func InitDispatcher() { case "IoT": handlerMap[handlerEntry.Name] = iot.NewIoTHandler() log.Printf("IoT initialized") + case "PV": + handlerMap[handlerEntry.Name] = pv.NewPvHandler() + log.Printf("PV initialized") default: log.Fatalf("Handler %s not found", handlerEntry.Name) } diff --git a/src/udi/handlers/pv/pv.go b/src/udi/handlers/pv/pv.go new file mode 100644 index 0000000..3a2f3e0 --- /dev/null +++ b/src/udi/handlers/pv/pv.go @@ -0,0 +1,85 @@ +package pv + +import ( + "log" + "reflect" + "time" + "encoding/json" + "udi/handlers/handler" + "udi/database" +) + + +var idSeq int = 0 + +type PvHandler struct { + id int + dbh *database.DatabaseHandle +} + +type PvValue struct { + ImportEnergyActive float32 `unit:"Wh" json:"importEnergyActive"` + ImportEnergyReactive float32 `unit:"VAh" json:"importEnergyReactive"` + ExportEnergyActive float32 `unit:"Wh" json:"exportEnergyActive"` + ExportEnergyReactive float32 `unit:"VAh" json:"exportEnergyiReactive"` + PowerApparent float32 `unit:"VA" json:"powerApparent"` + PowerActive float32 `unit:"W" json:"powerActive"` + PowerReactive float32 `unit:"VA" json:"powerReactive"` + PowerDemandPositive float32 `unit:"W" json:"powerDemandPositive"` + PowerDemandReverse float32 `unit:"W" json:"powerDemandReverse"` + Factor float32 `unit:"" json:"factor"` + Angle float32 `unit:"degree" json:"angle"` + Voltage float32 `unit:"V" json:"voltage"` + Current float32 `unit:"A" json:"current"` + State int `unit:"" json:"state"` + Status string `unit:"" json:"status"` + Timestamp string `unit:"" json:"timestamp"` + Cnt int `unit:"" json:"cnt"` +} + + +func NewPvHandler() *PvHandler { + t := &PvHandler { + id: idSeq, + } + idSeq += 1 + + t.dbh = database.NewDatabaseHandle("") + return t +} + +func (self *PvHandler) Handle(message handler.MessageT) { + log.Printf("Handler PV %d processing %s -> %s", self.id, message.Topic, message.Payload) + + var pvValue PvValue + err := json.Unmarshal([]byte(message.Payload), &pvValue) + if err != nil { + log.Printf("Enable to parse payload into pvValue struct, values are lost: ", err) + return + } + + variables := make(map[string]database.VariableType) + pvValueStructValue := reflect.ValueOf(pvValue) + for i := 0; i < pvValueStructValue.NumField(); i++ { + field := pvValueStructValue.Type().Field(i) + fieldValue := pvValueStructValue.Field(i) + v := database.VariableType { + Label: "", + Variable: field.Name, + Unit: field.Tag.Get("unit"), + Value: fieldValue.Interface(), + } + variables[field.Name] = v + } + + measurement := database.Measurement { + Time: time.Now(), + Application: "PV", + Device: "Powermeter", + Values: variables, + } + + self.dbh.StoreMeasurement(&measurement) +} + + diff --git a/src/udi/main.go b/src/udi/main.go index 1dc2573..3719efb 100644 --- a/src/udi/main.go +++ b/src/udi/main.go @@ -1,12 +1,11 @@ package main import "log" -//import "os" -//import "os/signal" -//import "udi/mqtt" +import "os" +import "os/signal" +import "udi/mqtt" import "udi/config" -//import "udi/dispatcher" -import "udi/database" +import "udi/dispatcher" @@ -18,7 +17,6 @@ func main() { config.LoadConfiguration() - /* dispatcher.InitDispatcher() go dispatcher.InputDispatcher() @@ -32,8 +30,5 @@ func main() { <-c log.Println("Terminating UDI") - */ - - database.Test() } diff --git a/src/udi/migrate_schema.go b/src/udi/migrate_schema.go new file mode 100644 index 0000000..525fef0 --- /dev/null +++ b/src/udi/migrate_schema.go @@ -0,0 +1,18 @@ +package main + +import "log" +import "udi/database" + + + +func main() { + log.SetPrefix("UDI Migrate Schema: ") + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + log.Println("Starting") + + database.Migrate() + + log.Println("Done") +} +