MQTT Sensors #1

Merged
kamen merged 6 commits from feature/mqtt-sensors into master 2025-03-05 11:10:10 +01:00
1 changed files with 18 additions and 12 deletions
Showing only changes of commit 3029d4af52 - Show all commits

View File

@ -184,14 +184,14 @@ func (s *MQTTBrokersService) EnsureListeners() {
brokers, err := s.GetList() brokers, err := s.GetList()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(fmt.Errorf("error getting MQTT brokers: %s", err))
return return
} }
sensors, err := s.ctx.Services.Sensors.GetList() sensors, err := s.ctx.Services.Sensors.GetList()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(fmt.Errorf("error getting sensors: %s", err))
return return
} }
@ -233,16 +233,10 @@ func (s *MQTTBrokersService) EnsureListeners() {
continue continue
} }
fmt.Print("LISTENING TO TOPICS: ")
for topic := range topics {
fmt.Printf("%s ", topic)
}
fmt.Println()
client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics) client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(fmt.Errorf("error listening to MQTT broker %s: %s", broker.Name, err))
continue continue
} }
@ -261,7 +255,7 @@ func (s *MQTTBrokersService) EnsureListeners() {
// TODO: This is sub-optional // TODO: This is sub-optional
for _, sensor := range brokerSensors { for _, sensor := range brokerSensors {
if sensor.MqttTopic == nil || sensor.MqttPath == nil { if sensor.MqttTopic == nil || sensor.MqttPath == nil {
fmt.Printf("Skipping %s because it's missing topic or path", sensor.Name) fmt.Printf("WARN: Skipping sensor %s because it's missing topic or value path", sensor.Name)
continue continue
} }
@ -273,11 +267,23 @@ func (s *MQTTBrokersService) EnsureListeners() {
err := json.Unmarshal([]byte(data[1]), &responseJson) err := json.Unmarshal([]byte(data[1]), &responseJson)
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(fmt.Errorf("error parsing MQTT message JSON: %s", err))
continue continue
} }
s.ctx.Services.SensorValues.Push(sensor.Id, responseJson[*sensor.MqttPath].(float64)) if _, exists := responseJson[*sensor.MqttPath]; !exists {
fmt.Printf("WARN: Skipping sensor %s because it's value path %s was not found in the message", sensor.Name, *sensor.MqttPath)
continue
}
value, ok := responseJson[*sensor.MqttPath].(float64)
if !ok {
fmt.Printf("WARN: Skipping sensor %s because value path %s doesn't resolve to number", sensor.Name, *sensor.MqttPath)
continue
}
s.ctx.Services.SensorValues.Push(sensor.Id, value)
} }
} }