Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
6be68d4dee
|
|||
54eeb17e9b
|
|||
4a56fea33b
|
|||
0215d2efd1
|
|||
dd394877f3
|
|||
9624d5d53d
|
|||
7fab124dd0
|
|||
97e9463d84
|
|||
82cb14d076
|
@ -21,7 +21,51 @@ spec:
|
||||
envFrom:
|
||||
- secretRef:
|
||||
name: locsrv-db-cred
|
||||
env:
|
||||
- name: MQTT_BROKER
|
||||
value: mqtt://emqx01-anonymous-cluster-internal.broker.svc.cluster.local:1883
|
||||
- name: MQTT_TOPIC
|
||||
value: locative/event
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
protocol: TCP
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: locative
|
||||
spec:
|
||||
type: ClusterIP
|
||||
selector:
|
||||
app: locsrv
|
||||
ports:
|
||||
- name: http
|
||||
protocol: TCP
|
||||
port: 8080
|
||||
targetPort: 8080
|
||||
|
||||
---
|
||||
apiVersion: networking.k8s.io/v1
|
||||
kind: Ingress
|
||||
metadata:
|
||||
name: locative
|
||||
annotations:
|
||||
cert-manager.io/cluster-issuer: letsencrypt-production-http
|
||||
spec:
|
||||
tls:
|
||||
- hosts:
|
||||
- locative.hottis.de
|
||||
secretName: locative-cert
|
||||
rules:
|
||||
- host: locative.hottis.de
|
||||
http:
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
backend:
|
||||
service:
|
||||
name: http
|
||||
port:
|
||||
number: 8080
|
||||
|
||||
|
||||
|
@ -24,7 +24,7 @@ func New() *DatabaseHandle {
|
||||
} else {
|
||||
db.dbh = conn
|
||||
db.initialized = true
|
||||
//log.Println("Database connection opened")
|
||||
log.Println("Database connection opened")
|
||||
}
|
||||
return &db
|
||||
}
|
||||
@ -32,7 +32,7 @@ func New() *DatabaseHandle {
|
||||
func (self *DatabaseHandle) GetPersonById(id string) (string, error) {
|
||||
if ! self.initialized {
|
||||
err := fmt.Errorf("Database connection not initialized")
|
||||
return "", err
|
||||
return "unknown", err
|
||||
}
|
||||
|
||||
var person Person
|
||||
@ -42,7 +42,7 @@ func (self *DatabaseHandle) GetPersonById(id string) (string, error) {
|
||||
|
||||
if result.Error != nil {
|
||||
err := fmt.Errorf("Query failed: %s", result.Error)
|
||||
return "", err
|
||||
return "unknown", err
|
||||
}
|
||||
|
||||
return person.Name, nil
|
||||
|
@ -3,7 +3,9 @@ module locsrv
|
||||
go 1.21.3
|
||||
|
||||
require (
|
||||
github.com/eclipse/paho.mqtt.golang v1.4.3
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/google/uuid v1.5.0
|
||||
gorm.io/driver/postgres v1.5.4
|
||||
gorm.io/gorm v1.25.5
|
||||
)
|
||||
@ -17,6 +19,7 @@ require (
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.14.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
||||
github.com/jackc/pgx/v5 v5.4.3 // indirect
|
||||
@ -36,6 +39,7 @@ require (
|
||||
golang.org/x/arch v0.3.0 // indirect
|
||||
golang.org/x/crypto v0.14.0 // indirect
|
||||
golang.org/x/net v0.10.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
|
@ -8,6 +8,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
|
||||
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
|
||||
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
|
||||
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
|
||||
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
|
||||
@ -28,6 +30,10 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
|
||||
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
|
||||
@ -84,6 +90,8 @@ golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
|
||||
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
|
||||
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
|
||||
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
|
@ -2,9 +2,12 @@ package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
"github.com/gin-gonic/gin"
|
||||
"net/http"
|
||||
"encoding/json"
|
||||
"locsrv/database"
|
||||
"locsrv/mqtt"
|
||||
)
|
||||
|
||||
/*
|
||||
@ -23,26 +26,42 @@ import (
|
||||
type locativeEvent struct {
|
||||
Trigger string `json:"trigger"`
|
||||
Device string `json:"device"`
|
||||
Id string `json:"id"`
|
||||
Id string `json:"id,omitempty"`
|
||||
Location string `json:"location"`
|
||||
Latitude string `json:"latitude"`
|
||||
Longitude string `json:"longitude"`
|
||||
Person string `json:"person"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}
|
||||
|
||||
|
||||
func main() {
|
||||
router := gin.Default()
|
||||
|
||||
ch := make(chan locativeEvent)
|
||||
dbh := database.New()
|
||||
mqtt := mqtt.New()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case event := <- ch:
|
||||
log.Printf("Trigger: %s, Device: %s, Id: %s", event.Trigger, event.Device, event.Id)
|
||||
person, err := dbh.GetPersonById(event.Device)
|
||||
event.Person = person
|
||||
event.Location = event.Id
|
||||
event.Timestamp = time.Now().Format("2006-01-02 15:04:05 MST")
|
||||
event.Id = ""
|
||||
if err != nil {
|
||||
log.Printf("Person unknown: %v", err)
|
||||
}
|
||||
message, err2 := json.Marshal(event)
|
||||
if err2 != nil {
|
||||
log.Printf("Unable to marshal event: %v", err2)
|
||||
} else {
|
||||
log.Printf("Person: %s", person)
|
||||
log.Printf("Message: %s", message)
|
||||
err := mqtt.Publish(person, string(message))
|
||||
if err != nil {
|
||||
log.Printf("Failed to publish: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
58
src/locsrv/mqtt/mqtt.go
Normal file
58
src/locsrv/mqtt/mqtt.go
Normal file
@ -0,0 +1,58 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"fmt"
|
||||
MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
|
||||
|
||||
type MqttHandle struct {
|
||||
initialized bool
|
||||
pubTopic string
|
||||
client MQTT.Client
|
||||
}
|
||||
|
||||
func New() *MqttHandle {
|
||||
var mqttHandle MqttHandle
|
||||
mqttHandle.initialized = true
|
||||
|
||||
mqttOpts := MQTT.NewClientOptions().
|
||||
AddBroker(os.Getenv("MQTT_BROKER")).
|
||||
SetClientID(fmt.Sprintf("locsrv-%s", uuid.New())).
|
||||
SetConnectRetry(true)
|
||||
mqttHandle.client= MQTT.NewClient(mqttOpts)
|
||||
if token := mqttHandle.client.Connect(); token.Wait() && token.Error() != nil {
|
||||
log.Printf("Unable to connect to broker, error %v", token.Error())
|
||||
mqttHandle.initialized = false
|
||||
}
|
||||
|
||||
mqttHandle.pubTopic = os.Getenv("MQTT_TOPIC")
|
||||
if mqttHandle.pubTopic == "" {
|
||||
log.Printf("No topic set")
|
||||
mqttHandle.initialized = false
|
||||
}
|
||||
log.Printf("MQTT connection established")
|
||||
return &mqttHandle
|
||||
}
|
||||
|
||||
func (self *MqttHandle) Publish(topicPost string, message string) error {
|
||||
if ! self.initialized {
|
||||
return fmt.Errorf("MQTT connection not initialized")
|
||||
}
|
||||
|
||||
topic := fmt.Sprintf("%s/%s", self.pubTopic, topicPost)
|
||||
token := self.client.Publish(topic, 0, true, message)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
return fmt.Errorf("MQTT publish failed: %v", token.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user