changes for influxdb
This commit is contained in:
@@ -1,96 +1,157 @@
|
||||
package database
|
||||
|
||||
|
||||
import (
|
||||
"log"
|
||||
//"time"
|
||||
"fmt"
|
||||
"udi/counter"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"udi/counter"
|
||||
|
||||
influxdb "github.com/influxdata/influxdb1-client/v2"
|
||||
)
|
||||
|
||||
type DatabaseHandle struct {
|
||||
initialized bool
|
||||
dbh *gorm.DB
|
||||
initialized bool
|
||||
client influxdb.Client
|
||||
database string
|
||||
}
|
||||
|
||||
func NewDatabaseHandle() *DatabaseHandle {
|
||||
var db DatabaseHandle
|
||||
// inject the whole database configuration via the well-known PG* env variables
|
||||
conn, err := gorm.Open(postgres.Open(""))
|
||||
if err != nil {
|
||||
log.Printf("Unable to open database connection: %s", err)
|
||||
db.initialized = false
|
||||
} else {
|
||||
db.dbh = conn
|
||||
db.initialized = true
|
||||
log.Println("Database connection opened")
|
||||
}
|
||||
return &db
|
||||
var db DatabaseHandle
|
||||
|
||||
// Read configuration from environment variables
|
||||
influxURL := os.Getenv("INFLUXDB_URL")
|
||||
if influxURL == "" {
|
||||
influxURL = "http://localhost:8086"
|
||||
}
|
||||
|
||||
influxDB := os.Getenv("INFLUXDB_DATABASE")
|
||||
if influxDB == "" {
|
||||
influxDB = "udi"
|
||||
}
|
||||
|
||||
username := os.Getenv("INFLUXDB_USER")
|
||||
password := os.Getenv("INFLUXDB_PASSWORD")
|
||||
|
||||
// Create InfluxDB client
|
||||
client, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
|
||||
Addr: influxURL,
|
||||
Username: username,
|
||||
Password: password,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Unable to create InfluxDB client: %s", err)
|
||||
db.initialized = false
|
||||
return &db
|
||||
}
|
||||
|
||||
// Test connection
|
||||
_, _, err = client.Ping(0)
|
||||
if err != nil {
|
||||
log.Printf("Unable to ping InfluxDB: %s", err)
|
||||
db.initialized = false
|
||||
client.Close()
|
||||
return &db
|
||||
}
|
||||
|
||||
db.client = client
|
||||
db.database = influxDB
|
||||
db.initialized = true
|
||||
log.Printf("InfluxDB connection opened (URL: %s, Database: %s)", influxURL, influxDB)
|
||||
|
||||
return &db
|
||||
}
|
||||
|
||||
func (self *DatabaseHandle) StoreMeasurement(measurement *Measurement) {
|
||||
if ! self.initialized {
|
||||
log.Printf("Database connection not initialized, can not store, measurement %s lost", measurement)
|
||||
counter.F("Stored")
|
||||
return
|
||||
}
|
||||
if !self.initialized {
|
||||
log.Printf("Database connection not initialized, can not store, measurement %v lost", measurement)
|
||||
counter.F("Stored")
|
||||
return
|
||||
}
|
||||
|
||||
result := self.dbh.Create(measurement)
|
||||
if result.Error != nil {
|
||||
log.Printf("Unable to insert, measurement %s lost, error: %s", measurement, result.Error)
|
||||
counter.F("Stored")
|
||||
return
|
||||
}
|
||||
// Create batch points
|
||||
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
|
||||
Database: self.database,
|
||||
Precision: "s",
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Unable to create batch points: %s", err)
|
||||
counter.F("Stored")
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("Successfully stored measurement")
|
||||
counter.S("Stored")
|
||||
// Build tags
|
||||
tags := map[string]string{
|
||||
"application": measurement.Application,
|
||||
}
|
||||
if measurement.Device != "" {
|
||||
tags["device"] = measurement.Device
|
||||
}
|
||||
|
||||
// Add attributes as tags
|
||||
for key, value := range measurement.Attributes {
|
||||
if strValue, ok := value.(string); ok {
|
||||
tags[key] = strValue
|
||||
} else {
|
||||
tags[key] = fmt.Sprintf("%v", value)
|
||||
}
|
||||
}
|
||||
|
||||
// Build fields from Values
|
||||
fields := make(map[string]interface{})
|
||||
for key, varType := range measurement.Values {
|
||||
// Store the value with the variable name as field key
|
||||
fields[key] = varType.Value
|
||||
|
||||
// Optionally store metadata as separate fields
|
||||
if varType.Unit != "" {
|
||||
fields[key+"_unit"] = varType.Unit
|
||||
}
|
||||
if varType.Variable != "" {
|
||||
fields[key+"_variable"] = varType.Variable
|
||||
}
|
||||
if varType.Status != "" {
|
||||
fields[key+"_status"] = varType.Status
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure we have at least one field
|
||||
if len(fields) == 0 {
|
||||
log.Printf("No fields to store in measurement, skipping")
|
||||
counter.F("Stored")
|
||||
return
|
||||
}
|
||||
|
||||
// Create point
|
||||
pt, err := influxdb.NewPoint(
|
||||
"measurement",
|
||||
tags,
|
||||
fields,
|
||||
measurement.Time,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Unable to create point: %s", err)
|
||||
counter.F("Stored")
|
||||
return
|
||||
}
|
||||
|
||||
bp.AddPoint(pt)
|
||||
|
||||
// Write batch
|
||||
err = self.client.Write(bp)
|
||||
if err != nil {
|
||||
log.Printf("Unable to write to InfluxDB, measurement lost, error: %s", err)
|
||||
counter.F("Stored")
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("Successfully stored measurement")
|
||||
counter.S("Stored")
|
||||
}
|
||||
|
||||
func (self *DatabaseHandle) GetDeviceByLabelAndApplication(applicationLabel string, deviceLabel string) (*Device, error) {
|
||||
if ! self.initialized {
|
||||
err := fmt.Errorf("Database connection not initialized")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var device Device
|
||||
result := self.dbh.
|
||||
Preload("Application").
|
||||
Preload("DeviceType").
|
||||
Joins("JOIN applications ON devices.application_id = applications.id").
|
||||
Where("devices.label = ? AND applications.label = ?", deviceLabel, applicationLabel).
|
||||
First(&device)
|
||||
|
||||
if result.Error != nil {
|
||||
err := fmt.Errorf("Query failed: %s", result.Error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &device, nil
|
||||
func (self *DatabaseHandle) Close() {
|
||||
if self.initialized && self.client != nil {
|
||||
self.client.Close()
|
||||
log.Println("InfluxDB connection closed")
|
||||
}
|
||||
}
|
||||
|
||||
func (self *DatabaseHandle) GetDeviceByLabel(deviceLabel string) (*Device, error) {
|
||||
if ! self.initialized {
|
||||
err := fmt.Errorf("Database connection not initialized")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var device Device
|
||||
result := self.dbh.
|
||||
Preload("Application").
|
||||
Preload("DeviceType").
|
||||
Where("devices.label = ?", deviceLabel).
|
||||
First(&device)
|
||||
|
||||
if result.Error != nil {
|
||||
err := fmt.Errorf("Query failed: %s", result.Error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &device, nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user