mqtt stuff 3
This commit is contained in:
@ -21,6 +21,11 @@ spec:
|
||||
envFrom:
|
||||
- secretRef:
|
||||
name: locsrv-db-cred
|
||||
env:
|
||||
- name: MQTT_BROKER
|
||||
value: mqtt://emqx01-anonymous-cluster-internal.broker.svc.cluster.local:1883
|
||||
- name: MQTT_TOPIC
|
||||
value: Locative/Event
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
protocol: TCP
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"log"
|
||||
"github.com/gin-gonic/gin"
|
||||
"net/http"
|
||||
"encoding/json"
|
||||
"locsrv/database"
|
||||
"locsrv/mqtt"
|
||||
)
|
||||
@ -24,10 +25,11 @@ import (
|
||||
type locativeEvent struct {
|
||||
Trigger string `json:"trigger"`
|
||||
Device string `json:"device"`
|
||||
Location string `json:"id"`
|
||||
Id string `json:"id,omitempty"`
|
||||
Location string `json:"location"`
|
||||
Latitude string `json:"latitude"`
|
||||
Longitude string `json:"longitude"`
|
||||
Person string
|
||||
Person string `json:"person"`
|
||||
}
|
||||
|
||||
|
||||
@ -44,11 +46,18 @@ func main() {
|
||||
case event := <- ch:
|
||||
person, err := dbh.GetPersonById(event.Device)
|
||||
event.Person = person
|
||||
event.Location = event.Id
|
||||
event.Id = ""
|
||||
if err != nil {
|
||||
log.Printf("Person unknown: %v", err)
|
||||
}
|
||||
log.Printf("Trigger: %s, Device: %s, Location: %s, Person: %s, Latitude: %s, Longitude: %s", event.Trigger, event.Device, event.Location, event.Person, event.Latitude, event.Longitude)
|
||||
mqtt.Publish("bla")
|
||||
message, err2 := json.Marshal(event)
|
||||
if err2 != nil {
|
||||
log.Printf("Unable to marshal event: %v", err2)
|
||||
} else {
|
||||
log.Printf("Message: %s", message)
|
||||
mqtt.Publish(string(message))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
56
src/locsrv/mqtt/mqtt.go
Normal file
56
src/locsrv/mqtt/mqtt.go
Normal file
@ -0,0 +1,56 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"fmt"
|
||||
MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
|
||||
|
||||
type MqttHandle struct {
|
||||
initialized bool
|
||||
pubTopic string
|
||||
client MQTT.Client
|
||||
}
|
||||
|
||||
func New() *MqttHandle {
|
||||
var mqttHandle MqttHandle
|
||||
mqttHandle.initialized = true
|
||||
|
||||
mqttOpts := MQTT.NewClientOptions().
|
||||
AddBroker(os.Getenv("MQTT_BROKER")).
|
||||
SetClientID(fmt.Sprintf("locsrv-%s", uuid.New())).
|
||||
SetConnectRetry(true)
|
||||
mqttHandle.client= MQTT.NewClient(mqttOpts)
|
||||
if token := mqttHandle.client.Connect(); token.Wait() && token.Error() != nil {
|
||||
log.Printf("Unable to connect to broker, error %v", token.Error())
|
||||
mqttHandle.initialized = false
|
||||
}
|
||||
|
||||
mqttHandle.pubTopic = os.Getenv("MQTT_TOPIC")
|
||||
if mqttHandle.pubTopic != "" {
|
||||
log.Printf("No topic set")
|
||||
mqttHandle.initialized = false
|
||||
}
|
||||
return &mqttHandle
|
||||
}
|
||||
|
||||
func (self *MqttHandle) Publish(message string) error {
|
||||
if ! self.initialized {
|
||||
return fmt.Errorf("MQTT connection not initialized")
|
||||
}
|
||||
|
||||
token := self.client.Publish(self.pubTopic, 0, false, message)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
return fmt.Errorf("MQTT publish failed: %v", token.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user