MQTT Sensors #1

Merged
kamen merged 6 commits from feature/mqtt-sensors into master 2025-03-05 11:10:10 +01:00
11 changed files with 305 additions and 28 deletions
Showing only changes of commit 4f9fcbe7e0 - Show all commits

View File

@ -5,18 +5,33 @@ export type SensorInfo = {
name: string
authKey: string
lastContactAt?: number
type: 'rest' | 'mqtt'
mqttTopic?: string
mqttBrokerId?: number
mqttPath?: string
}
export type SensorModifiableData = {
name: string
type: 'rest' | 'mqtt'
mqttTopic?: string
mqttBrokerId?: number
mqttPath?: string
}
export const getSensors = () => request<SensorInfo[]>('/api/sensors')
export const createSensor = (name: string) =>
export const createSensor = (data: SensorModifiableData) =>
request<SensorInfo>('/api/sensors', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ name }),
body: JSON.stringify(data),
})
export const updateSensor = ({ id, ...body }: { id: number; name: string }) =>
export const updateSensor = ({
id,
...body
}: { id: number } & SensorModifiableData) =>
request<SensorInfo>(`/api/sensors/${id}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },

View File

@ -1,7 +1,8 @@
import { getMQTTBrokers } from '@/api/mqttBrokers'
import { createSensor, SensorInfo, updateSensor } from '@/api/sensors'
import { Modal } from '@/components/Modal'
import { useForm } from '@/utils/hooks/useForm'
import { useMutation, useQueryClient } from 'react-query'
import { useMutation, useQuery, useQueryClient } from 'react-query'
type Props = {
open: boolean
@ -14,22 +15,36 @@ export const SensorFormModal = ({ open, onClose, sensor }: Props) => {
const createMutation = useMutation(createSensor)
const updateMutation = useMutation(updateSensor)
const { handleSubmit, register } = useForm({
const brokers = useQuery(['/mqtt/brokers'], getMQTTBrokers)
const { handleSubmit, register, watch } = useForm({
defaultValue: () => ({
name: sensor?.name ?? '',
type: sensor?.type ?? 'rest',
mqttBrokerId: sensor?.mqttBrokerId ?? null,
mqttPath: sensor?.mqttPath ?? null,
mqttTopic: sensor?.mqttTopic ?? null,
}),
onSubmit: async (v) => {
if (isLoading) {
return
}
const data = {
name: v.name,
type: v.type,
mqttBrokerId: v.mqttBrokerId ? +v.mqttBrokerId : undefined,
mqttPath: v.mqttPath ?? undefined,
mqttTopic: v.mqttTopic ?? undefined,
}
if (sensor) {
await updateMutation.mutateAsync({
id: sensor.id,
name: v.name,
...data,
})
} else {
await createMutation.mutateAsync(v.name)
await createMutation.mutateAsync(data)
}
queryClient.invalidateQueries(['/sensors'])
@ -38,6 +53,8 @@ export const SensorFormModal = ({ open, onClose, sensor }: Props) => {
},
})
const type = watch('type')
const isLoading = createMutation.isLoading || updateMutation.isLoading
return (
@ -54,6 +71,39 @@ export const SensorFormModal = ({ open, onClose, sensor }: Props) => {
/>
</div>
<div className="input">
<label>Type</label>
<select {...register('type')}>
<option value="rest">REST</option>
<option value="mqtt">MQTT</option>
</select>
</div>
{type === 'mqtt' && (
<>
<div className="input">
<label>Broker</label>
<select {...register('mqttBrokerId')}>
{brokers.data?.map((b) => (
<option key={b.id} value={b.id}>
{b.name}
</option>
))}
</select>
</div>
<div className="input">
<label>Topic</label>
<input type="text" {...register('mqttTopic')} />
</div>
<div className="input">
<label>Path</label>
<input type="text" {...register('mqttPath')} />
</div>
</>
)}
<div className="actions">
<button className="cancel" type="button" onClick={onClose}>
Cancel

5
server/app/mqtt.go Normal file
View File

@ -0,0 +1,5 @@
package app
func (s *Server) EnsureMqttListeners() {
s.Services.MQTTBrokers.EnsureListeners()
}

View File

@ -12,6 +12,7 @@ type Server struct {
DB *sqlx.DB
Config *config.Config
Services *services.Services
Integrations *services.Integrations
}
func InitializeServer() *Server {
@ -29,6 +30,7 @@ func InitializeServer() *Server {
ctx := services.Context{DB: server.DB, Config: server.Config}
server.Services = services.InitializeServices(&ctx)
server.Integrations = ctx.Integrations
return &server
}

View File

@ -0,0 +1,4 @@
ALTER TABLE sensors ADD COLUMN type TEXT;
ALTER TABLE sensors ADD COLUMN mqtt_broker_id INTEGER;
ALTER TABLE sensors ADD COLUMN mqtt_topic TEXT;
ALTER TABLE sensors ADD COLUMN mqtt_path TEXT;

View File

@ -6,6 +6,11 @@ import (
type MQTTIntegration struct{}
type MQTTListeningClient struct {
Client MQTT.Client
Channel chan [2]string
}
func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error {
opts := MQTT.NewClientOptions()
opts.AddBroker(server)
@ -25,3 +30,30 @@ func (s *MQTTIntegration) Publish(server string, username string, password strin
return nil
}
func (s *MQTTIntegration) Listen(server string, username string, password string, clientId string, topics map[string]byte) (*MQTTListeningClient, error) {
channel := make(chan [2]string)
opts := MQTT.NewClientOptions()
opts.AddBroker(server)
opts.SetUsername(username)
opts.SetPassword(password)
opts.SetClientID(clientId)
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
channel <- [2]string{msg.Topic(), string(msg.Payload())}
})
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
if token := client.SubscribeMultiple(topics, nil); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
return &MQTTListeningClient{
Client: client,
Channel: channel,
}, nil
}

View File

@ -90,6 +90,9 @@ func main() {
// Starts alerts handling goroutine
server.StartAlerts()
// MQTT listeners
server.EnsureMqttListeners()
// Graceful shutdown using SIGTERM or SIGINT
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

View File

@ -3,6 +3,10 @@ package models
type SensorItem struct {
Id int64 `json:"id" db:"id"`
Name string `json:"name" db:"name"`
Type *string `json:"type" db:"type"`
MqttBrokerId *int `json:"mqttBrokerId" db:"mqtt_broker_id"`
MqttTopic *string `json:"mqttTopic" db:"mqtt_topic"`
MqttPath *string `json:"mqttPath" db:"mqtt_path"`
AuthKey string `json:"authKey" db:"auth_key"`
LastContactAt *int64 `json:"lastContactAt" db:"last_contact_at"`
}

View File

@ -10,6 +10,10 @@ import (
type postOrPutSensorsBody struct {
Name string `json:"name" binding:"required"`
Type *string `json:"type"`
MqttBrokerId *int `json:"mqttBrokerId"`
MqttTopic *string `json:"mqttTopic"`
MqttPath *string `json:"mqttPath"`
}
func GetSensors(s *app.Server) gin.HandlerFunc {
@ -33,7 +37,7 @@ func PostSensors(s *app.Server) gin.HandlerFunc {
return
}
sensor, err := s.Services.Sensors.Create(body.Name)
sensor, err := s.Services.Sensors.Create(body.Name, body.Type, body.MqttBrokerId, body.MqttTopic, body.MqttPath)
if err != nil {
c.AbortWithError(http.StatusInternalServerError, err)
@ -56,7 +60,7 @@ func PutSensor(s *app.Server) gin.HandlerFunc {
return
}
sensor, err := s.Services.Sensors.Update(sensorId, body.Name)
sensor, err := s.Services.Sensors.Update(sensorId, body.Name, body.Type, body.MqttBrokerId, body.MqttTopic, body.MqttPath)
if err != nil {
c.AbortWithError(http.StatusInternalServerError, err)

View File

@ -1,9 +1,18 @@
package services
import "basic-sensor-receiver/models"
import (
"basic-sensor-receiver/integrations"
"basic-sensor-receiver/models"
"encoding/json"
"fmt"
"sync"
)
type MQTTBrokersService struct {
ctx *Context
MqttWaitGroup *sync.WaitGroup
MqttClients []*integrations.MQTTListeningClient
}
func (s *MQTTBrokersService) GetList() ([]models.MQTTBrokerItem, error) {
@ -69,6 +78,8 @@ func (s *MQTTBrokersService) Create(name string, address string, username *strin
broker.Id = int(id)
s.EnsureListeners()
return &broker, nil
}
@ -95,6 +106,8 @@ func (s *MQTTBrokersService) Update(id int64, name string, address string, usern
return nil, err
}
s.EnsureListeners()
return &broker, nil
}
@ -111,6 +124,8 @@ func (s *MQTTBrokersService) Delete(id int64) error {
return err
}
s.EnsureListeners()
return nil
}
@ -146,3 +161,122 @@ func (s *MQTTBrokersService) PublishTopic(brokerId int64, topic string, message
message,
)
}
func (s *MQTTBrokersService) EnsureListeners() {
if s.MqttWaitGroup != nil {
// Disconnect all existing clients
for _, client := range s.MqttClients {
if client.Client == nil {
continue
}
client.Client.Disconnect(250)
}
// Wait for all channels to clear
s.MqttWaitGroup.Wait()
// Clear data
s.MqttWaitGroup = nil
s.MqttClients = []*integrations.MQTTListeningClient{}
}
brokers, err := s.GetList()
if err != nil {
fmt.Println(err)
return
}
sensors, err := s.ctx.Services.Sensors.GetList()
if err != nil {
fmt.Println(err)
return
}
// TODO: This needs to be refreshed when a new broker is added or a broker is updated
// TODO: Save all the coroutines and stop them when the server is stopped
s.MqttWaitGroup = &sync.WaitGroup{}
for _, broker := range brokers {
if broker.Username == nil {
broker.Username = new(string)
}
if broker.Password == nil {
broker.Password = new(string)
}
if broker.ClientId == nil {
broker.ClientId = new(string)
}
topics := map[string]byte{}
brokerSensors := []*models.SensorItem{}
for _, sensor := range sensors {
if sensor.MqttBrokerId == nil || *sensor.MqttBrokerId != broker.Id {
continue
}
if sensor.MqttTopic == nil {
continue
}
topics[*sensor.MqttTopic] = byte(0)
brokerSensors = append(brokerSensors, &sensor)
}
if len(brokerSensors) == 0 {
continue
}
fmt.Print("LISTENING TO TOPICS: ")
for topic := range topics {
fmt.Printf("%s ", topic)
}
fmt.Println()
client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics)
if err != nil {
fmt.Println(err)
continue
}
s.MqttClients = append(s.MqttClients, client)
s.MqttWaitGroup.Add(1)
go func() {
for {
data, ok := <-client.Channel
if !ok {
break
}
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", data[0], data[1])
// TODO: This is sub-optional
for _, sensor := range brokerSensors {
if sensor.MqttTopic == nil || *sensor.MqttTopic != data[0] || sensor.MqttPath == nil {
continue
}
var responseJson map[string]any
err := json.Unmarshal([]byte(data[1]), &responseJson)
if err != nil {
fmt.Print(err)
continue
}
s.ctx.Services.SensorValues.Push(sensor.Id, responseJson[*sensor.MqttPath].(float64))
}
}
s.MqttWaitGroup.Done()
}()
}
}

View File

@ -13,7 +13,7 @@ type SensorsService struct {
func (s *SensorsService) GetList() ([]models.SensorItem, error) {
sensors := []models.SensorItem{}
err := s.ctx.DB.Select(&sensors, "SELECT id, name, auth_key, last_contact_at FROM sensors")
err := s.ctx.DB.Select(&sensors, "SELECT id, name, type, mqtt_broker_id, mqtt_topic, auth_key, last_contact_at FROM sensors")
if err != nil {
return nil, err
@ -22,19 +22,28 @@ func (s *SensorsService) GetList() ([]models.SensorItem, error) {
return sensors, nil
}
func (s *SensorsService) Create(name string) (*models.SensorItem, error) {
func (s *SensorsService) Create(name string, sensorType *string, mqttBrokerId *int, mqttTopic *string, mqttPath *string) (*models.SensorItem, error) {
authKey, err := generateRandomString(32)
if err != nil {
return nil, err
}
if sensorType == nil {
sensorType = new(string)
*sensorType = "rest"
}
item := models.SensorItem{
Name: name,
AuthKey: authKey,
Type: sensorType,
MqttBrokerId: mqttBrokerId,
MqttTopic: mqttTopic,
MqttPath: mqttPath,
}
res, err := s.ctx.DB.Exec("INSERT INTO sensors (name, auth_key) VALUES (?, ?)", item.Name, item.AuthKey)
res, err := s.ctx.DB.Exec("INSERT INTO sensors (name, auth_key, type, mqtt_broker_id, mqtt_topic, mqtt_path) VALUES (?, ?, ?, ?, ?, ?)", item.Name, item.AuthKey, item.Type, item.MqttBrokerId, item.MqttTopic, item.MqttPath)
if err != nil {
return nil, err
@ -46,13 +55,15 @@ func (s *SensorsService) Create(name string) (*models.SensorItem, error) {
return nil, err
}
s.ctx.Services.MQTTBrokers.EnsureListeners()
return &item, nil
}
func (s *SensorsService) GetById(id int64) (*models.SensorItem, error) {
item := models.SensorItem{}
err := s.ctx.DB.Get(&item, "SELECT id, name, auth_key, last_contact_at FROM sensors WHERE id = $1", id)
err := s.ctx.DB.Get(&item, "SELECT id, name, auth_key, last_contact_at, type, mqtt_broker_id, mqtt_topic, mqtt_path FROM sensors WHERE id = $1", id)
if err != nil {
return nil, err
@ -61,21 +72,32 @@ func (s *SensorsService) GetById(id int64) (*models.SensorItem, error) {
return &item, nil
}
func (s *SensorsService) Update(id int64, name string) (*models.SensorItem, error) {
func (s *SensorsService) Update(id int64, name string, sensorType *string, mqttBrokerId *int, mqttTopic *string, mqttPath *string) (*models.SensorItem, error) {
item, err := s.GetById(id)
if err != nil {
return nil, err
}
item.Name = name
if sensorType == nil {
sensorType = new(string)
*sensorType = "rest"
}
_, err = s.ctx.DB.Exec("UPDATE sensors SET name = ? WHERE id = ?", item.Name, item.Id)
item.Name = name
item.Type = sensorType
item.MqttBrokerId = mqttBrokerId
item.MqttTopic = mqttTopic
item.MqttPath = mqttPath
_, err = s.ctx.DB.Exec("UPDATE sensors SET name = ?, type = ?, mqtt_broker_id = ?, mqtt_topic = ?, mqtt_path = ? WHERE id = ?", item.Name, item.Type, item.MqttBrokerId, item.MqttTopic, item.MqttPath, item.Id)
if err != nil {
return nil, err
}
s.ctx.Services.MQTTBrokers.EnsureListeners()
return item, nil
}
@ -94,6 +116,8 @@ func (s *SensorsService) DeleteById(id int64) error {
_, err = s.ctx.DB.Exec("DELETE FROM sensors WHERE id = ?", item.Id)
s.ctx.Services.MQTTBrokers.EnsureListeners()
return err
}