package sve import ( "log" "time" "strconv" "strings" "regexp" "fmt" "udi/config" "udi/handlers/handler" "udi/database" ) var idSeq int = 0 type SingleValueExtractorHandler struct { id int ready bool config localConfig payloadRegex *regexp.Regexp dbh *database.DatabaseHandle } const TOPIC_SEL = "topic" const PAYLOAD_SEL = "payload" const PAYLOAD_FULL_SEL = "payload-full" const CONSTANT_SEL = "constant" type localConfig struct { application string deviceFrom string devicePart int device string valueFrom string valuePart int unitFrom string unitPart int unit string } func NewSveHandler(config config.HandlerConfigT) handler.Handler { t := &SingleValueExtractorHandler { id: idSeq, ready: false, } idSeq += 1 var localConfig localConfig if config.Attributes["application"] == "" { log.Println("Error: application not configured") return t } localConfig.application = config.Attributes["application"] payloadRegex := config.Attributes["payloadRegex"] if payloadRegex != "" { t.payloadRegex = regexp.MustCompile(payloadRegex) } else { t.payloadRegex = nil } if config.Attributes["deviceFrom"] != TOPIC_SEL && config.Attributes["deviceFrom"] != PAYLOAD_SEL && config.Attributes["deviceFrom"] != CONSTANT_SEL { log.Printf("Error: invalid value %s for deviceFrom", config.Attributes["deviceFrom"]) return t } localConfig.deviceFrom = config.Attributes["deviceFrom"] devicePart, err1 := strconv.Atoi(config.Attributes["devicePart"]) if err1 != nil { log.Printf("Error: unable to convert devicePart to number: %s", err1) return t } localConfig.devicePart = devicePart // empty device is valid localConfig.device = config.Attributes["device"] if config.Attributes["valueFrom"] != TOPIC_SEL && config.Attributes["valueFrom"] != PAYLOAD_SEL && config.Attributes["valueFrom"] != PAYLOAD_FULL_SEL { log.Printf("Error: invalid value %s for valueFrom", config.Attributes["valueFrom"]) return t } localConfig.valueFrom = config.Attributes["valueFrom"] if config.Attributes["valueFrom"] != PAYLOAD_FULL_SEL { valuePart, err2 := strconv.Atoi(config.Attributes["valuePart"]) if err2 != nil { log.Printf("Error: unable to convert valuePart to number: %s", err2) return t } localConfig.valuePart = valuePart } if config.Attributes["unitFrom"] != TOPIC_SEL && config.Attributes["unitFrom"] != PAYLOAD_SEL && config.Attributes["unitFrom"] != CONSTANT_SEL { log.Printf("Error: invalid value %s for unitFrom", config.Attributes["unitFrom"]) return t } localConfig.unitFrom = config.Attributes["unitFrom"] if config.Attributes["unitFrom"] != CONSTANT_SEL { unitPart, err3 := strconv.Atoi(config.Attributes["unitPart"]) if err3 != nil { log.Printf("Error: unable to convert unitPart to number: %s", err3) return t } localConfig.unitPart = unitPart } // empty unit is valid localConfig.unit = config.Attributes["unit"] t.config = localConfig t.ready = true t.dbh = database.NewDatabaseHandle(config.DatabaseConnStr) return t } func (self *SingleValueExtractorHandler) GetId() string { return fmt.Sprintf("SVE%d", self.id) } func lost(msg string, message handler.MessageT) { log.Printf("Error: %s, message %s is lost", msg, message) } func (self *SingleValueExtractorHandler) Handle(message handler.MessageT) { if ! self.ready { log.Println("Handler is not marked as ready, message %s is lost", message) return } //log.Printf("Handler SingleValueExtractor %d processing %s -> %s", self.id, message.Topic, message.Payload) var measurement database.Measurement measurement.Time = time.Now() measurement.Application = self.config.application subTopics := strings.Split(message.Topic, "/") //log.Printf("Subtopics: %s", strings.Join(subTopics, ", ")) var payloadMatches []string if self.payloadRegex != nil { payloadMatches = self.payloadRegex.FindStringSubmatch(message.Payload) //log.Printf("Matches: %s", strings.Join(payloadMatches, ", ")) } switch self.config.deviceFrom { case TOPIC_SEL: if self.config.devicePart >= len(subTopics) { lost("devicePart out of range", message) return } measurement.Device = subTopics[self.config.devicePart] case PAYLOAD_SEL: if self.payloadRegex == nil { lost("no payloadRegex defined, devicePart can't be used", message) return } if self.config.devicePart >= len(subTopics) { lost("devicePart out of range", message) return } measurement.Device = payloadMatches[self.config.devicePart] case CONSTANT_SEL: measurement.Device = self.config.device } measurement.Values = make(map[string]database.VariableType) var variable database.VariableType variable.Label = "" variable.Variable = "" switch self.config.valueFrom { case TOPIC_SEL: if self.config.valuePart >= len(subTopics) { lost("valuePart out of range", message) return } variable.Value = subTopics[self.config.valuePart] case PAYLOAD_SEL: if self.payloadRegex == nil { lost("no payloadRegex defined, valuePart can't be used", message) return } if self.config.valuePart >= len(subTopics) { lost("valuePart out of range", message) return } variable.Value = payloadMatches[self.config.valuePart] case PAYLOAD_FULL_SEL: variable.Value = message.Payload } switch self.config.unitFrom { case TOPIC_SEL: if self.config.unitPart >= len(subTopics) { lost("unitPart out of range", message) return } variable.Unit = subTopics[self.config.unitPart] case PAYLOAD_SEL: if self.payloadRegex == nil { lost("no payloadRegex defined, unitPart can't be used", message) return } if self.config.unitPart >= len(subTopics) { lost("unitPart out of range", message) return } variable.Unit = payloadMatches[self.config.unitPart] case CONSTANT_SEL: variable.Unit = self.config.unit } measurement.Values["Value"] = variable //log.Printf("Prepared measurement item: %s", measurement) self.dbh.StoreMeasurement(&measurement) }