Files
universal-data-ingest/src/udi/database/database.go

158 lines
3.3 KiB
Go

package database
import (
	"fmt"
	"log"
	"os"
	"time"

	influxdb "github.com/influxdata/influxdb1-client/v2"

	"udi/counter"
)
// DatabaseHandle bundles an InfluxDB client with the name of the database
// that measurements are written to.
type DatabaseHandle struct {
// initialized is set to true only after the client was created and the
// initial ping succeeded; methods check it before touching client.
initialized bool
// client is the InfluxDB HTTP client used for writes.
client influxdb.Client
// database is the InfluxDB database name used for each batch of points.
database string
}
// defaultInfluxTimeout is the HTTP request timeout used for the InfluxDB
// client when INFLUXDB_TIMEOUT is unset or invalid.
const defaultInfluxTimeout = 10 * time.Second

// NewDatabaseHandle builds a DatabaseHandle from environment configuration.
//
// Environment variables:
//   - INFLUXDB_URL: server address (default "http://localhost:8086")
//   - INFLUXDB_DATABASE: target database (default "udi")
//   - INFLUXDB_USER / INFLUXDB_PASSWORD: credentials, passed through as-is
//   - INFLUXDB_TIMEOUT: HTTP request timeout as a positive Go duration such
//     as "5s" (default 10s)
//
// The function always returns a non-nil handle. If the client cannot be
// created or the initial ping fails, the error is logged and the returned
// handle stays uninitialized.
func NewDatabaseHandle() *DatabaseHandle {
	db := &DatabaseHandle{}

	influxURL := os.Getenv("INFLUXDB_URL")
	if influxURL == "" {
		influxURL = "http://localhost:8086"
	}
	influxDB := os.Getenv("INFLUXDB_DATABASE")
	if influxDB == "" {
		influxDB = "udi"
	}
	username := os.Getenv("INFLUXDB_USER")
	password := os.Getenv("INFLUXDB_PASSWORD")

	// Without an explicit timeout a hung or unreachable server would block
	// the ping below, and every later write, for an unbounded time.
	timeout := defaultInfluxTimeout
	if raw := os.Getenv("INFLUXDB_TIMEOUT"); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil || parsed <= 0 {
			log.Printf("Ignoring invalid INFLUXDB_TIMEOUT %q, using %s", raw, defaultInfluxTimeout)
		} else {
			timeout = parsed
		}
	}

	client, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
		Addr:     influxURL,
		Username: username,
		Password: password,
		Timeout:  timeout,
	})
	if err != nil {
		log.Printf("Unable to create InfluxDB client: %s", err)
		return db
	}

	// Verify the server is reachable before handing out a usable handle.
	// NOTE(review): the Ping argument appears to be a wait-for-leader
	// duration rather than a request timeout — confirm against the client docs.
	if _, _, err = client.Ping(0); err != nil {
		log.Printf("Unable to ping InfluxDB: %s", err)
		if closeErr := client.Close(); closeErr != nil {
			log.Printf("Unable to close InfluxDB client: %s", closeErr)
		}
		return db
	}

	db.client = client
	db.database = influxDB
	db.initialized = true
	log.Printf("InfluxDB connection opened (URL: %s, Database: %s)", influxURL, influxDB)
	return db
}
// StoreMeasurement writes one measurement to InfluxDB as a single point named
// "observation", using second precision.
//
// Tags: "application", "device" (when non-empty) and every entry of
// measurement.Attributes, with non-string values formatted via %v.
// Fields: for each entry of measurement.Values, the value under the entry's
// key, plus "<key>_unit", "<key>_variable" and "<key>_status" when those
// strings are non-empty.
//
// Nothing is returned. The outcome is reported through the log and through
// counter.S("Stored") on success or counter.F("Stored") on any failure: an
// uninitialized handle, a nil measurement, a measurement without values, or
// an error while building or writing the point.
func (h *DatabaseHandle) StoreMeasurement(measurement *Measurement) {
	if !h.initialized {
		log.Printf("Database connection not initialized, can not store, measurement %v lost", measurement)
		counter.F("Stored")
		return
	}
	// Guard against a nil pointer so a bad caller is counted as a failed
	// store instead of crashing on the first field access below.
	if measurement == nil {
		log.Printf("Measurement is nil, nothing to store")
		counter.F("Stored")
		return
	}

	bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
		Database:  h.database,
		Precision: "s",
	})
	if err != nil {
		log.Printf("Unable to create batch points: %s", err)
		counter.F("Stored")
		return
	}

	tags := map[string]string{
		"application": measurement.Application,
	}
	if measurement.Device != "" {
		tags["device"] = measurement.Device
	}
	// Attributes are applied last, so an attribute keyed "application" or
	// "device" replaces the built-in tag of the same name.
	for key, value := range measurement.Attributes {
		if s, ok := value.(string); ok {
			tags[key] = s
			continue
		}
		tags[key] = fmt.Sprintf("%v", value)
	}

	fields := make(map[string]interface{})
	for key, v := range measurement.Values {
		fields[key] = v.Value
		// Metadata travels as sibling fields, only when present.
		if v.Unit != "" {
			fields[key+"_unit"] = v.Unit
		}
		if v.Variable != "" {
			fields[key+"_variable"] = v.Variable
		}
		if v.Status != "" {
			fields[key+"_status"] = v.Status
		}
	}
	// A measurement with no fields is skipped and counted as a failure.
	if len(fields) == 0 {
		log.Printf("No fields to store in measurement, skipping")
		counter.F("Stored")
		return
	}

	pt, err := influxdb.NewPoint("observation", tags, fields, measurement.Time)
	if err != nil {
		log.Printf("Unable to create point: %s", err)
		counter.F("Stored")
		return
	}
	bp.AddPoint(pt)

	if err := h.client.Write(bp); err != nil {
		log.Printf("Unable to write to InfluxDB, measurement lost, error: %s", err)
		counter.F("Stored")
		return
	}

	log.Println("Successfully stored measurement")
	counter.S("Stored")
}
// Close closes the InfluxDB client if the handle was initialized and holds a
// client; otherwise it does nothing. An error returned by the client's Close
// is logged instead of being silently dropped.
func (h *DatabaseHandle) Close() {
	if !h.initialized || h.client == nil {
		return
	}
	if err := h.client.Close(); err != nil {
		log.Printf("Unable to close InfluxDB connection: %s", err)
		return
	}
	log.Println("InfluxDB connection closed")
}