package services import ( "basic-sensor-receiver/integrations" "basic-sensor-receiver/models" "encoding/json" "fmt" "log" "sync" ) type MQTTBrokersService struct { ctx *Context MqttWaitGroup *sync.WaitGroup MqttClients []*integrations.MQTTListeningClient } func (s *MQTTBrokersService) GetList() ([]models.MQTTBrokerItem, error) { brokers := []models.MQTTBrokerItem{} err := s.ctx.DB.Select(&brokers, ` SELECT id, name, address, username, password, client_id FROM mqtt_brokers ORDER BY name ASC `) if err != nil { return nil, err } return brokers, nil } func (s *MQTTBrokersService) GetById(id int64) (*models.MQTTBrokerItem, error) { broker := models.MQTTBrokerItem{} err := s.ctx.DB.Get(&broker, ` SELECT id, name, address, username, password, client_id FROM mqtt_brokers WHERE id = $1 `, id, ) if err != nil { return nil, err } return &broker, nil } func (s *MQTTBrokersService) Create(name string, address string, username *string, password *string, clientId *string) (*models.MQTTBrokerItem, error) { broker := models.MQTTBrokerItem{ Name: name, Address: address, Username: username, Password: password, ClientId: clientId, } res, err := s.ctx.DB.NamedExec( ` INSERT INTO mqtt_brokers (name, address, username, password, client_id) VALUES (:name, :address, :username, :password, :client_id) `, broker, ) if err != nil { return nil, err } id, err := res.LastInsertId() if err != nil { return nil, err } broker.Id = int(id) s.EnsureListeners() return &broker, nil } func (s *MQTTBrokersService) Update(id int64, name string, address string, username *string, password *string, clientId *string) (*models.MQTTBrokerItem, error) { broker := models.MQTTBrokerItem{ Id: int(id), Name: name, Address: address, Username: username, Password: password, ClientId: clientId, } _, err := s.ctx.DB.NamedExec( ` UPDATE mqtt_brokers SET name = :name, address = :address, username = :username, password = :password, client_id = :client_id WHERE id = :id `, broker, ) if err != nil { return nil, err } s.EnsureListeners() return &broker, nil } func (s *MQTTBrokersService) Delete(id int64) error { _, err := s.ctx.DB.Exec( ` DELETE FROM mqtt_brokers WHERE id = $1 `, id, ) if err != nil { return err } s.EnsureListeners() return nil } func (s *MQTTBrokersService) PublishTopic(brokerId int64, topic string, message string, qos byte, retain bool) error { broker, err := s.GetById(brokerId) if err != nil { return err } username := "" if broker.Username != nil { username = *broker.Username } password := "" if broker.Password != nil { password = *broker.Password } clientId := "" if broker.ClientId != nil { clientId = *broker.ClientId } return s.ctx.Integrations.MQTT.Publish( broker.Address, username, password, clientId, retain, qos, topic, message, ) } func (s *MQTTBrokersService) StopListeners() { if s.MqttWaitGroup != nil { for _, client := range s.MqttClients { client.Close() } s.MqttWaitGroup.Wait() } } func (s *MQTTBrokersService) EnsureListeners() { if s.MqttWaitGroup != nil { // Disconnect all existing clients for _, client := range s.MqttClients { client.Close() } // Wait for all channels to clear s.MqttWaitGroup.Wait() // Clear data s.MqttWaitGroup = nil s.MqttClients = []*integrations.MQTTListeningClient{} } brokers, err := s.GetList() if err != nil { log.Println(fmt.Errorf("error getting MQTT brokers: %s", err)) return } sensors, err := s.ctx.Services.Sensors.GetList() if err != nil { log.Println(fmt.Errorf("error getting sensors: %s", err)) return } // TODO: This needs to be refreshed when a new broker is added or a broker is updated // TODO: Save all the coroutines and stop them when the server is stopped s.MqttWaitGroup = &sync.WaitGroup{} for _, broker := range brokers { if broker.Username == nil { broker.Username = new(string) } if broker.Password == nil { broker.Password = new(string) } if broker.ClientId == nil { broker.ClientId = new(string) } topics := map[string]byte{} brokerSensors := []*models.SensorItem{} for _, sensor := range sensors { if sensor.MqttBrokerId == nil || *sensor.MqttBrokerId != broker.Id { continue } if sensor.MqttTopic == nil { continue } topics[*sensor.MqttTopic] = byte(0) brokerSensors = append(brokerSensors, &sensor) } if len(brokerSensors) == 0 { continue } client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics) if err != nil { log.Println(fmt.Errorf("error listening to MQTT broker %s: %s", broker.Name, err)) continue } s.MqttClients = append(s.MqttClients, client) s.MqttWaitGroup.Add(1) log.Printf("MQTT broker %s: Listening for %d topics\n", broker.Name, len(topics)) go func() { for { data, ok := <-client.Channel if !ok { log.Println("WARN: MQTT channel closed unexpectedly") break } if data.Type == "close" { log.Println("MQTT channel closed by request") break } switch data.Type { case "message": { log.Printf("MQTT Message: Topic: %s Message: %s\n", data.Topic, string(data.Payload)) // TODO: This is sub-optional for _, sensor := range brokerSensors { if sensor.MqttTopic == nil || sensor.MqttPath == nil { log.Printf("WARN: Skipping sensor %s because it's missing topic or value path", sensor.Name) continue } if *sensor.MqttTopic != data.Topic { continue } var responseJson map[string]any err := json.Unmarshal(data.Payload, &responseJson) if err != nil { log.Print(fmt.Errorf("error parsing MQTT message JSON: %s", err)) continue } if _, exists := responseJson[*sensor.MqttPath]; !exists { log.Printf("WARN: Skipping sensor %s because it's value path %s was not found in the message", sensor.Name, *sensor.MqttPath) continue } value, ok := responseJson[*sensor.MqttPath].(float64) if !ok { log.Printf("WARN: Skipping sensor %s because value path %s doesn't resolve to number", sensor.Name, *sensor.MqttPath) continue } s.ctx.Services.SensorValues.Push(sensor.Id, value) } break } } } s.MqttWaitGroup.Done() }() } }