first handler writing to database
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,4 +1,5 @@
|
||||
config.json
|
||||
config-*.json
|
||||
src/udi/udi
|
||||
src/udi/migrate_schema
|
||||
tmp/
|
||||
ENVDB
|
||||
|
@ -3,15 +3,20 @@ package database
|
||||
import "time"
|
||||
import "gorm.io/gorm"
|
||||
|
||||
|
||||
type VariableType struct {
|
||||
Label string `json:"label"`
|
||||
Variable string `json:"variable"`
|
||||
Unit string `json:"unit"`
|
||||
Value interface{} `json:"value,omitempty"`
|
||||
}
|
||||
|
||||
type Measurement struct {
|
||||
Time time.Time `gorm:"not null;primary_key"`
|
||||
Application string `gorm:"not null"`
|
||||
SensorType string `gorm:"not null"`
|
||||
Sensor string `gorm:"not null"`
|
||||
Variable string `gorm:"not null"`
|
||||
Unit string
|
||||
Value float32 `gorm:"not null"`
|
||||
Device string
|
||||
Attributes map[string]string `gorm:"serializer:json;type:jsonb"`
|
||||
Values map[string]VariableType `gorm:"serializer:json;type:jsonb"`
|
||||
}
|
||||
|
||||
type Application struct {
|
||||
@ -20,21 +25,21 @@ type Application struct {
|
||||
Attributes map[string]string `gorm:"serializer:json;type:jsonb"`
|
||||
}
|
||||
|
||||
type SensorType struct {
|
||||
|
||||
type DeviceType struct {
|
||||
gorm.Model
|
||||
Label string `gorm:"not null"`
|
||||
Variable string `gorm:"not null"`
|
||||
Unit string
|
||||
ModelIdentifier string
|
||||
Attributes map[string]string `gorm:"serializer:json;type:jsonb"`
|
||||
}
|
||||
|
||||
type Sensor struct {
|
||||
type Device struct {
|
||||
gorm.Model
|
||||
Label string `gorm:"not null"`
|
||||
ApplicationID int `gorm:"not null"`
|
||||
Application Application
|
||||
SensorTypeID int `gorm:"not null"`
|
||||
SensorType SensorType
|
||||
DeviceTypeID int `gorm:"not null"`
|
||||
DeviceType DeviceType
|
||||
Attributes map[string]string `gorm:"serializer:json;type:jsonb"`
|
||||
}
|
||||
|
||||
|
@ -1,28 +1,45 @@
|
||||
package database
|
||||
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
//"time"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func Test() {
|
||||
dsn := ""
|
||||
db, err := gorm.Open(postgres.Open(dsn))
|
||||
log.Printf("Database: %s, %s", db, err)
|
||||
|
||||
db.AutoMigrate(&Application{})
|
||||
db.AutoMigrate(&SensorType{})
|
||||
db.AutoMigrate(&Sensor{})
|
||||
db.AutoMigrate(&Measurement{})
|
||||
|
||||
n := time.Now()
|
||||
m := Measurement { Time: n, Application: "bla", SensorType: "bla", Sensor: "bla", Variable: "bla", Unit: "bla", Value: 1.0 }
|
||||
db.Create(&m)
|
||||
m = Measurement { Time: n, Application: "bla", SensorType: "bla", Sensor: "bla", Variable: "bla", Unit: "bla", Value: 1.0 }
|
||||
db.Create(&m)
|
||||
|
||||
|
||||
type DatabaseHandle struct {
|
||||
initialized bool
|
||||
dbh *gorm.DB
|
||||
}
|
||||
|
||||
func NewDatabaseHandle(dsn string) *DatabaseHandle {
|
||||
var db DatabaseHandle
|
||||
conn, err := gorm.Open(postgres.Open(dsn))
|
||||
if err != nil {
|
||||
log.Printf("Unable to open database connection: %s", err)
|
||||
db.initialized = false
|
||||
} else {
|
||||
db.dbh = conn
|
||||
db.initialized = true
|
||||
log.Println("Database connection opened")
|
||||
}
|
||||
return &db
|
||||
}
|
||||
|
||||
func (dbh *DatabaseHandle) StoreMeasurement(measurement *Measurement) {
|
||||
if ! dbh.initialized {
|
||||
log.Println("Database connection not initialized, can not store, measurement lost")
|
||||
return
|
||||
}
|
||||
|
||||
result := dbh.dbh.Create(measurement)
|
||||
if result.Error != nil {
|
||||
log.Printf("Unable to insert measurement: %s", result.Error)
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("Successfully stored measurement")
|
||||
}
|
||||
|
||||
|
||||
|
55
src/udi/database/migrate_database.go
Normal file
55
src/udi/database/migrate_database.go
Normal file
@ -0,0 +1,55 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"log"
|
||||
//"time"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func Migrate() {
|
||||
dsn := ""
|
||||
db, err := gorm.Open(postgres.Open(dsn))
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to open database connection: %s", err)
|
||||
}
|
||||
|
||||
db.AutoMigrate(&Application{})
|
||||
log.Println("Application created")
|
||||
|
||||
db.AutoMigrate(&DeviceType{})
|
||||
log.Println("DeviceType created")
|
||||
|
||||
db.AutoMigrate(&Device{})
|
||||
log.Println("Device created")
|
||||
|
||||
db.AutoMigrate(&Measurement{})
|
||||
log.Println("Measurement created")
|
||||
|
||||
log.Println("Remember to call create_hypertable on measurements, sowhat I can't do that for you.")
|
||||
|
||||
/*
|
||||
m := Measurement {
|
||||
Time: time.Now(),
|
||||
Application: "app",
|
||||
Attributes: nil,
|
||||
Values: []SensorType {
|
||||
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 1.0 },
|
||||
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 3.0 },
|
||||
},
|
||||
}
|
||||
db.Create(&m)
|
||||
|
||||
m = Measurement {
|
||||
Time: time.Now(),
|
||||
Application: "app",
|
||||
Attributes: nil,
|
||||
Values: []SensorType {
|
||||
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 10.0 },
|
||||
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 30.0 },
|
||||
},
|
||||
}
|
||||
db.Create(&m)
|
||||
*/
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import "udi/config"
|
||||
import "udi/handlers/handler"
|
||||
import "udi/handlers/ttn"
|
||||
import "udi/handlers/iot"
|
||||
import "udi/handlers/pv"
|
||||
|
||||
|
||||
var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
|
||||
@ -27,6 +28,9 @@ func InitDispatcher() {
|
||||
case "IoT":
|
||||
handlerMap[handlerEntry.Name] = iot.NewIoTHandler()
|
||||
log.Printf("IoT initialized")
|
||||
case "PV":
|
||||
handlerMap[handlerEntry.Name] = pv.NewPvHandler()
|
||||
log.Printf("PV initialized")
|
||||
default:
|
||||
log.Fatalf("Handler %s not found", handlerEntry.Name)
|
||||
}
|
||||
|
85
src/udi/handlers/pv/pv.go
Normal file
85
src/udi/handlers/pv/pv.go
Normal file
@ -0,0 +1,85 @@
|
||||
package pv
|
||||
|
||||
import (
|
||||
"log"
|
||||
"reflect"
|
||||
"time"
|
||||
"encoding/json"
|
||||
"udi/handlers/handler"
|
||||
"udi/database"
|
||||
)
|
||||
|
||||
|
||||
var idSeq int = 0
|
||||
|
||||
type PvHandler struct {
|
||||
id int
|
||||
dbh *database.DatabaseHandle
|
||||
}
|
||||
|
||||
type PvValue struct {
|
||||
ImportEnergyActive float32 `unit:"Wh" json:"importEnergyActive"`
|
||||
ImportEnergyReactive float32 `unit:"VAh" json:"importEnergyReactive"`
|
||||
ExportEnergyActive float32 `unit:"Wh" json:"exportEnergyActive"`
|
||||
ExportEnergyReactive float32 `unit:"VAh" json:"exportEnergyiReactive"`
|
||||
PowerApparent float32 `unit:"VA" json:"powerApparent"`
|
||||
PowerActive float32 `unit:"W" json:"powerActive"`
|
||||
PowerReactive float32 `unit:"VA" json:"powerReactive"`
|
||||
PowerDemandPositive float32 `unit:"W" json:"powerDemandPositive"`
|
||||
PowerDemandReverse float32 `unit:"W" json:"powerDemandReverse"`
|
||||
Factor float32 `unit:"" json:"factor"`
|
||||
Angle float32 `unit:"degree" json:"angle"`
|
||||
Voltage float32 `unit:"V" json:"voltage"`
|
||||
Current float32 `unit:"A" json:"current"`
|
||||
State int `unit:"" json:"state"`
|
||||
Status string `unit:"" json:"status"`
|
||||
Timestamp string `unit:"" json:"timestamp"`
|
||||
Cnt int `unit:"" json:"cnt"`
|
||||
}
|
||||
|
||||
|
||||
func NewPvHandler() *PvHandler {
|
||||
t := &PvHandler {
|
||||
id: idSeq,
|
||||
}
|
||||
idSeq += 1
|
||||
|
||||
t.dbh = database.NewDatabaseHandle("")
|
||||
return t
|
||||
}
|
||||
|
||||
func (self *PvHandler) Handle(message handler.MessageT) {
|
||||
log.Printf("Handler PV %d processing %s -> %s", self.id, message.Topic, message.Payload)
|
||||
|
||||
var pvValue PvValue
|
||||
err := json.Unmarshal([]byte(message.Payload), &pvValue)
|
||||
if err != nil {
|
||||
log.Printf("Enable to parse payload into pvValue struct, values are lost: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
variables := make(map[string]database.VariableType)
|
||||
pvValueStructValue := reflect.ValueOf(pvValue)
|
||||
for i := 0; i < pvValueStructValue.NumField(); i++ {
|
||||
field := pvValueStructValue.Type().Field(i)
|
||||
fieldValue := pvValueStructValue.Field(i)
|
||||
v := database.VariableType {
|
||||
Label: "",
|
||||
Variable: field.Name,
|
||||
Unit: field.Tag.Get("unit"),
|
||||
Value: fieldValue.Interface(),
|
||||
}
|
||||
variables[field.Name] = v
|
||||
}
|
||||
|
||||
measurement := database.Measurement {
|
||||
Time: time.Now(),
|
||||
Application: "PV",
|
||||
Device: "Powermeter",
|
||||
Values: variables,
|
||||
}
|
||||
|
||||
self.dbh.StoreMeasurement(&measurement)
|
||||
}
|
||||
|
||||
|
@ -1,12 +1,11 @@
|
||||
package main
|
||||
|
||||
import "log"
|
||||
//import "os"
|
||||
//import "os/signal"
|
||||
//import "udi/mqtt"
|
||||
import "os"
|
||||
import "os/signal"
|
||||
import "udi/mqtt"
|
||||
import "udi/config"
|
||||
//import "udi/dispatcher"
|
||||
import "udi/database"
|
||||
import "udi/dispatcher"
|
||||
|
||||
|
||||
|
||||
@ -18,7 +17,6 @@ func main() {
|
||||
|
||||
config.LoadConfiguration()
|
||||
|
||||
/*
|
||||
dispatcher.InitDispatcher()
|
||||
go dispatcher.InputDispatcher()
|
||||
|
||||
@ -32,8 +30,5 @@ func main() {
|
||||
<-c
|
||||
|
||||
log.Println("Terminating UDI")
|
||||
*/
|
||||
|
||||
database.Test()
|
||||
}
|
||||
|
||||
|
18
src/udi/migrate_schema.go
Normal file
18
src/udi/migrate_schema.go
Normal file
@ -0,0 +1,18 @@
|
||||
package main
|
||||
|
||||
import "log"
|
||||
import "udi/database"
|
||||
|
||||
|
||||
|
||||
func main() {
|
||||
log.SetPrefix("UDI Migrate Schema: ")
|
||||
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
||||
|
||||
log.Println("Starting")
|
||||
|
||||
database.Migrate()
|
||||
|
||||
log.Println("Done")
|
||||
}
|
||||
|
Reference in New Issue
Block a user