Compare commits

...

4 Commits

Author SHA1 Message Date
c37420a993 float fix 1
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 20:51:50 +01:00
ac0c417a48 small j for float 3
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 20:34:21 +01:00
dc965aeba6 small j for float 2
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 20:27:28 +01:00
b940f715c0 small j for float
Some checks failed
ci/woodpecker/tag/woodpecker Pipeline failed
2026-03-06 20:25:04 +01:00
3 changed files with 309 additions and 295 deletions

View File

@@ -105,6 +105,7 @@
"devicePart": "3",
"valueFrom": "payload",
"valuePart": "1",
"valueType": "float",
"unitFrom": "payload",
"unitPart": "3"
}

View File

@@ -1,29 +1,30 @@
package svej
import (
"log"
"time"
"strconv"
"strings"
"fmt"
"github.com/oliveagle/jsonpath"
"encoding/json"
"udi/config"
"udi/handlers/handler"
"udi/database"
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"time"
"udi/config"
"udi/database"
"udi/handlers/handler"
"github.com/oliveagle/jsonpath"
)
type SingleValueExtractorJsonpathHandler struct {
handler.CommonHandler
ready bool
application string
deviceSelector string
valueSelector string
unitSelector string
deviceJsonpath *jsonpath.Compiled
valueJsonpath *jsonpath.Compiled
unitJsonpath *jsonpath.Compiled
dbh *database.DatabaseHandle
handler.CommonHandler
ready bool
application string
deviceSelector string
valueSelector string
unitSelector string
deviceJsonpath *jsonpath.Compiled
valueJsonpath *jsonpath.Compiled
unitJsonpath *jsonpath.Compiled
dbh *database.DatabaseHandle
}
/*
@@ -33,134 +34,131 @@ T:TopicPartIndex
C:ConstantValue
*/
func New(id string, config config.HandlerConfigT) handler.Handler {
t := &SingleValueExtractorJsonpathHandler {
ready: false,
}
t := &SingleValueExtractorJsonpathHandler{
ready: false,
}
if config.Attributes["application"] == "" {
log.Println("Error: application not configured")
return t
}
t.application = config.Attributes["application"]
if config.Attributes["application"] == "" {
log.Println("Error: application not configured")
return t
}
t.application = config.Attributes["application"]
t.deviceSelector = config.Attributes["deviceSelector"]
if t.deviceSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.deviceSelector[2:])
if err != nil {
log.Printf("Unable to compile deviceJsonpath: %s, %s", t.deviceSelector[2:], err)
return t
}
t.deviceJsonpath = jp
}
t.valueSelector = config.Attributes["valueSelector"]
if t.valueSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.valueSelector[2:])
if err != nil {
log.Printf("Unable to compile valueJsonpath: %s, %s", t.valueSelector[2:], err)
return t
}
t.valueJsonpath = jp
}
t.unitSelector = config.Attributes["unitSelector"]
if t.unitSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.unitSelector[2:])
if err != nil {
log.Printf("Unable to compile unitJsonpath: %s, %s", t.unitSelector[2:], err)
return t
}
t.unitJsonpath = jp
}
t.deviceSelector = config.Attributes["deviceSelector"]
if t.deviceSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.deviceSelector[2:])
if err != nil {
log.Printf("Unable to compile deviceJsonpath: %s, %s", t.deviceSelector[2:], err)
return t
}
t.deviceJsonpath = jp
}
t.valueSelector = config.Attributes["valueSelector"]
if t.valueSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.valueSelector[2:])
if err != nil {
log.Printf("Unable to compile valueJsonpath: %s, %s", t.valueSelector[2:], err)
return t
}
t.valueJsonpath = jp
}
t.unitSelector = config.Attributes["unitSelector"]
if t.unitSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.unitSelector[2:])
if err != nil {
log.Printf("Unable to compile unitJsonpath: %s, %s", t.unitSelector[2:], err)
return t
}
t.unitJsonpath = jp
}
t.Id = id
t.ready = true
t.dbh = database.NewDatabaseHandle()
log.Printf("Handler SVEJ %d initialized", id)
return t
t.Id = id
t.ready = true
t.dbh = database.NewDatabaseHandle()
log.Printf("Handler SVEJ %s initialized", id)
return t
}
func (self *SingleValueExtractorJsonpathHandler) ExtractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) {
var res string
switch selector[:2] {
case "J:":
// extract using jsonpath from payload
r, e := jp.Lookup(jPayload)
if e != nil {
return "", fmt.Errorf("jp.Lookup failed with %s", e)
}
res = fmt.Sprint(r)
case "T:":
// T: extract from topic
i, e := strconv.Atoi(selector[2:])
if e != nil {
return "", fmt.Errorf("Atoi failed with %s", e)
}
if i >= len(subTopics) {
return "", fmt.Errorf("not enough subtopics")
}
res = subTopics[i]
case "C:":
// use constant value
res = selector[2:]
default:
return "", fmt.Errorf("Invalid selector: %s", selector[:2])
}
return res, nil
func (self *SingleValueExtractorJsonpathHandler) ExtractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (interface{}, error) {
var res interface{}
switch selector[:2] {
case "J:":
// extract using jsonpath from payload
r, e := jp.Lookup(jPayload)
if e != nil {
return "", fmt.Errorf("jp.Lookup failed with %s", e)
}
res = r
case "T:":
// T: extract from topic
i, e := strconv.Atoi(selector[2:])
if e != nil {
return "", fmt.Errorf("Atoi failed with %s", e)
}
if i >= len(subTopics) {
return "", fmt.Errorf("not enough subtopics")
}
res = subTopics[i]
case "C:":
// use constant value
res = selector[2:]
default:
return "", fmt.Errorf("Invalid selector: %s", selector[:2])
}
return res, nil
}
func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT) {
if ! self.ready {
self.Lost("Handler is not marked as ready", nil, message)
return
}
log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload)
if !self.ready {
self.Lost("Handler is not marked as ready", nil, message)
return
}
log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload)
var measurement database.Measurement
measurement.Time = time.Now()
measurement.Application = self.application
var measurement database.Measurement
measurement.Time = time.Now()
measurement.Application = self.application
subTopics := strings.Split(message.Topic, "/")
log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
var jPayload interface{}
err := json.Unmarshal([]byte(message.Payload), &jPayload)
if err != nil {
self.Lost("Unable to unmarshal payload", err, message)
return
}
subTopics := strings.Split(message.Topic, "/")
log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
var jPayload interface{}
err := json.Unmarshal([]byte(message.Payload), &jPayload)
if err != nil {
self.Lost("Unable to unmarshal payload", err, message)
return
}
device, err1 := self.ExtractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath)
if err1 != nil {
self.Lost("Device extraction failed", err1, message)
return
}
log.Printf("device: %s", device)
device, err1 := self.ExtractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath)
if err1 != nil {
self.Lost("Device extraction failed", err1, message)
return
}
log.Printf("device: %s", device)
value, err2 := self.ExtractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath)
if err2 != nil {
self.Lost("Value extraction failed", err2, message)
return
}
value, err2 := self.ExtractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath)
if err2 != nil {
self.Lost("Value extraction failed", err2, message)
return
}
unit, err3 := self.ExtractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath)
if err3 != nil {
self.Lost("Unit extraction failed", err3, message)
return
}
unit, err3 := self.ExtractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath)
if err3 != nil {
self.Lost("Unit extraction failed", err3, message)
return
}
measurement.Device = device
measurement.Device = device.(string)
var variable database.VariableType
variable.Label = ""
variable.Variable = ""
variable.Unit = unit
variable.Value = value
measurement.Values = make(map[string]database.VariableType)
measurement.Values["Value"] = variable
var variable database.VariableType
variable.Label = ""
variable.Variable = ""
variable.Unit = unit.(string)
variable.Value = value
measurement.Values = make(map[string]database.VariableType)
measurement.Values["Value"] = variable
log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement)
self.S()
log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement)
self.S()
}

View File

@@ -1,23 +1,22 @@
package sver
import (
"time"
"strconv"
"strings"
"regexp"
"log"
"udi/config"
"udi/handlers/handler"
"udi/database"
"log"
"regexp"
"strconv"
"strings"
"time"
"udi/config"
"udi/database"
"udi/handlers/handler"
)
type SingleValueExtractorRegexHandler struct {
handler.CommonHandler
ready bool
config localConfig
payloadRegex *regexp.Regexp
dbh *database.DatabaseHandle
handler.CommonHandler
ready bool
config localConfig
payloadRegex *regexp.Regexp
dbh *database.DatabaseHandle
}
const TOPIC_SEL = "topic"
@@ -26,175 +25,191 @@ const PAYLOAD_FULL_SEL = "payload-full"
const CONSTANT_SEL = "constant"
type localConfig struct {
application string
deviceFrom string
devicePart int
device string
valueFrom string
valuePart int
unitFrom string
unitPart int
unit string
application string
deviceFrom string
devicePart int
device string
valueFrom string
valuePart int
valueType string
unitFrom string
unitPart int
unit string
}
func New(id string, config config.HandlerConfigT) handler.Handler {
t := &SingleValueExtractorRegexHandler {
ready: false,
}
t := &SingleValueExtractorRegexHandler{
ready: false,
}
var localConfig localConfig
if config.Attributes["application"] == "" {
log.Println("Error: application not configured")
return t
}
localConfig.application = config.Attributes["application"]
var localConfig localConfig
if config.Attributes["application"] == "" {
log.Println("Error: application not configured")
return t
}
localConfig.application = config.Attributes["application"]
payloadRegex := config.Attributes["payloadRegex"]
if payloadRegex != "" {
t.payloadRegex = regexp.MustCompile(payloadRegex)
} else {
t.payloadRegex = nil
}
payloadRegex := config.Attributes["payloadRegex"]
if payloadRegex != "" {
t.payloadRegex = regexp.MustCompile(payloadRegex)
} else {
t.payloadRegex = nil
}
if config.Attributes["deviceFrom"] != TOPIC_SEL && config.Attributes["deviceFrom"] != PAYLOAD_SEL && config.Attributes["deviceFrom"] != CONSTANT_SEL {
log.Printf("Error: invalid value %s for deviceFrom", config.Attributes["deviceFrom"])
return t
}
localConfig.deviceFrom = config.Attributes["deviceFrom"]
if config.Attributes["deviceFrom"] != TOPIC_SEL && config.Attributes["deviceFrom"] != PAYLOAD_SEL && config.Attributes["deviceFrom"] != CONSTANT_SEL {
log.Printf("Error: invalid value %s for deviceFrom", config.Attributes["deviceFrom"])
return t
}
localConfig.deviceFrom = config.Attributes["deviceFrom"]
devicePart, err1 := strconv.Atoi(config.Attributes["devicePart"])
if err1 != nil {
log.Printf("Error: unable to convert devicePart to number: %s", err1)
return t
}
localConfig.devicePart = devicePart
devicePart, err1 := strconv.Atoi(config.Attributes["devicePart"])
if err1 != nil {
log.Printf("Error: unable to convert devicePart to number: %s", err1)
return t
}
localConfig.devicePart = devicePart
// empty device is valid
localConfig.device = config.Attributes["device"]
// empty device is valid
localConfig.device = config.Attributes["device"]
if config.Attributes["valueFrom"] != PAYLOAD_SEL && config.Attributes["valueFrom"] != PAYLOAD_FULL_SEL {
log.Printf("Error: invalid value %s for valueFrom", config.Attributes["valueFrom"])
return t
}
localConfig.valueFrom = config.Attributes["valueFrom"]
if config.Attributes["valueFrom"] != PAYLOAD_SEL && config.Attributes["valueFrom"] != PAYLOAD_FULL_SEL {
log.Printf("Error: invalid value %s for valueFrom", config.Attributes["valueFrom"])
return t
}
localConfig.valueFrom = config.Attributes["valueFrom"]
if config.Attributes["valueFrom"] == PAYLOAD_SEL {
valuePart, err2 := strconv.Atoi(config.Attributes["valuePart"])
if err2 != nil {
log.Printf("Error: unable to convert valuePart to number: %s", err2)
return t
}
localConfig.valuePart = valuePart
}
if config.Attributes["valueFrom"] == PAYLOAD_SEL {
valuePart, err2 := strconv.Atoi(config.Attributes["valuePart"])
if err2 != nil {
log.Printf("Error: unable to convert valuePart to number: %s", err2)
return t
}
localConfig.valuePart = valuePart
}
if config.Attributes["unitFrom"] != PAYLOAD_SEL && config.Attributes["unitFrom"] != CONSTANT_SEL {
log.Printf("Error: invalid value %s for unitFrom", config.Attributes["unitFrom"])
return t
}
localConfig.unitFrom = config.Attributes["unitFrom"]
if config.Attributes["valueType"] != "float" && config.Attributes["valueType"] != "string" {
log.Printf("Error: invalid value %s for valueType", config.Attributes["valueType"])
return t
}
localConfig.valueType = config.Attributes["valueType"]
if config.Attributes["unitFrom"] == PAYLOAD_SEL {
unitPart, err3 := strconv.Atoi(config.Attributes["unitPart"])
if err3 != nil {
log.Printf("Error: unable to convert unitPart to number: %s", err3)
return t
}
localConfig.unitPart = unitPart
}
if config.Attributes["unitFrom"] != PAYLOAD_SEL && config.Attributes["unitFrom"] != CONSTANT_SEL {
log.Printf("Error: invalid value %s for unitFrom", config.Attributes["unitFrom"])
return t
}
localConfig.unitFrom = config.Attributes["unitFrom"]
// empty unit is valid
localConfig.unit = config.Attributes["unit"]
if config.Attributes["unitFrom"] == PAYLOAD_SEL {
unitPart, err3 := strconv.Atoi(config.Attributes["unitPart"])
if err3 != nil {
log.Printf("Error: unable to convert unitPart to number: %s", err3)
return t
}
localConfig.unitPart = unitPart
}
t.config = localConfig
// empty unit is valid
localConfig.unit = config.Attributes["unit"]
t.Id = id
t.ready = true
t.dbh = database.NewDatabaseHandle()
log.Printf("Handler SVER %d initialized", id)
return t
t.config = localConfig
t.Id = id
t.ready = true
t.dbh = database.NewDatabaseHandle()
log.Printf("Handler SVER %d initialized", id)
return t
}
func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) {
if ! self.ready {
self.Lost("Handler is not marked as ready", nil, message)
return
}
//log.Printf("Handler SingleValueExtractor %d processing %s -> %s", self.id, message.Topic, message.Payload)
if !self.ready {
self.Lost("Handler is not marked as ready", nil, message)
return
}
//log.Printf("Handler SingleValueExtractor %d processing %s -> %s", self.id, message.Topic, message.Payload)
var measurement database.Measurement
measurement.Time = time.Now()
measurement.Application = self.config.application
var measurement database.Measurement
measurement.Time = time.Now()
measurement.Application = self.config.application
subTopics := strings.Split(message.Topic, "/")
//log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
subTopics := strings.Split(message.Topic, "/")
//log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
var payloadMatches []string
if self.payloadRegex != nil {
payloadMatches = self.payloadRegex.FindStringSubmatch(message.Payload)
//log.Printf("Matches: %s", strings.Join(payloadMatches, ", "))
}
var payloadMatches []string
if self.payloadRegex != nil {
payloadMatches = self.payloadRegex.FindStringSubmatch(message.Payload)
//log.Printf("Matches: %s", strings.Join(payloadMatches, ", "))
}
switch self.config.deviceFrom {
case TOPIC_SEL:
if self.config.devicePart >= len(subTopics) {
self.Lost("devicePart out of range", nil, message)
return
}
measurement.Device = subTopics[self.config.devicePart]
case PAYLOAD_SEL:
if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, devicePart can't be used", nil, message)
return
}
if self.config.devicePart >= len(payloadMatches) {
self.Lost("devicePart out of range", nil, message)
return
}
measurement.Device = payloadMatches[self.config.devicePart]
case CONSTANT_SEL:
measurement.Device = self.config.device
}
switch self.config.deviceFrom {
case TOPIC_SEL:
if self.config.devicePart >= len(subTopics) {
self.Lost("devicePart out of range", nil, message)
return
}
measurement.Device = subTopics[self.config.devicePart]
case PAYLOAD_SEL:
if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, devicePart can't be used", nil, message)
return
}
if self.config.devicePart >= len(payloadMatches) {
self.Lost("devicePart out of range", nil, message)
return
}
measurement.Device = payloadMatches[self.config.devicePart]
case CONSTANT_SEL:
measurement.Device = self.config.device
}
measurement.Values = make(map[string]database.VariableType)
var variable database.VariableType
variable.Label = ""
variable.Variable = ""
measurement.Values = make(map[string]database.VariableType)
var variable database.VariableType
variable.Label = ""
variable.Variable = ""
switch self.config.valueFrom {
case PAYLOAD_SEL:
if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, valuePart can't be used", nil, message)
return
}
if self.config.valuePart >= len(payloadMatches) {
self.Lost("valuePart out of range", nil, message)
return
}
variable.Value = payloadMatches[self.config.valuePart]
case PAYLOAD_FULL_SEL:
variable.Value = message.Payload
}
var value string
switch self.config.valueFrom {
case PAYLOAD_SEL:
if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, valuePart can't be used", nil, message)
return
}
if self.config.valuePart >= len(payloadMatches) {
self.Lost("valuePart out of range", nil, message)
return
}
value = payloadMatches[self.config.valuePart]
case PAYLOAD_FULL_SEL:
value = message.Payload
}
if self.config.valueType == "float" {
fValue, err := strconv.ParseFloat(value, 64)
if err != nil {
self.Lost("Unable to convert value to float", err, message)
return
}
variable.Value = fValue
} else {
variable.Value = value
}
switch self.config.unitFrom {
case PAYLOAD_SEL:
if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, unitPart can't be used", nil, message)
return
}
if self.config.unitPart >= len(payloadMatches) {
self.Lost("unitPart out of range", nil, message)
return
}
variable.Unit = payloadMatches[self.config.unitPart]
case CONSTANT_SEL:
variable.Unit = self.config.unit
}
switch self.config.unitFrom {
case PAYLOAD_SEL:
if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, unitPart can't be used", nil, message)
return
}
if self.config.unitPart >= len(payloadMatches) {
self.Lost("unitPart out of range", nil, message)
return
}
variable.Unit = payloadMatches[self.config.unitPart]
case CONSTANT_SEL:
variable.Unit = self.config.unit
}
measurement.Values["Value"] = variable
measurement.Values["Value"] = variable
//log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement)
self.S()
//log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement)
self.S()
}