// Package svej provides a handler that extracts a single value (plus the
// device it belongs to and its unit) from a message and stores it as a
// measurement in the database.
package svej

import (
	"encoding/json"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"

	"github.com/oliveagle/jsonpath"

	"udi/config"
	"udi/database"
	"udi/handlers/handler"
)

// idSeq is the id handed to the next handler created by NewSvejHandler.
var idSeq int = 0

// SingleValueExtractorJsonpathHandler extracts device, value and unit from a
// message according to three configured selectors.
//
// Valid forms for a selector:
//
//	J:JsonpathExpression  evaluate the expression against the JSON payload
//	T:TopicPartIndex      take the topic part ("/"-separated) at that index
//	C:ConstantValue       use the text after the prefix verbatim
type SingleValueExtractorJsonpathHandler struct {
	id    int
	ready bool // set only after the whole configuration was accepted

	application    string
	deviceSelector string
	valueSelector  string
	unitSelector   string

	// Compiled expressions; non-nil only for selectors of the "J:" form.
	deviceJsonpath *jsonpath.Compiled
	valueJsonpath  *jsonpath.Compiled
	unitJsonpath   *jsonpath.Compiled

	dbh *database.DatabaseHandle
}

// splitSelector splits a selector into its two-character kind prefix and the
// remaining argument. ok is false when the selector is too short to carry a
// prefix, which lets callers avoid an out-of-range slice.
func splitSelector(selector string) (kind, arg string, ok bool) {
	if len(selector) < 2 {
		return "", "", false
	}
	return selector[:2], selector[2:], true
}

// compileSelector checks that the selector is long enough to carry a prefix
// and, for "J:" selectors, compiles the jsonpath expression. name ("device",
// "value", "unit") is used only in log output. It returns false after
// logging the reason when the selector is unusable; the compiled expression
// is nil for selectors that are not of the "J:" form.
func compileSelector(name, selector string) (*jsonpath.Compiled, bool) {
	kind, arg, ok := splitSelector(selector)
	if !ok {
		log.Printf("Error: %sSelector missing or too short: %q", name, selector)
		return nil, false
	}
	if kind != "J:" {
		return nil, true
	}
	jp, err := jsonpath.Compile(arg)
	if err != nil {
		log.Printf("Unable to compile %sJsonpath: %s, %s", name, arg, err)
		return nil, false
	}
	return jp, true
}

// NewSvejHandler builds a handler from the attributes "application",
// "deviceSelector", "valueSelector" and "unitSelector" of the given
// configuration.
//
// A handler is always returned. When the configuration is unusable the
// problem is logged and the handler is left not ready, in which case Handle
// drops every message.
func NewSvejHandler(cfg config.HandlerConfigT) handler.Handler {
	t := &SingleValueExtractorJsonpathHandler{
		id:    idSeq,
		ready: false,
	}
	idSeq += 1

	if cfg.Attributes["application"] == "" {
		log.Println("Error: application not configured")
		return t
	}
	t.application = cfg.Attributes["application"]

	var ok bool

	t.deviceSelector = cfg.Attributes["deviceSelector"]
	if t.deviceJsonpath, ok = compileSelector("device", t.deviceSelector); !ok {
		return t
	}

	t.valueSelector = cfg.Attributes["valueSelector"]
	if t.valueJsonpath, ok = compileSelector("value", t.valueSelector); !ok {
		return t
	}

	t.unitSelector = cfg.Attributes["unitSelector"]
	if t.unitJsonpath, ok = compileSelector("unit", t.unitSelector); !ok {
		return t
	}

	t.ready = true
	t.dbh = database.NewDatabaseHandle()
	return t
}

// GetId returns the handler id in the form "SVE<n>".
func (h *SingleValueExtractorJsonpathHandler) GetId() string {
	return fmt.Sprintf("SVE%d", h.id)
}

// lost logs that the given message is dropped because of msg.
func lost(msg string, message handler.MessageT) {
	log.Printf("Error: %s, message %s is lost", msg, message)
}

// extractionHelper resolves one selector against a message.
//
// subTopics are the parts of the message topic, jPayload is the decoded JSON
// payload, selector is the configured selector and jp the compiled expression
// belonging to it (only used for "J:" selectors). It returns the extracted
// text, or an error when the selector is malformed or cannot be satisfied by
// this message.
func extractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) {
	kind, arg, ok := splitSelector(selector)
	if !ok {
		return "", fmt.Errorf("Invalid selector: %s", selector)
	}

	switch kind {
	case "J:":
		// Guard against a selector whose expression was never compiled.
		if jp == nil {
			return "", fmt.Errorf("no compiled jsonpath for selector %s", selector)
		}
		r, e := jp.Lookup(jPayload)
		if e != nil {
			return "", fmt.Errorf("jp.Lookup failed with %s", e)
		}
		return fmt.Sprint(r), nil
	case "T:":
		i, e := strconv.Atoi(arg)
		if e != nil {
			return "", fmt.Errorf("Atoi failed with %s", e)
		}
		// Atoi accepts a leading minus sign, so reject negative indices
		// explicitly instead of panicking on the slice access below.
		if i < 0 {
			return "", fmt.Errorf("negative subtopic index %d", i)
		}
		if i >= len(subTopics) {
			return "", fmt.Errorf("not enough subtopics")
		}
		return subTopics[i], nil
	case "C:":
		return arg, nil
	default:
		return "", fmt.Errorf("Invalid selector: %s", kind)
	}
}

// Handle decodes the message payload as JSON, extracts device, value and unit
// using the configured selectors and stores the result as a measurement with
// a single entry named "Value". The measurement time is the time of
// processing. Messages that cannot be processed are logged and dropped.
func (h *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT) {
	if !h.ready {
		log.Printf("Handler is not marked as ready, message %s is lost", message)
		return
	}
	log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", h.id, message.Topic, message.Payload)

	var measurement database.Measurement
	measurement.Time = time.Now()
	measurement.Application = h.application

	subTopics := strings.Split(message.Topic, "/")

	var jPayload interface{}
	if err := json.Unmarshal([]byte(message.Payload), &jPayload); err != nil {
		lost(fmt.Sprintf("Unable to unmarshal payload: %s", err), message)
		return
	}

	device, err := extractionHelper(subTopics, jPayload, h.deviceSelector, h.deviceJsonpath)
	if err != nil {
		lost(fmt.Sprintf("Device extraction failed with %s", err), message)
		return
	}

	value, err := extractionHelper(subTopics, jPayload, h.valueSelector, h.valueJsonpath)
	if err != nil {
		lost(fmt.Sprintf("Value extraction failed with %s", err), message)
		return
	}

	unit, err := extractionHelper(subTopics, jPayload, h.unitSelector, h.unitJsonpath)
	if err != nil {
		lost(fmt.Sprintf("Unit extraction failed with %s", err), message)
		return
	}

	measurement.Device = device

	var variable database.VariableType
	variable.Label = ""
	variable.Variable = ""
	variable.Unit = unit
	variable.Value = value

	measurement.Values = make(map[string]database.VariableType)
	measurement.Values["Value"] = variable

	log.Printf("Prepared measurement item: %s", measurement)
	h.dbh.StoreMeasurement(&measurement)
}