saerbeck query
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

This commit is contained in:
2024-11-11 12:44:02 +01:00
parent 3d68aa0e61
commit a55f80b7d9
2 changed files with 56 additions and 12 deletions

View File

@ -10,14 +10,34 @@ create or replace view badesee_temperature_v as
where application = 'de-hottis-saerbeck-monitoring' and where application = 'de-hottis-saerbeck-monitoring' and
device = 'eui-a84041318187ec13'; device = 'eui-a84041318187ec13';
create or replace view cubecell_threeway_temperature_v as create or replace view cubecell_threeway_temperature2_v as
select time, select time,
cast(values->'Temperature1'->>'value' as float) as Temp1, cast(values->'Temperature2'->>'value' as float) as value,
cast(values->'Temperature2'->>'value' as float) as Temp2, values->'Temperature2'->>'label' as label
cast(values->'Temperature3'->>'value' as float) as Temp3,
device
from measurements from measurements
where application = 'de-hottis-saerbeck-monitoring' and where application = 'de-hottis-saerbeck-monitoring' and
device = 'eui-70b3d57ed0068fa4'; device = 'eui-70b3d57ed0068fa4';
create or replace view cubecell_threeway_temperature1_v as
select time,
cast(values->'Temperature1'->>'value' as float) as value,
values->'Temperature1'->>'label' as label
from measurements
where application = 'de-hottis-saerbeck-monitoring' and
device = 'eui-70b3d57ed0068fa4';
create or replace view cubecell_threeway_temperature3_v as
select time,
cast(values->'Temperature3'->>'value' as float) as value,
values->'Temperature3'->>'label' as label
from measurements
where application = 'de-hottis-saerbeck-monitoring' and
device = 'eui-70b3d57ed0068fa4';
create or replace view cubecell_threeway_battery_v as
select time,
cast(values->'Battery'->>'value' as float) as value,
values->'Battery'->>'label' as label
from measurements
where application = 'de-hottis-saerbeck-monitoring' and
device = 'eui-70b3d57ed0068fa4';

View File

@ -79,16 +79,18 @@ func New(id string, config config.HandlerConfigT) handler.Handler {
return t return t
} }
func extractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) { func (self *SingleValueExtractorJsonpathHandler) ExtractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) {
var res string var res string
switch selector[:2] { switch selector[:2] {
case "J:": case "J:":
// extract using jsonpath from payload
r, e := jp.Lookup(jPayload) r, e := jp.Lookup(jPayload)
if e != nil { if e != nil {
return "", fmt.Errorf("jp.Lookup failed with %s", e) return "", fmt.Errorf("jp.Lookup failed with %s", e)
} }
res = fmt.Sprint(r) res = fmt.Sprint(r)
case "T:": case "T:":
// T: extract from topic
i, e := strconv.Atoi(selector[2:]) i, e := strconv.Atoi(selector[2:])
if e != nil { if e != nil {
return "", fmt.Errorf("Atoi failed with %s", e) return "", fmt.Errorf("Atoi failed with %s", e)
@ -97,7 +99,26 @@ func extractionHelper(subTopics []string, jPayload interface{}, selector string,
return "", fmt.Errorf("not enough subtopics") return "", fmt.Errorf("not enough subtopics")
} }
res = subTopics[i] res = subTopics[i]
case "L:":
// L: extract from topic and later match against devices table in database
i, e := strconv.Atoi(selector[2:])
if e != nil {
return "", fmt.Errorf("Atoi failed with %s", e)
}
if i >= len(subTopics) {
return "", fmt.Errorf("not enough subtopics")
}
ext := subTopics[i]
lookup, err1b := self.dbh.GetDeviceByLabel(ext)
if err1b != nil {
log.Printf("ext lookup %s failed: %v", ext, err1b)
res = ext
} else {
log.Printf("ext: %s", lookup)
res = ext
}
case "C:": case "C:":
// use constant value
res = selector[2:] res = selector[2:]
default: default:
return "", fmt.Errorf("Invalid selector: %s", selector[:2]) return "", fmt.Errorf("Invalid selector: %s", selector[:2])
@ -111,14 +132,14 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT
self.Lost("Handler is not marked as ready", nil, message) self.Lost("Handler is not marked as ready", nil, message)
return return
} }
//log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload) log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload)
var measurement database.Measurement var measurement database.Measurement
measurement.Time = time.Now() measurement.Time = time.Now()
measurement.Application = self.application measurement.Application = self.application
subTopics := strings.Split(message.Topic, "/") subTopics := strings.Split(message.Topic, "/")
//log.Printf("Subtopics: %s", strings.Join(subTopics, ", ")) log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
var jPayload interface{} var jPayload interface{}
err := json.Unmarshal([]byte(message.Payload), &jPayload) err := json.Unmarshal([]byte(message.Payload), &jPayload)
if err != nil { if err != nil {
@ -126,17 +147,20 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT
return return
} }
device, err1 := extractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath) device, err1 := self.ExtractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath)
if err1 != nil { if err1 != nil {
self.Lost("Device extraction failed", err1, message) self.Lost("Device extraction failed", err1, message)
return return
} }
value, err2 := extractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath) log.Printf("device: %s", device)
value, err2 := self.ExtractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath)
if err2 != nil { if err2 != nil {
self.Lost("Value extraction failed", err2, message) self.Lost("Value extraction failed", err2, message)
return return
} }
unit, err3 := extractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath)
unit, err3 := self.ExtractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath)
if err3 != nil { if err3 != nil {
self.Lost("Unit extraction failed", err3, message) self.Lost("Unit extraction failed", err3, message)
return return
@ -152,7 +176,7 @@ func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT
measurement.Values = make(map[string]database.VariableType) measurement.Values = make(map[string]database.VariableType)
measurement.Values["Value"] = variable measurement.Values["Value"] = variable
//log.Printf("Prepared measurement item: %s", measurement) log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement) self.dbh.StoreMeasurement(&measurement)
self.S() self.S()
} }