diff --git a/server/integrations/mqtt.go b/server/integrations/mqtt.go index d231892..166d35d 100644 --- a/server/integrations/mqtt.go +++ b/server/integrations/mqtt.go @@ -6,9 +6,15 @@ import ( type MQTTIntegration struct{} +type MQTTEvent struct { + Type string + Topic string + Payload []byte +} + type MQTTListeningClient struct { Client MQTT.Client - Channel chan [2]string + Channel chan MQTTEvent } func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error { @@ -32,7 +38,7 @@ func (s *MQTTIntegration) Publish(server string, username string, password strin } func (s *MQTTIntegration) Listen(server string, username string, password string, clientId string, topics map[string]byte) (*MQTTListeningClient, error) { - channel := make(chan [2]string) + channel := make(chan MQTTEvent) opts := MQTT.NewClientOptions() opts.AddBroker(server) @@ -40,7 +46,11 @@ func (s *MQTTIntegration) Listen(server string, username string, password string opts.SetPassword(password) opts.SetClientID(clientId) opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) { - channel <- [2]string{msg.Topic(), string(msg.Payload())} + channel <- MQTTEvent{ + Type: "message", + Topic: msg.Topic(), + Payload: msg.Payload(), + } }) client := MQTT.NewClient(opts) @@ -57,3 +67,12 @@ func (s *MQTTIntegration) Listen(server string, username string, password string Channel: channel, }, nil } + +func (c *MQTTListeningClient) Close() { + c.Channel <- MQTTEvent{ + Type: "close", + } + + c.Client.Disconnect(250) + close(c.Channel) +} diff --git a/server/main.go b/server/main.go index 3118adb..43c0de5 100644 --- a/server/main.go +++ b/server/main.go @@ -119,6 +119,8 @@ func main() { // Shutdown the server log.Println("Shutting down server...") + server.Services.MQTTBrokers.StopListeners() + if err := srv.Shutdown(context.TODO()); err != nil { log.Fatalf("Server shutdown failed: %v", err) } diff --git a/server/services/mqtt_brokers_service.go b/server/services/mqtt_brokers_service.go index 177c894..d674bf8 100644 --- a/server/services/mqtt_brokers_service.go +++ b/server/services/mqtt_brokers_service.go @@ -5,6 +5,7 @@ import ( "basic-sensor-receiver/models" "encoding/json" "fmt" + "log" "sync" ) @@ -162,6 +163,20 @@ func (s *MQTTBrokersService) PublishTopic(brokerId int64, topic string, message ) } +func (s *MQTTBrokersService) StopListeners() { + if s.MqttWaitGroup != nil { + for _, client := range s.MqttClients { + if client.Client == nil { + continue + } + + client.Client.Disconnect(250) + } + + s.MqttWaitGroup.Wait() + } +} + func (s *MQTTBrokersService) EnsureListeners() { if s.MqttWaitGroup != nil { // Disconnect all existing clients @@ -184,14 +199,14 @@ func (s *MQTTBrokersService) EnsureListeners() { brokers, err := s.GetList() if err != nil { - fmt.Println(fmt.Errorf("error getting MQTT brokers: %s", err)) + log.Println(fmt.Errorf("error getting MQTT brokers: %s", err)) return } sensors, err := s.ctx.Services.Sensors.GetList() if err != nil { - fmt.Println(fmt.Errorf("error getting sensors: %s", err)) + log.Println(fmt.Errorf("error getting sensors: %s", err)) return } @@ -236,54 +251,69 @@ func (s *MQTTBrokersService) EnsureListeners() { client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics) if err != nil { - fmt.Println(fmt.Errorf("error listening to MQTT broker %s: %s", broker.Name, err)) + log.Println(fmt.Errorf("error listening to MQTT broker %s: %s", broker.Name, err)) continue } s.MqttClients = append(s.MqttClients, client) s.MqttWaitGroup.Add(1) + log.Printf("MQTT broker %s: Listening for %d topics\n", broker.Name, len(topics)) + go func() { for { data, ok := <-client.Channel if !ok { + log.Println("WARN: MQTT channel closed") break } - fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", data[0], data[1]) + if data.Type == "close" { + log.Println("MQTT channel closed by request") + break + } - // TODO: This is sub-optional - for _, sensor := range brokerSensors { - if sensor.MqttTopic == nil || sensor.MqttPath == nil { - fmt.Printf("WARN: Skipping sensor %s because it's missing topic or value path", sensor.Name) - continue + switch data.Type { + case "message": + { + log.Printf("MQTT Message: Topic: %s Message: %s\n", data.Topic, string(data.Payload)) + + // TODO: This is sub-optional + for _, sensor := range brokerSensors { + if sensor.MqttTopic == nil || sensor.MqttPath == nil { + log.Printf("WARN: Skipping sensor %s because it's missing topic or value path", sensor.Name) + continue + } + + if *sensor.MqttTopic != data.Topic { + continue + } + + var responseJson map[string]any + err := json.Unmarshal(data.Payload, &responseJson) + + if err != nil { + log.Print(fmt.Errorf("error parsing MQTT message JSON: %s", err)) + continue + } + + if _, exists := responseJson[*sensor.MqttPath]; !exists { + log.Printf("WARN: Skipping sensor %s because it's value path %s was not found in the message", sensor.Name, *sensor.MqttPath) + continue + } + + value, ok := responseJson[*sensor.MqttPath].(float64) + + if !ok { + log.Printf("WARN: Skipping sensor %s because value path %s doesn't resolve to number", sensor.Name, *sensor.MqttPath) + continue + } + + s.ctx.Services.SensorValues.Push(sensor.Id, value) + } + + break } - - if *sensor.MqttTopic != data[0] { - continue - } - - var responseJson map[string]any - err := json.Unmarshal([]byte(data[1]), &responseJson) - - if err != nil { - fmt.Print(fmt.Errorf("error parsing MQTT message JSON: %s", err)) - continue - } - - if _, exists := responseJson[*sensor.MqttPath]; !exists { - fmt.Printf("WARN: Skipping sensor %s because it's value path %s was not found in the message", sensor.Name, *sensor.MqttPath) - continue - } - - value, ok := responseJson[*sensor.MqttPath].(float64) - - if !ok { - fmt.Printf("WARN: Skipping sensor %s because value path %s doesn't resolve to number", sensor.Name, *sensor.MqttPath) - continue - } - - s.ctx.Services.SensorValues.Push(sensor.Id, value) } }