Merge branch 'main' of gitea.hottis.de:wn/universal-data-ingest

This commit is contained in:
2024-11-11 16:11:47 +01:00

View File

@ -79,16 +79,18 @@ func New(id string, config config.HandlerConfigT) handler.Handler {
return t
}
func extractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) {
func (self *SingleValueExtractorJsonpathHandler) ExtractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) {
var res string
switch selector[:2] {
case "J:":
// extract using jsonpath from payload
r, e := jp.Lookup(jPayload)
if e != nil {
return "", fmt.Errorf("jp.Lookup failed with %s", e)
}
res = fmt.Sprint(r)
case "T:":
// T: extract from topic
i, e := strconv.Atoi(selector[2:])
if e != nil {
return "", fmt.Errorf("Atoi failed with %s", e)
@ -97,7 +99,26 @@ func extractionHelper(subTopics []string, jPayload interface{}, selector string,
return "", fmt.Errorf("not enough subtopics")
}
res = subTopics[i]
case "L:":
// L: extract from topic and later match against devices table in database
i, e := strconv.Atoi(selector[2:])
if e != nil {
return "", fmt.Errorf("Atoi failed with %s", e)
}
if i >= len(subTopics) {
return "", fmt.Errorf("not enough subtopics")
}
ext := subTopics[i]
lookup, err1b := self.dbh.GetDeviceByLabel(ext)
if err1b != nil {
log.Printf("ext lookup %s failed: %v", ext, err1b)
res = ext
} else {
log.Printf("ext: %s", lookup)
res = ext
}
case "C:":
// use constant value
res = selector[2:]
default:
return "", fmt.Errorf("Invalid selector: %s", selector[:2])
@ -111,14 +132,14 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT
self.Lost("Handler is not marked as ready", nil, message)
return
}
//log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload)
log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload)
var measurement database.Measurement
measurement.Time = time.Now()
measurement.Application = self.application
subTopics := strings.Split(message.Topic, "/")
//log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
var jPayload interface{}
err := json.Unmarshal([]byte(message.Payload), &jPayload)
if err != nil {
@ -126,17 +147,20 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT
return
}
device, err1 := extractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath)
device, err1 := self.ExtractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath)
if err1 != nil {
self.Lost("Device extraction failed", err1, message)
return
}
value, err2 := extractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath)
log.Printf("device: %s", device)
value, err2 := self.ExtractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath)
if err2 != nil {
self.Lost("Value extraction failed", err2, message)
return
}
unit, err3 := extractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath)
unit, err3 := self.ExtractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath)
if err3 != nil {
self.Lost("Unit extraction failed", err3, message)
return
@ -152,7 +176,7 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT
measurement.Values = make(map[string]database.VariableType)
measurement.Values["Value"] = variable
//log.Printf("Prepared measurement item: %s", measurement)
log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement)
self.S()
}