graphicek/server/services/mqtt_brokers_service.go

288 lines
5.4 KiB
Go
Raw Normal View History

2024-04-01 19:42:20 +02:00
package services
import (
"basic-sensor-receiver/integrations"
"basic-sensor-receiver/models"
"encoding/json"
"fmt"
"sync"
)
2024-04-01 19:42:20 +02:00
type MQTTBrokersService struct {
ctx *Context
MqttWaitGroup *sync.WaitGroup
MqttClients []*integrations.MQTTListeningClient
2024-04-01 19:42:20 +02:00
}
// GetList loads all rows of mqtt_brokers, ordered by name ascending.
//
// On a database error it returns a nil slice together with that error.
func (s *MQTTBrokersService) GetList() ([]models.MQTTBrokerItem, error) {
	const query = `
SELECT id, name, address, username, password, client_id
FROM mqtt_brokers
ORDER BY name ASC
`

	// Start from an empty (non-nil) slice, as callers receive it unchanged
	// apart from whatever the query fills in.
	items := make([]models.MQTTBrokerItem, 0)
	if err := s.ctx.DB.Select(&items, query); err != nil {
		return nil, err
	}

	return items, nil
}
// GetById loads the single mqtt_brokers row whose id equals the given id.
//
// The error reported by the database layer is returned unchanged, with a nil
// broker.
func (s *MQTTBrokersService) GetById(id int64) (*models.MQTTBrokerItem, error) {
	const query = `
SELECT id, name, address, username, password, client_id
FROM mqtt_brokers
WHERE id = $1
`

	item := new(models.MQTTBrokerItem)
	if err := s.ctx.DB.Get(item, query, id); err != nil {
		return nil, err
	}

	return item, nil
}
func (s *MQTTBrokersService) Create(name string, address string, username *string, password *string, clientId *string) (*models.MQTTBrokerItem, error) {
broker := models.MQTTBrokerItem{
Name: name,
Address: address,
Username: username,
Password: password,
ClientId: clientId,
}
res, err := s.ctx.DB.NamedExec(
`
INSERT INTO mqtt_brokers (name, address, username, password, client_id)
VALUES (:name, :address, :username, :password, :client_id)
`,
broker,
)
if err != nil {
return nil, err
}
id, err := res.LastInsertId()
if err != nil {
return nil, err
}
broker.Id = int(id)
s.EnsureListeners()
2024-04-01 19:42:20 +02:00
return &broker, nil
}
func (s *MQTTBrokersService) Update(id int64, name string, address string, username *string, password *string, clientId *string) (*models.MQTTBrokerItem, error) {
broker := models.MQTTBrokerItem{
Id: int(id),
Name: name,
Address: address,
Username: username,
Password: password,
ClientId: clientId,
}
_, err := s.ctx.DB.NamedExec(
`
UPDATE mqtt_brokers
SET name = :name, address = :address, username = :username, password = :password, client_id = :client_id
WHERE id = :id
`,
broker,
)
if err != nil {
return nil, err
}
s.EnsureListeners()
2024-04-01 19:42:20 +02:00
return &broker, nil
}
func (s *MQTTBrokersService) Delete(id int64) error {
_, err := s.ctx.DB.Exec(
`
DELETE FROM mqtt_brokers
WHERE id = $1
`,
id,
)
if err != nil {
return err
}
s.EnsureListeners()
2024-04-01 19:42:20 +02:00
return nil
}
// PublishTopic looks up the broker with the given id and publishes message to
// topic on it through the MQTT integration, using the supplied qos and retain
// flag.
//
// The broker's optional username, password and client id are passed as empty
// strings when they are not set. An error from the lookup or from the publish
// call is returned unchanged.
func (s *MQTTBrokersService) PublishTopic(brokerId int64, topic string, message string, qos byte, retain bool) error {
	// orEmpty maps an unset optional field to "".
	orEmpty := func(value *string) string {
		if value == nil {
			return ""
		}
		return *value
	}

	target, err := s.GetById(brokerId)
	if err != nil {
		return err
	}

	return s.ctx.Integrations.MQTT.Publish(
		target.Address,
		orEmpty(target.Username),
		orEmpty(target.Password),
		orEmpty(target.ClientId),
		retain,
		qos,
		topic,
		message,
	)
}
func (s *MQTTBrokersService) EnsureListeners() {
if s.MqttWaitGroup != nil {
// Disconnect all existing clients
for _, client := range s.MqttClients {
if client.Client == nil {
continue
}
client.Client.Disconnect(250)
}
// Wait for all channels to clear
s.MqttWaitGroup.Wait()
// Clear data
s.MqttWaitGroup = nil
s.MqttClients = []*integrations.MQTTListeningClient{}
}
brokers, err := s.GetList()
if err != nil {
fmt.Println(err)
return
}
sensors, err := s.ctx.Services.Sensors.GetList()
if err != nil {
fmt.Println(err)
return
}
// TODO: This needs to be refreshed when a new broker is added or a broker is updated
// TODO: Save all the coroutines and stop them when the server is stopped
s.MqttWaitGroup = &sync.WaitGroup{}
for _, broker := range brokers {
if broker.Username == nil {
broker.Username = new(string)
}
if broker.Password == nil {
broker.Password = new(string)
}
if broker.ClientId == nil {
broker.ClientId = new(string)
}
topics := map[string]byte{}
brokerSensors := []*models.SensorItem{}
for _, sensor := range sensors {
if sensor.MqttBrokerId == nil || *sensor.MqttBrokerId != broker.Id {
continue
}
if sensor.MqttTopic == nil {
continue
}
topics[*sensor.MqttTopic] = byte(0)
brokerSensors = append(brokerSensors, &sensor)
}
if len(brokerSensors) == 0 {
continue
}
fmt.Print("LISTENING TO TOPICS: ")
for topic := range topics {
fmt.Printf("%s ", topic)
}
fmt.Println()
client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics)
if err != nil {
fmt.Println(err)
continue
}
s.MqttClients = append(s.MqttClients, client)
s.MqttWaitGroup.Add(1)
go func() {
for {
data, ok := <-client.Channel
if !ok {
break
}
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", data[0], data[1])
// TODO: This is sub-optional
for _, sensor := range brokerSensors {
2025-03-04 21:31:42 +01:00
if sensor.MqttTopic == nil || sensor.MqttPath == nil {
fmt.Printf("Skipping %s because it's missing topic or path", sensor.Name)
continue
}
if *sensor.MqttTopic != data[0] {
continue
}
var responseJson map[string]any
err := json.Unmarshal([]byte(data[1]), &responseJson)
if err != nil {
fmt.Print(err)
continue
}
s.ctx.Services.SensorValues.Push(sensor.Id, responseJson[*sensor.MqttPath].(float64))
}
}
s.MqttWaitGroup.Done()
}()
}
}