Compare commits
No commits in common. "44d57143d0655d90016b0243db157bdb9715d3ff" and "40fac3a6f3aadc5b533b706a1a725a5f5ebd5e69" have entirely different histories.
44d57143d0
...
40fac3a6f3
|
|
@ -5,33 +5,18 @@ export type SensorInfo = {
|
||||||
name: string
|
name: string
|
||||||
authKey: string
|
authKey: string
|
||||||
lastContactAt?: number
|
lastContactAt?: number
|
||||||
type: 'rest' | 'mqtt'
|
|
||||||
mqttTopic?: string
|
|
||||||
mqttBrokerId?: number
|
|
||||||
mqttPath?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
export type SensorModifiableData = {
|
|
||||||
name: string
|
|
||||||
type: 'rest' | 'mqtt'
|
|
||||||
mqttTopic?: string
|
|
||||||
mqttBrokerId?: number
|
|
||||||
mqttPath?: string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export const getSensors = () => request<SensorInfo[]>('/api/sensors')
|
export const getSensors = () => request<SensorInfo[]>('/api/sensors')
|
||||||
|
|
||||||
export const createSensor = (data: SensorModifiableData) =>
|
export const createSensor = (name: string) =>
|
||||||
request<SensorInfo>('/api/sensors', {
|
request<SensorInfo>('/api/sensors', {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'content-type': 'application/json' },
|
headers: { 'content-type': 'application/json' },
|
||||||
body: JSON.stringify(data),
|
body: JSON.stringify({ name }),
|
||||||
})
|
})
|
||||||
|
|
||||||
export const updateSensor = ({
|
export const updateSensor = ({ id, ...body }: { id: number; name: string }) =>
|
||||||
id,
|
|
||||||
...body
|
|
||||||
}: { id: number } & SensorModifiableData) =>
|
|
||||||
request<SensorInfo>(`/api/sensors/${id}`, {
|
request<SensorInfo>(`/api/sensors/${id}`, {
|
||||||
method: 'PUT',
|
method: 'PUT',
|
||||||
headers: { 'content-type': 'application/json' },
|
headers: { 'content-type': 'application/json' },
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,7 @@
|
||||||
import { getMQTTBrokers } from '@/api/mqttBrokers'
|
|
||||||
import { createSensor, SensorInfo, updateSensor } from '@/api/sensors'
|
import { createSensor, SensorInfo, updateSensor } from '@/api/sensors'
|
||||||
import { Modal } from '@/components/Modal'
|
import { Modal } from '@/components/Modal'
|
||||||
import { useForm } from '@/utils/hooks/useForm'
|
import { useForm } from '@/utils/hooks/useForm'
|
||||||
import { useMutation, useQuery, useQueryClient } from 'react-query'
|
import { useMutation, useQueryClient } from 'react-query'
|
||||||
|
|
||||||
type Props = {
|
type Props = {
|
||||||
open: boolean
|
open: boolean
|
||||||
|
|
@ -15,36 +14,22 @@ export const SensorFormModal = ({ open, onClose, sensor }: Props) => {
|
||||||
const createMutation = useMutation(createSensor)
|
const createMutation = useMutation(createSensor)
|
||||||
const updateMutation = useMutation(updateSensor)
|
const updateMutation = useMutation(updateSensor)
|
||||||
|
|
||||||
const brokers = useQuery(['/mqtt/brokers'], getMQTTBrokers)
|
const { handleSubmit, register } = useForm({
|
||||||
|
|
||||||
const { handleSubmit, register, watch } = useForm({
|
|
||||||
defaultValue: () => ({
|
defaultValue: () => ({
|
||||||
name: sensor?.name ?? '',
|
name: sensor?.name ?? '',
|
||||||
type: sensor?.type ?? 'rest',
|
|
||||||
mqttBrokerId: sensor?.mqttBrokerId ?? null,
|
|
||||||
mqttPath: sensor?.mqttPath ?? null,
|
|
||||||
mqttTopic: sensor?.mqttTopic ?? null,
|
|
||||||
}),
|
}),
|
||||||
onSubmit: async (v) => {
|
onSubmit: async (v) => {
|
||||||
if (isLoading) {
|
if (isLoading) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const data = {
|
|
||||||
name: v.name,
|
|
||||||
type: v.type,
|
|
||||||
mqttBrokerId: v.mqttBrokerId ? +v.mqttBrokerId : undefined,
|
|
||||||
mqttPath: v.mqttPath ?? undefined,
|
|
||||||
mqttTopic: v.mqttTopic ?? undefined,
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sensor) {
|
if (sensor) {
|
||||||
await updateMutation.mutateAsync({
|
await updateMutation.mutateAsync({
|
||||||
id: sensor.id,
|
id: sensor.id,
|
||||||
...data,
|
name: v.name,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
await createMutation.mutateAsync(data)
|
await createMutation.mutateAsync(v.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
queryClient.invalidateQueries(['/sensors'])
|
queryClient.invalidateQueries(['/sensors'])
|
||||||
|
|
@ -53,8 +38,6 @@ export const SensorFormModal = ({ open, onClose, sensor }: Props) => {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
const type = watch('type')
|
|
||||||
|
|
||||||
const isLoading = createMutation.isLoading || updateMutation.isLoading
|
const isLoading = createMutation.isLoading || updateMutation.isLoading
|
||||||
|
|
||||||
return (
|
return (
|
||||||
|
|
@ -71,39 +54,6 @@ export const SensorFormModal = ({ open, onClose, sensor }: Props) => {
|
||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div className="input">
|
|
||||||
<label>Type</label>
|
|
||||||
<select {...register('type')}>
|
|
||||||
<option value="rest">REST</option>
|
|
||||||
<option value="mqtt">MQTT</option>
|
|
||||||
</select>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
{type === 'mqtt' && (
|
|
||||||
<>
|
|
||||||
<div className="input">
|
|
||||||
<label>Broker</label>
|
|
||||||
<select {...register('mqttBrokerId')}>
|
|
||||||
{brokers.data?.map((b) => (
|
|
||||||
<option key={b.id} value={b.id}>
|
|
||||||
{b.name}
|
|
||||||
</option>
|
|
||||||
))}
|
|
||||||
</select>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<div className="input">
|
|
||||||
<label>Topic</label>
|
|
||||||
<input type="text" {...register('mqttTopic')} />
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<div className="input">
|
|
||||||
<label>Path</label>
|
|
||||||
<input type="text" {...register('mqttPath')} />
|
|
||||||
</div>
|
|
||||||
</>
|
|
||||||
)}
|
|
||||||
|
|
||||||
<div className="actions">
|
<div className="actions">
|
||||||
<button className="cancel" type="button" onClick={onClose}>
|
<button className="cancel" type="button" onClick={onClose}>
|
||||||
Cancel
|
Cancel
|
||||||
|
|
|
||||||
|
|
@ -1,5 +0,0 @@
|
||||||
package app
|
|
||||||
|
|
||||||
func (s *Server) EnsureMqttListeners() {
|
|
||||||
s.Services.MQTTBrokers.EnsureListeners()
|
|
||||||
}
|
|
||||||
|
|
@ -12,7 +12,6 @@ type Server struct {
|
||||||
DB *sqlx.DB
|
DB *sqlx.DB
|
||||||
Config *config.Config
|
Config *config.Config
|
||||||
Services *services.Services
|
Services *services.Services
|
||||||
Integrations *services.Integrations
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitializeServer() *Server {
|
func InitializeServer() *Server {
|
||||||
|
|
@ -30,7 +29,6 @@ func InitializeServer() *Server {
|
||||||
ctx := services.Context{DB: server.DB, Config: server.Config}
|
ctx := services.Context{DB: server.DB, Config: server.Config}
|
||||||
|
|
||||||
server.Services = services.InitializeServices(&ctx)
|
server.Services = services.InitializeServices(&ctx)
|
||||||
server.Integrations = ctx.Integrations
|
|
||||||
|
|
||||||
return &server
|
return &server
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +0,0 @@
|
||||||
ALTER TABLE sensors ADD COLUMN type TEXT;
|
|
||||||
ALTER TABLE sensors ADD COLUMN mqtt_broker_id INTEGER;
|
|
||||||
ALTER TABLE sensors ADD COLUMN mqtt_topic TEXT;
|
|
||||||
ALTER TABLE sensors ADD COLUMN mqtt_path TEXT;
|
|
||||||
|
|
@ -6,17 +6,6 @@ import (
|
||||||
|
|
||||||
type MQTTIntegration struct{}
|
type MQTTIntegration struct{}
|
||||||
|
|
||||||
type MQTTEvent struct {
|
|
||||||
Type string
|
|
||||||
Topic string
|
|
||||||
Payload []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type MQTTListeningClient struct {
|
|
||||||
Client MQTT.Client
|
|
||||||
Channel chan MQTTEvent
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error {
|
func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error {
|
||||||
opts := MQTT.NewClientOptions()
|
opts := MQTT.NewClientOptions()
|
||||||
opts.AddBroker(server)
|
opts.AddBroker(server)
|
||||||
|
|
@ -36,43 +25,3 @@ func (s *MQTTIntegration) Publish(server string, username string, password strin
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MQTTIntegration) Listen(server string, username string, password string, clientId string, topics map[string]byte) (*MQTTListeningClient, error) {
|
|
||||||
channel := make(chan MQTTEvent)
|
|
||||||
|
|
||||||
opts := MQTT.NewClientOptions()
|
|
||||||
opts.AddBroker(server)
|
|
||||||
opts.SetUsername(username)
|
|
||||||
opts.SetPassword(password)
|
|
||||||
opts.SetClientID(clientId)
|
|
||||||
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
|
|
||||||
channel <- MQTTEvent{
|
|
||||||
Type: "message",
|
|
||||||
Topic: msg.Topic(),
|
|
||||||
Payload: msg.Payload(),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
client := MQTT.NewClient(opts)
|
|
||||||
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
||||||
return nil, token.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
if token := client.SubscribeMultiple(topics, nil); token.Wait() && token.Error() != nil {
|
|
||||||
return nil, token.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
return &MQTTListeningClient{
|
|
||||||
Client: client,
|
|
||||||
Channel: channel,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *MQTTListeningClient) Close() {
|
|
||||||
c.Channel <- MQTTEvent{
|
|
||||||
Type: "close",
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Client.Disconnect(250)
|
|
||||||
close(c.Channel)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -90,9 +90,6 @@ func main() {
|
||||||
// Starts alerts handling goroutine
|
// Starts alerts handling goroutine
|
||||||
server.StartAlerts()
|
server.StartAlerts()
|
||||||
|
|
||||||
// MQTT listeners
|
|
||||||
server.EnsureMqttListeners()
|
|
||||||
|
|
||||||
// Graceful shutdown using SIGTERM or SIGINT
|
// Graceful shutdown using SIGTERM or SIGINT
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
@ -119,8 +116,6 @@ func main() {
|
||||||
// Shutdown the server
|
// Shutdown the server
|
||||||
log.Println("Shutting down server...")
|
log.Println("Shutting down server...")
|
||||||
|
|
||||||
server.Services.MQTTBrokers.StopListeners()
|
|
||||||
|
|
||||||
if err := srv.Shutdown(context.TODO()); err != nil {
|
if err := srv.Shutdown(context.TODO()); err != nil {
|
||||||
log.Fatalf("Server shutdown failed: %v", err)
|
log.Fatalf("Server shutdown failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,6 @@ package models
|
||||||
type SensorItem struct {
|
type SensorItem struct {
|
||||||
Id int64 `json:"id" db:"id"`
|
Id int64 `json:"id" db:"id"`
|
||||||
Name string `json:"name" db:"name"`
|
Name string `json:"name" db:"name"`
|
||||||
Type *string `json:"type" db:"type"`
|
|
||||||
MqttBrokerId *int `json:"mqttBrokerId" db:"mqtt_broker_id"`
|
|
||||||
MqttTopic *string `json:"mqttTopic" db:"mqtt_topic"`
|
|
||||||
MqttPath *string `json:"mqttPath" db:"mqtt_path"`
|
|
||||||
AuthKey string `json:"authKey" db:"auth_key"`
|
AuthKey string `json:"authKey" db:"auth_key"`
|
||||||
LastContactAt *int64 `json:"lastContactAt" db:"last_contact_at"`
|
LastContactAt *int64 `json:"lastContactAt" db:"last_contact_at"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,6 @@ import (
|
||||||
|
|
||||||
type postOrPutSensorsBody struct {
|
type postOrPutSensorsBody struct {
|
||||||
Name string `json:"name" binding:"required"`
|
Name string `json:"name" binding:"required"`
|
||||||
Type *string `json:"type"`
|
|
||||||
MqttBrokerId *int `json:"mqttBrokerId"`
|
|
||||||
MqttTopic *string `json:"mqttTopic"`
|
|
||||||
MqttPath *string `json:"mqttPath"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetSensors(s *app.Server) gin.HandlerFunc {
|
func GetSensors(s *app.Server) gin.HandlerFunc {
|
||||||
|
|
@ -37,7 +33,7 @@ func PostSensors(s *app.Server) gin.HandlerFunc {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sensor, err := s.Services.Sensors.Create(body.Name, body.Type, body.MqttBrokerId, body.MqttTopic, body.MqttPath)
|
sensor, err := s.Services.Sensors.Create(body.Name)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.AbortWithError(http.StatusInternalServerError, err)
|
c.AbortWithError(http.StatusInternalServerError, err)
|
||||||
|
|
@ -60,7 +56,7 @@ func PutSensor(s *app.Server) gin.HandlerFunc {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sensor, err := s.Services.Sensors.Update(sensorId, body.Name, body.Type, body.MqttBrokerId, body.MqttTopic, body.MqttPath)
|
sensor, err := s.Services.Sensors.Update(sensorId, body.Name)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.AbortWithError(http.StatusInternalServerError, err)
|
c.AbortWithError(http.StatusInternalServerError, err)
|
||||||
|
|
|
||||||
|
|
@ -1,19 +1,9 @@
|
||||||
package services
|
package services
|
||||||
|
|
||||||
import (
|
import "basic-sensor-receiver/models"
|
||||||
"basic-sensor-receiver/integrations"
|
|
||||||
"basic-sensor-receiver/models"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MQTTBrokersService struct {
|
type MQTTBrokersService struct {
|
||||||
ctx *Context
|
ctx *Context
|
||||||
|
|
||||||
MqttWaitGroup *sync.WaitGroup
|
|
||||||
MqttClients []*integrations.MQTTListeningClient
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MQTTBrokersService) GetList() ([]models.MQTTBrokerItem, error) {
|
func (s *MQTTBrokersService) GetList() ([]models.MQTTBrokerItem, error) {
|
||||||
|
|
@ -79,8 +69,6 @@ func (s *MQTTBrokersService) Create(name string, address string, username *strin
|
||||||
|
|
||||||
broker.Id = int(id)
|
broker.Id = int(id)
|
||||||
|
|
||||||
s.EnsureListeners()
|
|
||||||
|
|
||||||
return &broker, nil
|
return &broker, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -107,8 +95,6 @@ func (s *MQTTBrokersService) Update(id int64, name string, address string, usern
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.EnsureListeners()
|
|
||||||
|
|
||||||
return &broker, nil
|
return &broker, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -125,8 +111,6 @@ func (s *MQTTBrokersService) Delete(id int64) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.EnsureListeners()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -162,154 +146,3 @@ func (s *MQTTBrokersService) PublishTopic(brokerId int64, topic string, message
|
||||||
message,
|
message,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MQTTBrokersService) StopListeners() {
|
|
||||||
if s.MqttWaitGroup != nil {
|
|
||||||
for _, client := range s.MqttClients {
|
|
||||||
client.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
s.MqttWaitGroup.Wait()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MQTTBrokersService) EnsureListeners() {
|
|
||||||
if s.MqttWaitGroup != nil {
|
|
||||||
// Disconnect all existing clients
|
|
||||||
for _, client := range s.MqttClients {
|
|
||||||
client.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all channels to clear
|
|
||||||
s.MqttWaitGroup.Wait()
|
|
||||||
|
|
||||||
// Clear data
|
|
||||||
s.MqttWaitGroup = nil
|
|
||||||
s.MqttClients = []*integrations.MQTTListeningClient{}
|
|
||||||
}
|
|
||||||
|
|
||||||
brokers, err := s.GetList()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Println(fmt.Errorf("error getting MQTT brokers: %s", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sensors, err := s.ctx.Services.Sensors.GetList()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Println(fmt.Errorf("error getting sensors: %s", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This needs to be refreshed when a new broker is added or a broker is updated
|
|
||||||
// TODO: Save all the coroutines and stop them when the server is stopped
|
|
||||||
|
|
||||||
s.MqttWaitGroup = &sync.WaitGroup{}
|
|
||||||
|
|
||||||
for _, broker := range brokers {
|
|
||||||
if broker.Username == nil {
|
|
||||||
broker.Username = new(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
if broker.Password == nil {
|
|
||||||
broker.Password = new(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
if broker.ClientId == nil {
|
|
||||||
broker.ClientId = new(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
topics := map[string]byte{}
|
|
||||||
brokerSensors := []*models.SensorItem{}
|
|
||||||
|
|
||||||
for _, sensor := range sensors {
|
|
||||||
if sensor.MqttBrokerId == nil || *sensor.MqttBrokerId != broker.Id {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if sensor.MqttTopic == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
topics[*sensor.MqttTopic] = byte(0)
|
|
||||||
brokerSensors = append(brokerSensors, &sensor)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(brokerSensors) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := s.ctx.Integrations.MQTT.Listen(broker.Address, *broker.Username, *broker.Password, *broker.ClientId, topics)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Println(fmt.Errorf("error listening to MQTT broker %s: %s", broker.Name, err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
s.MqttClients = append(s.MqttClients, client)
|
|
||||||
s.MqttWaitGroup.Add(1)
|
|
||||||
|
|
||||||
log.Printf("MQTT broker %s: Listening for %d topics\n", broker.Name, len(topics))
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
data, ok := <-client.Channel
|
|
||||||
if !ok {
|
|
||||||
log.Println("WARN: MQTT channel closed unexpectedly")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if data.Type == "close" {
|
|
||||||
log.Println("MQTT channel closed by request")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
switch data.Type {
|
|
||||||
case "message":
|
|
||||||
{
|
|
||||||
log.Printf("MQTT Message: Topic: %s Message: %s\n", data.Topic, string(data.Payload))
|
|
||||||
|
|
||||||
// TODO: This is sub-optional
|
|
||||||
for _, sensor := range brokerSensors {
|
|
||||||
if sensor.MqttTopic == nil || sensor.MqttPath == nil {
|
|
||||||
log.Printf("WARN: Skipping sensor %s because it's missing topic or value path", sensor.Name)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if *sensor.MqttTopic != data.Topic {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var responseJson map[string]any
|
|
||||||
err := json.Unmarshal(data.Payload, &responseJson)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Print(fmt.Errorf("error parsing MQTT message JSON: %s", err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, exists := responseJson[*sensor.MqttPath]; !exists {
|
|
||||||
log.Printf("WARN: Skipping sensor %s because it's value path %s was not found in the message", sensor.Name, *sensor.MqttPath)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
value, ok := responseJson[*sensor.MqttPath].(float64)
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
log.Printf("WARN: Skipping sensor %s because value path %s doesn't resolve to number", sensor.Name, *sensor.MqttPath)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
s.ctx.Services.SensorValues.Push(sensor.Id, value)
|
|
||||||
}
|
|
||||||
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.MqttWaitGroup.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ type SensorsService struct {
|
||||||
func (s *SensorsService) GetList() ([]models.SensorItem, error) {
|
func (s *SensorsService) GetList() ([]models.SensorItem, error) {
|
||||||
sensors := []models.SensorItem{}
|
sensors := []models.SensorItem{}
|
||||||
|
|
||||||
err := s.ctx.DB.Select(&sensors, "SELECT id, name, type, mqtt_broker_id, mqtt_topic, mqtt_path, auth_key, last_contact_at FROM sensors")
|
err := s.ctx.DB.Select(&sensors, "SELECT id, name, auth_key, last_contact_at FROM sensors")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -22,28 +22,19 @@ func (s *SensorsService) GetList() ([]models.SensorItem, error) {
|
||||||
return sensors, nil
|
return sensors, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SensorsService) Create(name string, sensorType *string, mqttBrokerId *int, mqttTopic *string, mqttPath *string) (*models.SensorItem, error) {
|
func (s *SensorsService) Create(name string) (*models.SensorItem, error) {
|
||||||
authKey, err := generateRandomString(32)
|
authKey, err := generateRandomString(32)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if sensorType == nil {
|
|
||||||
sensorType = new(string)
|
|
||||||
*sensorType = "rest"
|
|
||||||
}
|
|
||||||
|
|
||||||
item := models.SensorItem{
|
item := models.SensorItem{
|
||||||
Name: name,
|
Name: name,
|
||||||
AuthKey: authKey,
|
AuthKey: authKey,
|
||||||
Type: sensorType,
|
|
||||||
MqttBrokerId: mqttBrokerId,
|
|
||||||
MqttTopic: mqttTopic,
|
|
||||||
MqttPath: mqttPath,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := s.ctx.DB.Exec("INSERT INTO sensors (name, auth_key, type, mqtt_broker_id, mqtt_topic, mqtt_path) VALUES (?, ?, ?, ?, ?, ?)", item.Name, item.AuthKey, item.Type, item.MqttBrokerId, item.MqttTopic, item.MqttPath)
|
res, err := s.ctx.DB.Exec("INSERT INTO sensors (name, auth_key) VALUES (?, ?)", item.Name, item.AuthKey)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -55,15 +46,13 @@ func (s *SensorsService) Create(name string, sensorType *string, mqttBrokerId *i
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.ctx.Services.MQTTBrokers.EnsureListeners()
|
|
||||||
|
|
||||||
return &item, nil
|
return &item, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SensorsService) GetById(id int64) (*models.SensorItem, error) {
|
func (s *SensorsService) GetById(id int64) (*models.SensorItem, error) {
|
||||||
item := models.SensorItem{}
|
item := models.SensorItem{}
|
||||||
|
|
||||||
err := s.ctx.DB.Get(&item, "SELECT id, name, auth_key, last_contact_at, type, mqtt_broker_id, mqtt_topic, mqtt_path FROM sensors WHERE id = $1", id)
|
err := s.ctx.DB.Get(&item, "SELECT id, name, auth_key, last_contact_at FROM sensors WHERE id = $1", id)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -72,32 +61,21 @@ func (s *SensorsService) GetById(id int64) (*models.SensorItem, error) {
|
||||||
return &item, nil
|
return &item, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SensorsService) Update(id int64, name string, sensorType *string, mqttBrokerId *int, mqttTopic *string, mqttPath *string) (*models.SensorItem, error) {
|
func (s *SensorsService) Update(id int64, name string) (*models.SensorItem, error) {
|
||||||
item, err := s.GetById(id)
|
item, err := s.GetById(id)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if sensorType == nil {
|
|
||||||
sensorType = new(string)
|
|
||||||
*sensorType = "rest"
|
|
||||||
}
|
|
||||||
|
|
||||||
item.Name = name
|
item.Name = name
|
||||||
item.Type = sensorType
|
|
||||||
item.MqttBrokerId = mqttBrokerId
|
|
||||||
item.MqttTopic = mqttTopic
|
|
||||||
item.MqttPath = mqttPath
|
|
||||||
|
|
||||||
_, err = s.ctx.DB.Exec("UPDATE sensors SET name = ?, type = ?, mqtt_broker_id = ?, mqtt_topic = ?, mqtt_path = ? WHERE id = ?", item.Name, item.Type, item.MqttBrokerId, item.MqttTopic, item.MqttPath, item.Id)
|
_, err = s.ctx.DB.Exec("UPDATE sensors SET name = ? WHERE id = ?", item.Name, item.Id)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.ctx.Services.MQTTBrokers.EnsureListeners()
|
|
||||||
|
|
||||||
return item, nil
|
return item, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -116,8 +94,6 @@ func (s *SensorsService) DeleteById(id int64) error {
|
||||||
|
|
||||||
_, err = s.ctx.DB.Exec("DELETE FROM sensors WHERE id = ?", item.Id)
|
_, err = s.ctx.DB.Exec("DELETE FROM sensors WHERE id = ?", item.Id)
|
||||||
|
|
||||||
s.ctx.Services.MQTTBrokers.EnsureListeners()
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue