79 lines
1.7 KiB
Go
79 lines
1.7 KiB
Go
package integrations
|
|
|
|
import (
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
type MQTTIntegration struct{}
|
|
|
|
type MQTTEvent struct {
|
|
Type string
|
|
Topic string
|
|
Payload []byte
|
|
}
|
|
|
|
type MQTTListeningClient struct {
|
|
Client MQTT.Client
|
|
Channel chan MQTTEvent
|
|
}
|
|
|
|
func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error {
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker(server)
|
|
opts.SetUsername(username)
|
|
opts.SetPassword(password)
|
|
opts.SetClientID(clientId)
|
|
|
|
client := MQTT.NewClient(opts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
|
|
token := client.Publish(topic, qos, retain, message)
|
|
token.Wait()
|
|
|
|
client.Disconnect(250)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MQTTIntegration) Listen(server string, username string, password string, clientId string, topics map[string]byte) (*MQTTListeningClient, error) {
|
|
channel := make(chan MQTTEvent)
|
|
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker(server)
|
|
opts.SetUsername(username)
|
|
opts.SetPassword(password)
|
|
opts.SetClientID(clientId)
|
|
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
|
|
channel <- MQTTEvent{
|
|
Type: "message",
|
|
Topic: msg.Topic(),
|
|
Payload: msg.Payload(),
|
|
}
|
|
})
|
|
|
|
client := MQTT.NewClient(opts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
return nil, token.Error()
|
|
}
|
|
|
|
if token := client.SubscribeMultiple(topics, nil); token.Wait() && token.Error() != nil {
|
|
return nil, token.Error()
|
|
}
|
|
|
|
return &MQTTListeningClient{
|
|
Client: client,
|
|
Channel: channel,
|
|
}, nil
|
|
}
|
|
|
|
func (c *MQTTListeningClient) Close() {
|
|
c.Channel <- MQTTEvent{
|
|
Type: "close",
|
|
}
|
|
|
|
c.Client.Disconnect(250)
|
|
close(c.Channel)
|
|
}
|