Compare commits

..

6 Commits

Author SHA1 Message Date
Jan Zípek b6d2d115bf
TODOs are complete 2025-03-05 11:09:33 +01:00
Jan Zípek 44d57143d0
Fix closing 2025-03-05 11:05:24 +01:00
Jan Zípek bae535a014
Refactoring 2025-03-05 11:00:51 +01:00
Jan Zípek 3029d4af52
Improved error messages 2025-03-04 23:29:48 +01:00
Jan Zípek 9f65112a82
Fix missing value field 2025-03-04 21:31:42 +01:00
Jan Zípek 4f9fcbe7e0
MQTT sensors implemented, refactoring needed 2025-03-04 12:09:15 +01:00
12 changed files with 13 additions and 265 deletions

View File

@ -20,8 +20,6 @@ type FormValues = {
type: string
telegramApiKey?: string
telegramTargetChannel?: number
ntfyEndpoint?: string
ntfyAuthToken?: string
}
export const ContactPointFormModal = ({
@ -47,13 +45,6 @@ export const ContactPointFormModal = ({
}
}
if (v.type === 'ntfy') {
return {
endpoint: v.ntfyEndpoint,
authToken: v.ntfyAuthToken,
}
}
return {}
}
@ -65,10 +56,6 @@ export const ContactPointFormModal = ({
telegramApiKey: parsedTypeConfig.apiKey ?? '',
telegramTargetChannel: parsedTypeConfig.targetChannel,
}),
...(parsedTypeConfig?.type === 'ntfy' && {
ntfyEndpoint: parsedTypeConfig.endpoint ?? '',
ntfyAuthToken: parsedTypeConfig.authToken ?? '',
}),
}),
onSubmit: async (v) => {
if (isLoading) {
@ -118,7 +105,6 @@ export const ContactPointFormModal = ({
<label>Type</label>
<select required {...register('type')}>
<option value="telegram">Telegram</option>
<option value="ntfy">Ntfy</option>
</select>
</div>
@ -146,26 +132,6 @@ export const ContactPointFormModal = ({
</>
)}
{type === 'ntfy' && (
<>
<div className="input">
<label>Endpoint</label>
<input
type="text"
minLength={1}
required
placeholder="https://ntfy.sh"
{...register('ntfyEndpoint')}
/>
</div>
<div className="input">
<label>Auth Token (optional)</label>
<input type="text" {...register('ntfyAuthToken')} />
</div>
</>
)}
<div className="actions">
<button
className="test"

View File

@ -7,15 +7,7 @@ type ContactPointTelegramConfig = {
targetChannel: number
}
type ContactPointNtfyConfig = {
type: 'ntfy'
endpoint: string
authToken?: string
}
export type ParsedContactPointConfig =
| ContactPointTelegramConfig
| ContactPointNtfyConfig
export type ParsedContactPointConfig = ContactPointTelegramConfig
export const tryParseContactPointConfig = (
contactPoint: ContactPointInfo
@ -34,13 +26,5 @@ export const tryParseContactPointConfig = (
}
}
if (contactPoint.type === 'ntfy') {
return {
type: 'ntfy',
endpoint: data.endpoint,
authToken: data.authToken,
}
}
return null
}

View File

@ -6,20 +6,16 @@ import (
)
func (s *Server) StartAlerts() {
ticker := time.NewTicker(time.Second * 1)
go func() {
for {
interval := time.Second * 5
startedAt := time.Now()
err := s.Services.AlertsEvaluator.EvaluateAlerts()
if err != nil {
fmt.Println("Error evaluating alerts: ", err)
}
elapsed := time.Since(startedAt)
if elapsed < interval {
time.Sleep(interval - elapsed)
}
<-ticker.C
}
}()
}

View File

@ -1,3 +0,0 @@
CREATE INDEX IF NOT EXISTS idx_sensor_values_timestamp ON sensor_values (timestamp);
CREATE INDEX IF NOT EXISTS idx_sensor_values_sensor ON sensor_values (sensor_id);
CREATE INDEX IF NOT EXISTS idx_sensor_values_sensor_timestamp ON sensor_values (sensor_id, timestamp);

View File

@ -1,158 +0,0 @@
package integrations
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
)
type NtfyIntegration struct{}
var ntfyHTTPClient = &http.Client{
Timeout: 15 * time.Second,
}
type NtfyNotificationChannelConfig struct {
Endpoint string `json:"endpoint"`
AuthToken string `json:"authToken"`
}
func (s NtfyIntegration) ProcessEvent(evt *ContactPointEvent, rawConfig string) error {
config := NtfyNotificationChannelConfig{}
err := json.Unmarshal([]byte(rawConfig), &config)
if err != nil {
return fmt.Errorf("failed to parse ntfy integration config - %w", err)
}
if err := s.validate(config); err != nil {
return err
}
switch evt.Type {
case ContactPointEventAlertTriggered:
data := evt.AlertTriggeredEvent
if data.SensorValueCondition != nil {
text := fmt.Sprintf("%s is at {value}", data.Sensor.Name)
if data.Alert.CustomMessage != "" {
text = data.Alert.CustomMessage
}
text = strings.Replace(text, "{value}", strconv.FormatFloat(data.LastValue, 'f', -1, 64), -1)
return s.send(config, "Alert Triggered", text)
}
if data.SensorLastContactCondition != nil {
text := fmt.Sprintf(
"%s has not reported in for last %s %s",
data.Sensor.Name,
strconv.FormatFloat(data.SensorLastContactCondition.Value, 'f', -1, 64),
data.SensorLastContactCondition.ValueUnit,
)
if data.Alert.CustomMessage != "" {
text = data.Alert.CustomMessage
}
return s.send(config, "Alert Triggered", text)
}
return nil
case ContactPointEventAlertResolved:
data := evt.AlertResolvedEvent
if data.SensorValueCondition != nil {
text := fmt.Sprintf("%s is at {value}", data.Sensor.Name)
if data.Alert.CustomResolvedMessage != "" {
text = data.Alert.CustomResolvedMessage
}
text = strings.Replace(text, "{value}", strconv.FormatFloat(data.LastValue, 'f', -1, 64), -1)
return s.send(config, "Alert Resolved", text)
}
if data.SensorLastContactCondition != nil {
text := fmt.Sprintf(
"%s has reported in last %s %s",
data.Sensor.Name,
strconv.FormatFloat(data.SensorLastContactCondition.Value, 'f', -1, 64),
data.SensorLastContactCondition.ValueUnit,
)
if data.Alert.CustomResolvedMessage != "" {
text = data.Alert.CustomResolvedMessage
}
return s.send(config, "Alert Resolved", text)
}
return nil
case ContactPointEventTest:
return s.send(config, "Test", "Test message from Basic Sensor Receiver")
}
return nil
}
func (s NtfyIntegration) ValidateConfig(rawConfig string) error {
config := NtfyNotificationChannelConfig{}
err := json.Unmarshal([]byte(rawConfig), &config)
if err != nil {
return err
}
return s.validate(config)
}
func (s NtfyIntegration) validate(config NtfyNotificationChannelConfig) error {
if strings.TrimSpace(config.Endpoint) == "" {
return fmt.Errorf("ntfy endpoint is required")
}
return nil
}
func (s NtfyIntegration) send(config NtfyNotificationChannelConfig, title string, text string) error {
url := strings.TrimSpace(config.Endpoint)
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(text))
if err != nil {
return fmt.Errorf("failed to create ntfy request - %w", err)
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
req.Header.Set("Title", title)
if config.AuthToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", config.AuthToken))
}
resp, err := ntfyHTTPClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send ntfy notification - %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
return fmt.Errorf("ntfy returned status %d", resp.StatusCode)
}
return fmt.Errorf("ntfy returned status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
}
return nil
}

View File

@ -9,12 +9,12 @@ import (
type postOrPutContactPointsBody struct {
Name string `json:"name" binding:"required"`
Type string `json:"type" binding:"required,oneof=telegram ntfy"`
Type string `json:"type" binding:"required,oneof=telegram"`
TypeConfig string `json:"typeConfig" binding:"required"`
}
type testContactPointBody struct {
Type string `json:"type" binding:"required,oneof=telegram ntfy"`
Type string `json:"type" binding:"required,oneof=telegram"`
TypeConfig string `json:"typeConfig" binding:"required"`
}

View File

@ -61,7 +61,7 @@ func unitToSeconds(unit string) float64 {
}
func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
condition := map[string]any{}
condition := map[string]interface{}{}
err := json.Unmarshal([]byte(alert.Condition), &condition)
if err != nil {
return err
@ -86,7 +86,7 @@ func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
sensorId = int64(sensorValueCondition.SensorId)
value, err := s.ctx.Services.SensorValues.GetLatestToNow(sensorValueCondition.SensorId)
value, err := s.ctx.Services.SensorValues.GetLatest(sensorValueCondition.SensorId, time.Now().Unix())
if err != nil {
if err == sql.ErrNoRows {
lastValue = float64(0)
@ -112,7 +112,7 @@ func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
sensorId = sensorLastContactCondition.SensorId
value, err := s.ctx.Services.SensorValues.GetLatestToNow(sensorLastContactCondition.SensorId)
value, err := s.ctx.Services.SensorValues.GetLatest(sensorLastContactCondition.SensorId, time.Now().Unix())
if err != nil {
if err == sql.ErrNoRows {
@ -157,11 +157,8 @@ func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
}
}
if newStatus != alert.LastStatus {
_, err := s.ctx.DB.Exec("UPDATE alerts SET last_status = ?, last_status_at = ? WHERE id = ?", newStatus, time.Now().Unix(), alert.Id)
if err != nil {
return fmt.Errorf("error updating alert status: %v", err)
}
if newStatus != alert.LastStatus || newStatus == models.AlertStatusOk {
s.ctx.DB.Exec("UPDATE alerts SET last_status = ?, last_status_at = ? WHERE id = ?", newStatus, time.Now().Unix(), alert.Id)
if newStatus == models.AlertStatusAlerting || newStatus == models.AlertStatusOk {
sensor, err := s.ctx.Services.Sensors.GetById(sensorId)

View File

@ -62,14 +62,6 @@ func (s *ContactPointsService) Create(name string, contactType string, contactTy
return nil, fmt.Errorf("failed to validate telegram config - %w", err)
}
}
case "ntfy":
{
err := s.ctx.Integrations.Ntfy.ValidateConfig(contactTypeConfig)
if err != nil {
return nil, fmt.Errorf("failed to validate ntfy config - %w", err)
}
}
}
item := ContactPointItem{
@ -152,8 +144,6 @@ func (channel *ContactPointItem) getService(ctx *Context) (integrations.ContactP
switch channel.Type {
case "telegram":
return ctx.Integrations.Telegram, nil
case "ntfy":
return ctx.Integrations.Ntfy, nil
}
return nil, errors.New("unknown channel type")
@ -163,8 +153,6 @@ func (channel *ContactPointItem) getTestService(ctx *Context) (integrations.Cont
switch channel.Type {
case "telegram":
return ctx.Integrations.Telegram, nil
case "ntfy":
return ctx.Integrations.Ntfy, nil
}
return nil, errors.New("unknown channel type")

View File

@ -218,7 +218,7 @@ func (s *MQTTBrokersService) EnsureListeners() {
}
topics := map[string]byte{}
brokerSensors := []models.SensorItem{}
brokerSensors := []*models.SensorItem{}
for _, sensor := range sensors {
if sensor.MqttBrokerId == nil || *sensor.MqttBrokerId != broker.Id {
@ -230,7 +230,7 @@ func (s *MQTTBrokersService) EnsureListeners() {
}
topics[*sensor.MqttTopic] = byte(0)
brokerSensors = append(brokerSensors, sensor)
brokerSensors = append(brokerSensors, &sensor)
}
if len(brokerSensors) == 0 {
@ -249,10 +249,6 @@ func (s *MQTTBrokersService) EnsureListeners() {
log.Printf("MQTT broker %s: Listening for %d topics\n", broker.Name, len(topics))
for _, sensor := range brokerSensors {
log.Printf("MQTT broker %s: Sensor %s (%d) - %s\n", broker.Name, sensor.Name, sensor.Id, *sensor.MqttTopic)
}
go func() {
for {
data, ok := <-client.Channel
@ -279,7 +275,6 @@ func (s *MQTTBrokersService) EnsureListeners() {
}
if *sensor.MqttTopic != data.Topic {
log.Printf("WARN: Skipping sensor %s because it has different topic", sensor.Name)
continue
}
@ -303,8 +298,6 @@ func (s *MQTTBrokersService) EnsureListeners() {
continue
}
log.Printf("Value for sensor %d (%s) - %f", sensor.Id, sensor.Name, value)
s.ctx.Services.SensorValues.Push(sensor.Id, value)
}

View File

@ -48,19 +48,6 @@ func (s *SensorValuesService) GetLatest(sensorId int64, to int64) (*sensorValue,
return &value, nil
}
func (s *SensorValuesService) GetLatestToNow(sensorId int64) (*sensorValue, error) {
var value = sensorValue{}
row := s.ctx.DB.QueryRow("SELECT timestamp, value FROM sensor_values WHERE sensor_id = ? ORDER BY timestamp DESC LIMIT 1", sensorId)
err := row.Scan(&value.Timestamp, &value.Value)
if err != nil {
return nil, err
}
return &value, nil
}
func (s *SensorValuesService) Cleanup(retentionInDays int64) error {
if retentionInDays <= 0 {
return fmt.Errorf("retentionInDays must be greater than 0")

View File

@ -22,7 +22,6 @@ type Services struct {
type Integrations struct {
Telegram *integrations.TelegramIntegration
Ntfy *integrations.NtfyIntegration
MQTT *integrations.MQTTIntegration
}
@ -51,7 +50,6 @@ func InitializeServices(ctx *Context) *Services {
ctx.Integrations = &Integrations{}
ctx.Integrations.Telegram = &integrations.TelegramIntegration{}
ctx.Integrations.Ntfy = &integrations.NtfyIntegration{}
ctx.Integrations.MQTT = &integrations.MQTTIntegration{}
return &services