introduce channel and dispatcher
This commit is contained in:
33
main.go
33
main.go
@ -5,13 +5,32 @@ import "fmt"
|
||||
import "os"
|
||||
import "os/signal"
|
||||
import "strings"
|
||||
// import "time"
|
||||
import MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||
import "github.com/google/uuid"
|
||||
|
||||
|
||||
type Message struct {
|
||||
topic string
|
||||
payload []byte
|
||||
}
|
||||
|
||||
var messageChannel chan Message
|
||||
|
||||
|
||||
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
|
||||
log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
|
||||
// log.Printf("Message received, topic: %s, payload: %s\n", message.Topic(), message.Payload())
|
||||
m := Message {
|
||||
topic: message.Topic(),
|
||||
payload: message.Payload(),
|
||||
}
|
||||
select {
|
||||
case messageChannel <- m:
|
||||
{}
|
||||
//log.Println("Message sent to channel")
|
||||
default:
|
||||
log.Println("Channel full, message lost")
|
||||
}
|
||||
}
|
||||
|
||||
func onConnectionLost(client MQTT.Client, error error) {
|
||||
@ -37,6 +56,15 @@ func onConnect(client MQTT.Client) {
|
||||
}
|
||||
}
|
||||
|
||||
func dispatcher() {
|
||||
for {
|
||||
select {
|
||||
case message := <- messageChannel:
|
||||
log.Printf("Message arrived in dispatcher, topic: %s, payload: %s\n", message.topic, message.payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func main() {
|
||||
log.SetPrefix("UDI: ")
|
||||
@ -44,6 +72,9 @@ func main() {
|
||||
|
||||
log.Println("UDI starting")
|
||||
|
||||
messageChannel = make(chan Message, 100)
|
||||
go dispatcher()
|
||||
|
||||
broker := os.Getenv("MQTT_BROKER")
|
||||
if broker == "" {
|
||||
log.Fatal("No broker given, set env var MQTT_BROKER")
|
||||
|
Reference in New Issue
Block a user