Refactoring

This commit is contained in:
Jan Zípek 2025-03-05 11:00:51 +01:00
parent 3029d4af52
commit bae535a014
Signed by: kamen
GPG Key ID: A17882625B33AC31
3 changed files with 89 additions and 38 deletions

View File

@ -6,9 +6,15 @@ import (
type MQTTIntegration struct{} type MQTTIntegration struct{}
type MQTTEvent struct {
Type string
Topic string
Payload []byte
}
type MQTTListeningClient struct { type MQTTListeningClient struct {
Client MQTT.Client Client MQTT.Client
Channel chan [2]string Channel chan MQTTEvent
} }
func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error { func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error {
@ -32,7 +38,7 @@ func (s *MQTTIntegration) Publish(server string, username string, password strin
} }
func (s *MQTTIntegration) Listen(server string, username string, password string, clientId string, topics map[string]byte) (*MQTTListeningClient, error) { func (s *MQTTIntegration) Listen(server string, username string, password string, clientId string, topics map[string]byte) (*MQTTListeningClient, error) {
channel := make(chan [2]string) channel := make(chan MQTTEvent)
opts := MQTT.NewClientOptions() opts := MQTT.NewClientOptions()
opts.AddBroker(server) opts.AddBroker(server)
@ -40,7 +46,11 @@ func (s *MQTTIntegration) Listen(server string, username string, password string
opts.SetPassword(password) opts.SetPassword(password)
opts.SetClientID(clientId) opts.SetClientID(clientId)
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) { opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
channel <- [2]string{msg.Topic(), string(msg.Payload())} channel <- MQTTEvent{
Type: "message",
Topic: msg.Topic(),
Payload: msg.Payload(),
}
}) })
client := MQTT.NewClient(opts) client := MQTT.NewClient(opts)
@ -57,3 +67,12 @@ func (s *MQTTIntegration) Listen(server string, username string, password string
Channel: channel, Channel: channel,
}, nil }, nil
} }
func (c *MQTTListeningClient) Close() {
c.Channel <- MQTTEvent{
Type: "close",
}
c.Client.Disconnect(250)
close(c.Channel)
}

View File

@ -119,6 +119,8 @@ func main() {
// Shutdown the server // Shutdown the server
log.Println("Shutting down server...") log.Println("Shutting down server...")
server.Services.MQTTBrokers.StopListeners()
if err := srv.Shutdown(context.TODO()); err != nil { if err := srv.Shutdown(context.TODO()); err != nil {
log.Fatalf("Server shutdown failed: %v", err) log.Fatalf("Server shutdown failed: %v", err)
} }

View File

@ -5,6 +5,7 @@ import (
"basic-sensor-receiver/models" "basic-sensor-receiver/models"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"sync" "sync"
) )
@ -162,6 +163,20 @@ func (s *MQTTBrokersService) PublishTopic(brokerId int64, topic string, message
) )
} }
func (s *MQTTBrokersService) StopListeners() {
if s.MqttWaitGroup != nil {
for _, client := range s.MqttClients {
if client.Client == nil {
continue
}
client.Client.Disconnect(250)
}
s.MqttWaitGroup.Wait()
}
}
func (s *MQTTBrokersService) EnsureListeners() { func (s *MQTTBrokersService) EnsureListeners() {
if s.MqttWaitGroup != nil { if s.MqttWaitGroup != nil {
// Disconnect all existing clients // Disconnect all existing clients
@ -184,14 +199,14 @@ func (s *MQTTBrokersService) EnsureListeners() {
brokers, err := s.GetList() brokers, err := s.GetList()
if err != nil { if err != nil {
fmt.Println(fmt.Errorf("error getting MQTT brokers: %s", err)) log.Println(fmt.Errorf("error getting MQTT brokers: %s", err))
return return
} }
sensors, err := s.ctx.Services.Sensors.GetList() sensors, err := s.ctx.Services.Sensors.GetList()
if err != nil { if err != nil {
fmt.Println(fmt.Errorf("error getting sensors: %s", err)) log.Println(fmt.Errorf("error getting sensors: %s", err))
return return
} }
@ -236,54 +251,69 @@ func (s *MQTTBrokersService) EnsureListeners() {
client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics) client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics)
if err != nil { if err != nil {
fmt.Println(fmt.Errorf("error listening to MQTT broker %s: %s", broker.Name, err)) log.Println(fmt.Errorf("error listening to MQTT broker %s: %s", broker.Name, err))
continue continue
} }
s.MqttClients = append(s.MqttClients, client) s.MqttClients = append(s.MqttClients, client)
s.MqttWaitGroup.Add(1) s.MqttWaitGroup.Add(1)
log.Printf("MQTT broker %s: Listening for %d topics\n", broker.Name, len(topics))
go func() { go func() {
for { for {
data, ok := <-client.Channel data, ok := <-client.Channel
if !ok { if !ok {
log.Println("WARN: MQTT channel closed")
break break
} }
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", data[0], data[1]) if data.Type == "close" {
log.Println("MQTT channel closed by request")
break
}
// TODO: This is sub-optional switch data.Type {
for _, sensor := range brokerSensors { case "message":
if sensor.MqttTopic == nil || sensor.MqttPath == nil { {
fmt.Printf("WARN: Skipping sensor %s because it's missing topic or value path", sensor.Name) log.Printf("MQTT Message: Topic: %s Message: %s\n", data.Topic, string(data.Payload))
continue
// TODO: This is sub-optional
for _, sensor := range brokerSensors {
if sensor.MqttTopic == nil || sensor.MqttPath == nil {
log.Printf("WARN: Skipping sensor %s because it's missing topic or value path", sensor.Name)
continue
}
if *sensor.MqttTopic != data.Topic {
continue
}
var responseJson map[string]any
err := json.Unmarshal(data.Payload, &responseJson)
if err != nil {
log.Print(fmt.Errorf("error parsing MQTT message JSON: %s", err))
continue
}
if _, exists := responseJson[*sensor.MqttPath]; !exists {
log.Printf("WARN: Skipping sensor %s because it's value path %s was not found in the message", sensor.Name, *sensor.MqttPath)
continue
}
value, ok := responseJson[*sensor.MqttPath].(float64)
if !ok {
log.Printf("WARN: Skipping sensor %s because value path %s doesn't resolve to number", sensor.Name, *sensor.MqttPath)
continue
}
s.ctx.Services.SensorValues.Push(sensor.Id, value)
}
break
} }
if *sensor.MqttTopic != data[0] {
continue
}
var responseJson map[string]any
err := json.Unmarshal([]byte(data[1]), &responseJson)
if err != nil {
fmt.Print(fmt.Errorf("error parsing MQTT message JSON: %s", err))
continue
}
if _, exists := responseJson[*sensor.MqttPath]; !exists {
fmt.Printf("WARN: Skipping sensor %s because it's value path %s was not found in the message", sensor.Name, *sensor.MqttPath)
continue
}
value, ok := responseJson[*sensor.MqttPath].(float64)
if !ok {
fmt.Printf("WARN: Skipping sensor %s because value path %s doesn't resolve to number", sensor.Name, *sensor.MqttPath)
continue
}
s.ctx.Services.SensorValues.Push(sensor.Id, value)
} }
} }