diff --git a/deployment/deploy-yml.tmpl b/deployment/deploy-yml.tmpl index 946bf18..ff8355b 100644 --- a/deployment/deploy-yml.tmpl +++ b/deployment/deploy-yml.tmpl @@ -21,6 +21,11 @@ spec: envFrom: - secretRef: name: locsrv-db-cred + env: + - name: MQTT_BROKER + value: mqtt://emqx01-anonymous-cluster-internal.broker.svc.cluster.local:1883 + - name: MQTT_TOPIC + value: Locative/Event ports: - containerPort: 8080 protocol: TCP diff --git a/src/locsrv/main.go b/src/locsrv/main.go index d90434a..57c0e14 100644 --- a/src/locsrv/main.go +++ b/src/locsrv/main.go @@ -4,6 +4,7 @@ import ( "log" "github.com/gin-gonic/gin" "net/http" + "encoding/json" "locsrv/database" "locsrv/mqtt" ) @@ -24,10 +25,11 @@ import ( type locativeEvent struct { Trigger string `json:"trigger"` Device string `json:"device"` - Location string `json:"id"` + Id string `json:"id,omitempty"` + Location string `json:"location"` Latitude string `json:"latitude"` Longitude string `json:"longitude"` - Person string + Person string `json:"person"` } @@ -44,11 +46,18 @@ func main() { case event := <- ch: person, err := dbh.GetPersonById(event.Device) event.Person = person + event.Location = event.Id + event.Id = "" if err != nil { log.Printf("Person unknown: %v", err) } - log.Printf("Trigger: %s, Device: %s, Location: %s, Person: %s, Latitude: %s, Longitude: %s", event.Trigger, event.Device, event.Location, event.Person, event.Latitude, event.Longitude) - mqtt.Publish("bla") + message, err2 := json.Marshal(event) + if err2 != nil { + log.Printf("Unable to marshal event: %v", err2) + } else { + log.Printf("Message: %s", message) + mqtt.Publish(string(message)) + } } } }() diff --git a/src/locsrv/mqtt/mqtt.go b/src/locsrv/mqtt/mqtt.go new file mode 100644 index 0000000..e00bd8c --- /dev/null +++ b/src/locsrv/mqtt/mqtt.go @@ -0,0 +1,56 @@ +package mqtt + +import ( + "log" + "os" + "fmt" + MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/google/uuid" +) + + + +type MqttHandle struct { + initialized bool + pubTopic string + client MQTT.Client +} + +func New() *MqttHandle { + var mqttHandle MqttHandle + mqttHandle.initialized = true + + mqttOpts := MQTT.NewClientOptions(). + AddBroker(os.Getenv("MQTT_BROKER")). + SetClientID(fmt.Sprintf("locsrv-%s", uuid.New())). + SetConnectRetry(true) + mqttHandle.client= MQTT.NewClient(mqttOpts) + if token := mqttHandle.client.Connect(); token.Wait() && token.Error() != nil { + log.Printf("Unable to connect to broker, error %v", token.Error()) + mqttHandle.initialized = false + } + + mqttHandle.pubTopic = os.Getenv("MQTT_TOPIC") + if mqttHandle.pubTopic != "" { + log.Printf("No topic set") + mqttHandle.initialized = false + } + return &mqttHandle +} + +func (self *MqttHandle) Publish(message string) error { + if ! self.initialized { + return fmt.Errorf("MQTT connection not initialized") + } + + token := self.client.Publish(self.pubTopic, 0, false, message) + token.Wait() + if token.Error() != nil { + return fmt.Errorf("MQTT publish failed: %v", token.Error()) + } + + return nil +} + + +