Compare commits
6 Commits
master
...
feature/mq
| Author | SHA1 | Date |
|---|---|---|
|
|
b6d2d115bf | |
|
|
44d57143d0 | |
|
|
bae535a014 | |
|
|
3029d4af52 | |
|
|
9f65112a82 | |
|
|
4f9fcbe7e0 |
|
|
@ -20,8 +20,6 @@ type FormValues = {
|
|||
type: string
|
||||
telegramApiKey?: string
|
||||
telegramTargetChannel?: number
|
||||
ntfyEndpoint?: string
|
||||
ntfyAuthToken?: string
|
||||
}
|
||||
|
||||
export const ContactPointFormModal = ({
|
||||
|
|
@ -47,13 +45,6 @@ export const ContactPointFormModal = ({
|
|||
}
|
||||
}
|
||||
|
||||
if (v.type === 'ntfy') {
|
||||
return {
|
||||
endpoint: v.ntfyEndpoint,
|
||||
authToken: v.ntfyAuthToken,
|
||||
}
|
||||
}
|
||||
|
||||
return {}
|
||||
}
|
||||
|
||||
|
|
@ -65,10 +56,6 @@ export const ContactPointFormModal = ({
|
|||
telegramApiKey: parsedTypeConfig.apiKey ?? '',
|
||||
telegramTargetChannel: parsedTypeConfig.targetChannel,
|
||||
}),
|
||||
...(parsedTypeConfig?.type === 'ntfy' && {
|
||||
ntfyEndpoint: parsedTypeConfig.endpoint ?? '',
|
||||
ntfyAuthToken: parsedTypeConfig.authToken ?? '',
|
||||
}),
|
||||
}),
|
||||
onSubmit: async (v) => {
|
||||
if (isLoading) {
|
||||
|
|
@ -118,7 +105,6 @@ export const ContactPointFormModal = ({
|
|||
<label>Type</label>
|
||||
<select required {...register('type')}>
|
||||
<option value="telegram">Telegram</option>
|
||||
<option value="ntfy">Ntfy</option>
|
||||
</select>
|
||||
</div>
|
||||
|
||||
|
|
@ -146,26 +132,6 @@ export const ContactPointFormModal = ({
|
|||
</>
|
||||
)}
|
||||
|
||||
{type === 'ntfy' && (
|
||||
<>
|
||||
<div className="input">
|
||||
<label>Endpoint</label>
|
||||
<input
|
||||
type="text"
|
||||
minLength={1}
|
||||
required
|
||||
placeholder="https://ntfy.sh"
|
||||
{...register('ntfyEndpoint')}
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="input">
|
||||
<label>Auth Token (optional)</label>
|
||||
<input type="text" {...register('ntfyAuthToken')} />
|
||||
</div>
|
||||
</>
|
||||
)}
|
||||
|
||||
<div className="actions">
|
||||
<button
|
||||
className="test"
|
||||
|
|
|
|||
|
|
@ -7,15 +7,7 @@ type ContactPointTelegramConfig = {
|
|||
targetChannel: number
|
||||
}
|
||||
|
||||
type ContactPointNtfyConfig = {
|
||||
type: 'ntfy'
|
||||
endpoint: string
|
||||
authToken?: string
|
||||
}
|
||||
|
||||
export type ParsedContactPointConfig =
|
||||
| ContactPointTelegramConfig
|
||||
| ContactPointNtfyConfig
|
||||
export type ParsedContactPointConfig = ContactPointTelegramConfig
|
||||
|
||||
export const tryParseContactPointConfig = (
|
||||
contactPoint: ContactPointInfo
|
||||
|
|
@ -34,13 +26,5 @@ export const tryParseContactPointConfig = (
|
|||
}
|
||||
}
|
||||
|
||||
if (contactPoint.type === 'ntfy') {
|
||||
return {
|
||||
type: 'ntfy',
|
||||
endpoint: data.endpoint,
|
||||
authToken: data.authToken,
|
||||
}
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,20 +6,16 @@ import (
|
|||
)
|
||||
|
||||
func (s *Server) StartAlerts() {
|
||||
ticker := time.NewTicker(time.Second * 1)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
interval := time.Second * 5
|
||||
startedAt := time.Now()
|
||||
|
||||
err := s.Services.AlertsEvaluator.EvaluateAlerts()
|
||||
if err != nil {
|
||||
fmt.Println("Error evaluating alerts: ", err)
|
||||
}
|
||||
|
||||
elapsed := time.Since(startedAt)
|
||||
if elapsed < interval {
|
||||
time.Sleep(interval - elapsed)
|
||||
}
|
||||
<-ticker.C
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,3 +0,0 @@
|
|||
CREATE INDEX IF NOT EXISTS idx_sensor_values_timestamp ON sensor_values (timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_sensor_values_sensor ON sensor_values (sensor_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sensor_values_sensor_timestamp ON sensor_values (sensor_id, timestamp);
|
||||
|
|
@ -1,158 +0,0 @@
|
|||
package integrations
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type NtfyIntegration struct{}
|
||||
|
||||
var ntfyHTTPClient = &http.Client{
|
||||
Timeout: 15 * time.Second,
|
||||
}
|
||||
|
||||
type NtfyNotificationChannelConfig struct {
|
||||
Endpoint string `json:"endpoint"`
|
||||
AuthToken string `json:"authToken"`
|
||||
}
|
||||
|
||||
func (s NtfyIntegration) ProcessEvent(evt *ContactPointEvent, rawConfig string) error {
|
||||
config := NtfyNotificationChannelConfig{}
|
||||
err := json.Unmarshal([]byte(rawConfig), &config)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse ntfy integration config - %w", err)
|
||||
}
|
||||
|
||||
if err := s.validate(config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch evt.Type {
|
||||
case ContactPointEventAlertTriggered:
|
||||
data := evt.AlertTriggeredEvent
|
||||
|
||||
if data.SensorValueCondition != nil {
|
||||
text := fmt.Sprintf("%s is at {value}", data.Sensor.Name)
|
||||
|
||||
if data.Alert.CustomMessage != "" {
|
||||
text = data.Alert.CustomMessage
|
||||
}
|
||||
|
||||
text = strings.Replace(text, "{value}", strconv.FormatFloat(data.LastValue, 'f', -1, 64), -1)
|
||||
|
||||
return s.send(config, "Alert Triggered", text)
|
||||
}
|
||||
|
||||
if data.SensorLastContactCondition != nil {
|
||||
text := fmt.Sprintf(
|
||||
"%s has not reported in for last %s %s",
|
||||
data.Sensor.Name,
|
||||
strconv.FormatFloat(data.SensorLastContactCondition.Value, 'f', -1, 64),
|
||||
data.SensorLastContactCondition.ValueUnit,
|
||||
)
|
||||
|
||||
if data.Alert.CustomMessage != "" {
|
||||
text = data.Alert.CustomMessage
|
||||
}
|
||||
|
||||
return s.send(config, "Alert Triggered", text)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
case ContactPointEventAlertResolved:
|
||||
data := evt.AlertResolvedEvent
|
||||
|
||||
if data.SensorValueCondition != nil {
|
||||
text := fmt.Sprintf("%s is at {value}", data.Sensor.Name)
|
||||
|
||||
if data.Alert.CustomResolvedMessage != "" {
|
||||
text = data.Alert.CustomResolvedMessage
|
||||
}
|
||||
|
||||
text = strings.Replace(text, "{value}", strconv.FormatFloat(data.LastValue, 'f', -1, 64), -1)
|
||||
|
||||
return s.send(config, "Alert Resolved", text)
|
||||
}
|
||||
|
||||
if data.SensorLastContactCondition != nil {
|
||||
text := fmt.Sprintf(
|
||||
"%s has reported in last %s %s",
|
||||
data.Sensor.Name,
|
||||
strconv.FormatFloat(data.SensorLastContactCondition.Value, 'f', -1, 64),
|
||||
data.SensorLastContactCondition.ValueUnit,
|
||||
)
|
||||
|
||||
if data.Alert.CustomResolvedMessage != "" {
|
||||
text = data.Alert.CustomResolvedMessage
|
||||
}
|
||||
|
||||
return s.send(config, "Alert Resolved", text)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
case ContactPointEventTest:
|
||||
return s.send(config, "Test", "Test message from Basic Sensor Receiver")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s NtfyIntegration) ValidateConfig(rawConfig string) error {
|
||||
config := NtfyNotificationChannelConfig{}
|
||||
err := json.Unmarshal([]byte(rawConfig), &config)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.validate(config)
|
||||
}
|
||||
|
||||
func (s NtfyIntegration) validate(config NtfyNotificationChannelConfig) error {
|
||||
if strings.TrimSpace(config.Endpoint) == "" {
|
||||
return fmt.Errorf("ntfy endpoint is required")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s NtfyIntegration) send(config NtfyNotificationChannelConfig, title string, text string) error {
|
||||
url := strings.TrimSpace(config.Endpoint)
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(text))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create ntfy request - %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||
req.Header.Set("Title", title)
|
||||
|
||||
if config.AuthToken != "" {
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", config.AuthToken))
|
||||
}
|
||||
|
||||
resp, err := ntfyHTTPClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send ntfy notification - %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
body, readErr := io.ReadAll(resp.Body)
|
||||
if readErr != nil {
|
||||
return fmt.Errorf("ntfy returned status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return fmt.Errorf("ntfy returned status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -9,12 +9,12 @@ import (
|
|||
|
||||
type postOrPutContactPointsBody struct {
|
||||
Name string `json:"name" binding:"required"`
|
||||
Type string `json:"type" binding:"required,oneof=telegram ntfy"`
|
||||
Type string `json:"type" binding:"required,oneof=telegram"`
|
||||
TypeConfig string `json:"typeConfig" binding:"required"`
|
||||
}
|
||||
|
||||
type testContactPointBody struct {
|
||||
Type string `json:"type" binding:"required,oneof=telegram ntfy"`
|
||||
Type string `json:"type" binding:"required,oneof=telegram"`
|
||||
TypeConfig string `json:"typeConfig" binding:"required"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ func unitToSeconds(unit string) float64 {
|
|||
}
|
||||
|
||||
func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
|
||||
condition := map[string]any{}
|
||||
condition := map[string]interface{}{}
|
||||
err := json.Unmarshal([]byte(alert.Condition), &condition)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -86,7 +86,7 @@ func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
|
|||
|
||||
sensorId = int64(sensorValueCondition.SensorId)
|
||||
|
||||
value, err := s.ctx.Services.SensorValues.GetLatestToNow(sensorValueCondition.SensorId)
|
||||
value, err := s.ctx.Services.SensorValues.GetLatest(sensorValueCondition.SensorId, time.Now().Unix())
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
lastValue = float64(0)
|
||||
|
|
@ -112,7 +112,7 @@ func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
|
|||
|
||||
sensorId = sensorLastContactCondition.SensorId
|
||||
|
||||
value, err := s.ctx.Services.SensorValues.GetLatestToNow(sensorLastContactCondition.SensorId)
|
||||
value, err := s.ctx.Services.SensorValues.GetLatest(sensorLastContactCondition.SensorId, time.Now().Unix())
|
||||
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -157,11 +157,8 @@ func (s *AlertsEvaluatorService) EvaluateAlert(alert *models.AlertItem) error {
|
|||
}
|
||||
}
|
||||
|
||||
if newStatus != alert.LastStatus {
|
||||
_, err := s.ctx.DB.Exec("UPDATE alerts SET last_status = ?, last_status_at = ? WHERE id = ?", newStatus, time.Now().Unix(), alert.Id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error updating alert status: %v", err)
|
||||
}
|
||||
if newStatus != alert.LastStatus || newStatus == models.AlertStatusOk {
|
||||
s.ctx.DB.Exec("UPDATE alerts SET last_status = ?, last_status_at = ? WHERE id = ?", newStatus, time.Now().Unix(), alert.Id)
|
||||
|
||||
if newStatus == models.AlertStatusAlerting || newStatus == models.AlertStatusOk {
|
||||
sensor, err := s.ctx.Services.Sensors.GetById(sensorId)
|
||||
|
|
|
|||
|
|
@ -62,14 +62,6 @@ func (s *ContactPointsService) Create(name string, contactType string, contactTy
|
|||
return nil, fmt.Errorf("failed to validate telegram config - %w", err)
|
||||
}
|
||||
}
|
||||
case "ntfy":
|
||||
{
|
||||
err := s.ctx.Integrations.Ntfy.ValidateConfig(contactTypeConfig)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to validate ntfy config - %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
item := ContactPointItem{
|
||||
|
|
@ -152,8 +144,6 @@ func (channel *ContactPointItem) getService(ctx *Context) (integrations.ContactP
|
|||
switch channel.Type {
|
||||
case "telegram":
|
||||
return ctx.Integrations.Telegram, nil
|
||||
case "ntfy":
|
||||
return ctx.Integrations.Ntfy, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("unknown channel type")
|
||||
|
|
@ -163,8 +153,6 @@ func (channel *ContactPointItem) getTestService(ctx *Context) (integrations.Cont
|
|||
switch channel.Type {
|
||||
case "telegram":
|
||||
return ctx.Integrations.Telegram, nil
|
||||
case "ntfy":
|
||||
return ctx.Integrations.Ntfy, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("unknown channel type")
|
||||
|
|
|
|||
|
|
@ -218,7 +218,7 @@ func (s *MQTTBrokersService) EnsureListeners() {
|
|||
}
|
||||
|
||||
topics := map[string]byte{}
|
||||
brokerSensors := []models.SensorItem{}
|
||||
brokerSensors := []*models.SensorItem{}
|
||||
|
||||
for _, sensor := range sensors {
|
||||
if sensor.MqttBrokerId == nil || *sensor.MqttBrokerId != broker.Id {
|
||||
|
|
@ -230,7 +230,7 @@ func (s *MQTTBrokersService) EnsureListeners() {
|
|||
}
|
||||
|
||||
topics[*sensor.MqttTopic] = byte(0)
|
||||
brokerSensors = append(brokerSensors, sensor)
|
||||
brokerSensors = append(brokerSensors, &sensor)
|
||||
}
|
||||
|
||||
if len(brokerSensors) == 0 {
|
||||
|
|
@ -249,10 +249,6 @@ func (s *MQTTBrokersService) EnsureListeners() {
|
|||
|
||||
log.Printf("MQTT broker %s: Listening for %d topics\n", broker.Name, len(topics))
|
||||
|
||||
for _, sensor := range brokerSensors {
|
||||
log.Printf("MQTT broker %s: Sensor %s (%d) - %s\n", broker.Name, sensor.Name, sensor.Id, *sensor.MqttTopic)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
data, ok := <-client.Channel
|
||||
|
|
@ -279,7 +275,6 @@ func (s *MQTTBrokersService) EnsureListeners() {
|
|||
}
|
||||
|
||||
if *sensor.MqttTopic != data.Topic {
|
||||
log.Printf("WARN: Skipping sensor %s because it has different topic", sensor.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -303,8 +298,6 @@ func (s *MQTTBrokersService) EnsureListeners() {
|
|||
continue
|
||||
}
|
||||
|
||||
log.Printf("Value for sensor %d (%s) - %f", sensor.Id, sensor.Name, value)
|
||||
|
||||
s.ctx.Services.SensorValues.Push(sensor.Id, value)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -48,19 +48,6 @@ func (s *SensorValuesService) GetLatest(sensorId int64, to int64) (*sensorValue,
|
|||
return &value, nil
|
||||
}
|
||||
|
||||
func (s *SensorValuesService) GetLatestToNow(sensorId int64) (*sensorValue, error) {
|
||||
var value = sensorValue{}
|
||||
|
||||
row := s.ctx.DB.QueryRow("SELECT timestamp, value FROM sensor_values WHERE sensor_id = ? ORDER BY timestamp DESC LIMIT 1", sensorId)
|
||||
|
||||
err := row.Scan(&value.Timestamp, &value.Value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &value, nil
|
||||
}
|
||||
|
||||
func (s *SensorValuesService) Cleanup(retentionInDays int64) error {
|
||||
if retentionInDays <= 0 {
|
||||
return fmt.Errorf("retentionInDays must be greater than 0")
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ type Services struct {
|
|||
|
||||
type Integrations struct {
|
||||
Telegram *integrations.TelegramIntegration
|
||||
Ntfy *integrations.NtfyIntegration
|
||||
MQTT *integrations.MQTTIntegration
|
||||
}
|
||||
|
||||
|
|
@ -51,7 +50,6 @@ func InitializeServices(ctx *Context) *Services {
|
|||
|
||||
ctx.Integrations = &Integrations{}
|
||||
ctx.Integrations.Telegram = &integrations.TelegramIntegration{}
|
||||
ctx.Integrations.Ntfy = &integrations.NtfyIntegration{}
|
||||
ctx.Integrations.MQTT = &integrations.MQTTIntegration{}
|
||||
|
||||
return &services
|
||||
|
|
|
|||
Loading…
Reference in New Issue