diff --git a/deployment/load-config.sh b/deployment/load-config.sh new file mode 100755 index 0000000..042c45b --- /dev/null +++ b/deployment/load-config.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +FILE=$1 +if [ "$FILE" = "" ]; then + echo "give config file to load as first argument" + exit 1 +fi +SECRET_NAME=$2 +if [ "$SECRET_NAME" = "" ]; then + echo "give secret name to create/modify as second argument" + exit 1 +fi +NAMESPACE=$3 +if [ "$NAMESPACE" = "" ]; then + echo "give namespace as third argument" + exit 1 +fi + +kubectl create secret generic $SECRET_NAME \ + --from-literal=UDI_CONF="`cat $FILE`" \ + -n $NAMESPACE \ + --dry-run=client \ + -o yaml \ + --save-config | \ +kubectl apply -f - diff --git a/queries/hottis.sql b/queries/hottis.sql new file mode 100644 index 0000000..7c16602 --- /dev/null +++ b/queries/hottis.sql @@ -0,0 +1,25 @@ +create or replace view pv_power_v as + select time, + cast(values->'PowerActive'->>'value' as float) as power, + values->'Status'->>'status' as status, + device + from measurements + where application = 'PV'; + +create or replace view pv_total_import_v as + select time, + cast(values->'Power'->>'value' as float) as power, + device + from measurements + where application = 'Power' and + device = 'Total' and + attributes->>'Status' = 'Ok'; + +create or replace view power_v as + select time, + cast(values->'Power'->>'value' as float) as power, + device + from measurements + where application = 'Power' and + attributes->>'Status' = 'Ok'; + diff --git a/src/udi/dispatcher/dispatcher.go b/src/udi/dispatcher/dispatcher.go index b22e574..594e475 100644 --- a/src/udi/dispatcher/dispatcher.go +++ b/src/udi/dispatcher/dispatcher.go @@ -12,6 +12,7 @@ import "udi/handlers/ttn" import "udi/handlers/iot" import "udi/handlers/pv" import "udi/handlers/mbgw3" +import "udi/handlers/sve" var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) @@ -24,6 +25,7 @@ func InitDispatcher() { handlerMap["IoT"] = iot.NewIoTHandler() handlerMap["PV"] = pv.NewPvHandler(config.Config.Handlers["PV"]) handlerMap["MBGW3"] = mbgw3.NewMbgw3Handler(config.Config.Handlers["MBGW3"]) + handlerMap["SVE"] = sve.NewSveHandler(config.Config.Handlers["SVE"]) } func storeMessage(filename string, item handler.MessageT) { diff --git a/src/udi/handlers/sve/sve.go b/src/udi/handlers/sve/sve.go new file mode 100644 index 0000000..1f6e53e --- /dev/null +++ b/src/udi/handlers/sve/sve.go @@ -0,0 +1,209 @@ +package sve + +import ( + "log" + "time" + "strconv" + "strings" + "regexp" + "udi/config" + "udi/handlers/handler" + "udi/database" +) + +var idSeq int = 0 + +type SingleValueExtractorHandler struct { + id int + ready bool + config localConfig + payloadRegex *regexp.Regexp + dbh *database.DatabaseHandle +} + +const TOPIC_SEL = "topic" +const PAYLOAD_SEL = "payload" +const CONSTANT_SEL = "constant" + +type localConfig struct { + application string + deviceFrom string + devicePart int + device string + valueFrom string + valuePart int + unitFrom string + unitPart int + unit string +} + + +func NewSveHandler(config config.HandlerConfigT) *SingleValueExtractorHandler { + t := &SingleValueExtractorHandler { + id: idSeq, + ready: false, + } + idSeq += 1 + + var localConfig localConfig + if config.Attributes["application"] == "" { + log.Println("Error: application not configured") + return t + } + localConfig.application = config.Attributes["application"] + + payloadRegex := config.Attributes["payloadRegex"] + if payloadRegex != "" { + t.payloadRegex = regexp.MustCompile(payloadRegex) + } else { + t.payloadRegex = nil + } + + if config.Attributes["deviceFrom"] != TOPIC_SEL && config.Attributes["deviceFrom"] != PAYLOAD_SEL && config.Attributes["deviceFrom"] != CONSTANT_SEL { + log.Printf("Error: invalid value %s for deviceFrom", config.Attributes["deviceFrom"]) + return t + } + localConfig.deviceFrom = config.Attributes["deviceFrom"] + + devicePart, err1 := strconv.Atoi(config.Attributes["devicePart"]) + if err1 != nil { + log.Printf("Error: unable to convert devicePart to number: %s", err1) + return t + } + localConfig.devicePart = devicePart + + // empty device is valid + localConfig.device = config.Attributes["device"] + + if config.Attributes["valueFrom"] != TOPIC_SEL && config.Attributes["valueFrom"] != PAYLOAD_SEL { + log.Printf("Error: invalid value %s for valueFrom", config.Attributes["valueFrom"]) + return t + } + localConfig.valueFrom = config.Attributes["valueFrom"] + + valuePart, err2 := strconv.Atoi(config.Attributes["valuePart"]) + if err2 != nil { + log.Printf("Error: unable to convert valuePart to number: %s", err2) + return t + } + localConfig.valuePart = valuePart + + if config.Attributes["unitFrom"] != TOPIC_SEL && config.Attributes["unitFrom"] != PAYLOAD_SEL && config.Attributes["unitFrom"] != CONSTANT_SEL { + log.Printf("Error: invalid value %s for unitFrom", config.Attributes["unitFrom"]) + return t + } + localConfig.unitFrom = config.Attributes["unitFrom"] + + unitPart, err3 := strconv.Atoi(config.Attributes["unitPart"]) + if err3 != nil { + log.Printf("Error: unable to convert unitPart to number: %s", err3) + return t + } + localConfig.unitPart = unitPart + + // empty unit is valid + localConfig.unit = config.Attributes["unit"] + + t.config = localConfig + + t.ready = true + t.dbh = database.NewDatabaseHandle(config.DatabaseConnStr) + return t +} + +func lost(msg string, message handler.MessageT) { + log.Printf("Error: %s, message %s is lost", msg, message) +} + +func (self *SingleValueExtractorHandler) Handle(message handler.MessageT) { + if ! self.ready { + log.Println("Handler is not marked as ready, message %s is lost", message) + return + } + //log.Printf("Handler SingleValueExtractor %d processing %s -> %s", self.id, message.Topic, message.Payload) + + var measurement database.Measurement + measurement.Time = time.Now() + measurement.Application = self.config.application + + subTopics := strings.Split(message.Topic, "/") + //log.Printf("Subtopics: %s", strings.Join(subTopics, ", ")) + + var payloadMatches []string + if self.payloadRegex != nil { + payloadMatches = self.payloadRegex.FindStringSubmatch(message.Payload) + //log.Printf("Matches: %s", strings.Join(payloadMatches, ", ")) + } + + switch self.config.deviceFrom { + case TOPIC_SEL: + if self.config.devicePart >= len(subTopics) { + lost("devicePart out of range", message) + return + } + measurement.Device = subTopics[self.config.devicePart] + case PAYLOAD_SEL: + if self.payloadRegex == nil { + lost("no payloadRegex defined, devicePart can't be used", message) + return + } + if self.config.devicePart >= len(subTopics) { + lost("devicePart out of range", message) + return + } + measurement.Device = payloadMatches[self.config.devicePart] + case CONSTANT_SEL: + measurement.Device = self.config.device + } + + measurement.Values = make(map[string]database.VariableType) + var variable database.VariableType + variable.Label = "" + variable.Variable = "" + + switch self.config.valueFrom { + case TOPIC_SEL: + if self.config.valuePart >= len(subTopics) { + lost("valuePart out of range", message) + return + } + variable.Value = subTopics[self.config.valuePart] + case PAYLOAD_SEL: + if self.payloadRegex == nil { + lost("no payloadRegex defined, valuePart can't be used", message) + return + } + if self.config.valuePart >= len(subTopics) { + lost("valuePart out of range", message) + return + } + variable.Value = payloadMatches[self.config.valuePart] + } + + switch self.config.unitFrom { + case TOPIC_SEL: + if self.config.unitPart >= len(subTopics) { + lost("unitPart out of range", message) + return + } + variable.Unit = subTopics[self.config.unitPart] + case PAYLOAD_SEL: + if self.payloadRegex == nil { + lost("no payloadRegex defined, unitPart can't be used", message) + return + } + if self.config.unitPart >= len(subTopics) { + lost("unitPart out of range", message) + return + } + variable.Unit = payloadMatches[self.config.unitPart] + case CONSTANT_SEL: + variable.Unit = self.config.unit + } + + measurement.Values["Value"] = variable + + //log.Printf("Prepared measurement item: %s", measurement) + self.dbh.StoreMeasurement(&measurement) +} +