diff --git a/src/udi/database/abstract_database.go b/src/udi/database/abstract_database.go index 8d5048e..262a966 100644 --- a/src/udi/database/abstract_database.go +++ b/src/udi/database/abstract_database.go @@ -1,47 +1,38 @@ package database import "time" -import "gorm.io/gorm" - type VariableType struct { - Label string `json:"label"` - Variable string `json:"variable"` - Unit string `json:"unit"` - Value interface{} `json:"value,omitempty"` - Status string `json:"status,omitempty"` + Label string `json:"label"` + Variable string `json:"variable"` + Unit string `json:"unit"` + Value interface{} `json:"value,omitempty"` + Status string `json:"status,omitempty"` } type Measurement struct { - Time time.Time `gorm:"not null;primary_key"` - Application string `gorm:"not null"` - Device string - Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"` - Values map[string]VariableType `gorm:"serializer:json;type:jsonb"` + Time time.Time + Application string + Device string + Attributes map[string]interface{} + Values map[string]VariableType +} + +// Simplified structures for backward compatibility +type DeviceType struct { + Label string + ModelIdentifier string + Attributes map[string]interface{} } type Application struct { - gorm.Model - Label string `gorm:"not null"` - Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"` -} - - -type DeviceType struct { - gorm.Model - Label string `gorm:"not null"` - ModelIdentifier string - Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"` + Label string + Attributes map[string]interface{} } type Device struct { - gorm.Model - Label string `gorm:"not null;uniqueIndex:idx_label_application_id"` - ApplicationID int `gorm:"not null;uniqueIndex:idx_label_application_id"` - Application Application - DeviceTypeID int `gorm:"not null"` - DeviceType DeviceType - Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"` + Label string + Application Application + DeviceType DeviceType + Attributes map[string]interface{} } - - diff --git a/src/udi/database/database.go b/src/udi/database/database.go index 0634ace..fe5b0df 100644 --- a/src/udi/database/database.go +++ b/src/udi/database/database.go @@ -1,96 +1,157 @@ package database - import ( - "log" - //"time" - "fmt" - "udi/counter" - "gorm.io/driver/postgres" - "gorm.io/gorm" + "fmt" + "log" + "os" + "udi/counter" + + influxdb "github.com/influxdata/influxdb1-client/v2" ) type DatabaseHandle struct { - initialized bool - dbh *gorm.DB + initialized bool + client influxdb.Client + database string } func NewDatabaseHandle() *DatabaseHandle { - var db DatabaseHandle - // inject the whole database configuration via the well-known PG* env variables - conn, err := gorm.Open(postgres.Open("")) - if err != nil { - log.Printf("Unable to open database connection: %s", err) - db.initialized = false - } else { - db.dbh = conn - db.initialized = true - log.Println("Database connection opened") - } - return &db + var db DatabaseHandle + + // Read configuration from environment variables + influxURL := os.Getenv("INFLUXDB_URL") + if influxURL == "" { + influxURL = "http://localhost:8086" + } + + influxDB := os.Getenv("INFLUXDB_DATABASE") + if influxDB == "" { + influxDB = "udi" + } + + username := os.Getenv("INFLUXDB_USER") + password := os.Getenv("INFLUXDB_PASSWORD") + + // Create InfluxDB client + client, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ + Addr: influxURL, + Username: username, + Password: password, + }) + + if err != nil { + log.Printf("Unable to create InfluxDB client: %s", err) + db.initialized = false + return &db + } + + // Test connection + _, _, err = client.Ping(0) + if err != nil { + log.Printf("Unable to ping InfluxDB: %s", err) + db.initialized = false + client.Close() + return &db + } + + db.client = client + db.database = influxDB + db.initialized = true + log.Printf("InfluxDB connection opened (URL: %s, Database: %s)", influxURL, influxDB) + + return &db } func (self *DatabaseHandle) StoreMeasurement(measurement *Measurement) { - if ! self.initialized { - log.Printf("Database connection not initialized, can not store, measurement %s lost", measurement) - counter.F("Stored") - return - } + if !self.initialized { + log.Printf("Database connection not initialized, can not store, measurement %v lost", measurement) + counter.F("Stored") + return + } - result := self.dbh.Create(measurement) - if result.Error != nil { - log.Printf("Unable to insert, measurement %s lost, error: %s", measurement, result.Error) - counter.F("Stored") - return - } + // Create batch points + bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{ + Database: self.database, + Precision: "s", + }) + if err != nil { + log.Printf("Unable to create batch points: %s", err) + counter.F("Stored") + return + } - log.Println("Successfully stored measurement") - counter.S("Stored") + // Build tags + tags := map[string]string{ + "application": measurement.Application, + } + if measurement.Device != "" { + tags["device"] = measurement.Device + } + + // Add attributes as tags + for key, value := range measurement.Attributes { + if strValue, ok := value.(string); ok { + tags[key] = strValue + } else { + tags[key] = fmt.Sprintf("%v", value) + } + } + + // Build fields from Values + fields := make(map[string]interface{}) + for key, varType := range measurement.Values { + // Store the value with the variable name as field key + fields[key] = varType.Value + + // Optionally store metadata as separate fields + if varType.Unit != "" { + fields[key+"_unit"] = varType.Unit + } + if varType.Variable != "" { + fields[key+"_variable"] = varType.Variable + } + if varType.Status != "" { + fields[key+"_status"] = varType.Status + } + } + + // Ensure we have at least one field + if len(fields) == 0 { + log.Printf("No fields to store in measurement, skipping") + counter.F("Stored") + return + } + + // Create point + pt, err := influxdb.NewPoint( + "measurement", + tags, + fields, + measurement.Time, + ) + if err != nil { + log.Printf("Unable to create point: %s", err) + counter.F("Stored") + return + } + + bp.AddPoint(pt) + + // Write batch + err = self.client.Write(bp) + if err != nil { + log.Printf("Unable to write to InfluxDB, measurement lost, error: %s", err) + counter.F("Stored") + return + } + + log.Println("Successfully stored measurement") + counter.S("Stored") } -func (self *DatabaseHandle) GetDeviceByLabelAndApplication(applicationLabel string, deviceLabel string) (*Device, error) { - if ! self.initialized { - err := fmt.Errorf("Database connection not initialized") - return nil, err - } - - var device Device - result := self.dbh. - Preload("Application"). - Preload("DeviceType"). - Joins("JOIN applications ON devices.application_id = applications.id"). - Where("devices.label = ? AND applications.label = ?", deviceLabel, applicationLabel). - First(&device) - - if result.Error != nil { - err := fmt.Errorf("Query failed: %s", result.Error) - return nil, err - } - - return &device, nil +func (self *DatabaseHandle) Close() { + if self.initialized && self.client != nil { + self.client.Close() + log.Println("InfluxDB connection closed") + } } - -func (self *DatabaseHandle) GetDeviceByLabel(deviceLabel string) (*Device, error) { - if ! self.initialized { - err := fmt.Errorf("Database connection not initialized") - return nil, err - } - - var device Device - result := self.dbh. - Preload("Application"). - Preload("DeviceType"). - Where("devices.label = ?", deviceLabel). - First(&device) - - if result.Error != nil { - err := fmt.Errorf("Query failed: %s", result.Error) - return nil, err - } - - return &device, nil -} - - - - diff --git a/src/udi/database/migrate_database.go b/src/udi/database/migrate_database.go deleted file mode 100644 index c93f4f5..0000000 --- a/src/udi/database/migrate_database.go +++ /dev/null @@ -1,55 +0,0 @@ -package database - -import ( - "log" - //"time" - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -func Migrate() { - dsn := "" - db, err := gorm.Open(postgres.Open(dsn)) - if err != nil { - log.Fatalf("Unable to open database connection: %s", err) - } - - db.AutoMigrate(&Application{}) - log.Println("Application created") - - db.AutoMigrate(&DeviceType{}) - log.Println("DeviceType created") - - db.AutoMigrate(&Device{}) - log.Println("Device created") - - db.AutoMigrate(&Measurement{}) - log.Println("Measurement created") - - log.Println("Remember to call create_hypertable on measurements, sowhat I can't do that for you.") - - /* - m := Measurement { - Time: time.Now(), - Application: "app", - Attributes: nil, - Values: []SensorType { - { Variable: "Temperature", Unit: "Degree Celsius", Value: 1.0 }, - { Variable: "Temperature", Unit: "Degree Celsius", Value: 3.0 }, - }, - } - db.Create(&m) - - m = Measurement { - Time: time.Now(), - Application: "app", - Attributes: nil, - Values: []SensorType { - { Variable: "Temperature", Unit: "Degree Celsius", Value: 10.0 }, - { Variable: "Temperature", Unit: "Degree Celsius", Value: 30.0 }, - }, - } - db.Create(&m) - */ -} - diff --git a/src/udi/go.mod b/src/udi/go.mod index 1ef7d58..3be4466 100644 --- a/src/udi/go.mod +++ b/src/udi/go.mod @@ -5,21 +5,12 @@ go 1.22.3 require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/google/uuid v1.6.0 + github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 - gorm.io/driver/postgres v1.5.11 - gorm.io/gorm v1.25.12 ) require ( github.com/gorilla/websocket v1.5.3 // indirect - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.7.2 // indirect - github.com/jackc/puddle/v2 v2.2.1 // indirect - github.com/jinzhu/inflection v1.0.0 // indirect - github.com/jinzhu/now v1.1.5 // indirect - golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/sync v0.10.0 // indirect - golang.org/x/text v0.21.0 // indirect ) diff --git a/src/udi/migrate_schema.go b/src/udi/migrate_schema.go deleted file mode 100644 index 525fef0..0000000 --- a/src/udi/migrate_schema.go +++ /dev/null @@ -1,18 +0,0 @@ -package main - -import "log" -import "udi/database" - - - -func main() { - log.SetPrefix("UDI Migrate Schema: ") - log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) - - log.Println("Starting") - - database.Migrate() - - log.Println("Done") -} -