package integrations import ( MQTT "github.com/eclipse/paho.mqtt.golang" ) type MQTTIntegration struct{} type MQTTEvent struct { Type string Topic string Payload []byte } type MQTTListeningClient struct { Client MQTT.Client Channel chan MQTTEvent } func (s *MQTTIntegration) Publish(server string, username string, password string, clientId string, retain bool, qos byte, topic string, message string) error { opts := MQTT.NewClientOptions() opts.AddBroker(server) opts.SetUsername(username) opts.SetPassword(password) opts.SetClientID(clientId) client := MQTT.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { return token.Error() } token := client.Publish(topic, qos, retain, message) token.Wait() client.Disconnect(250) return nil } func (s *MQTTIntegration) Listen(server string, username string, password string, clientId string, topics map[string]byte) (*MQTTListeningClient, error) { channel := make(chan MQTTEvent) opts := MQTT.NewClientOptions() opts.AddBroker(server) opts.SetUsername(username) opts.SetPassword(password) opts.SetClientID(clientId) opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) { channel <- MQTTEvent{ Type: "message", Topic: msg.Topic(), Payload: msg.Payload(), } }) client := MQTT.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { return nil, token.Error() } if token := client.SubscribeMultiple(topics, nil); token.Wait() && token.Error() != nil { return nil, token.Error() } return &MQTTListeningClient{ Client: client, Channel: channel, }, nil } func (c *MQTTListeningClient) Close() { c.Channel <- MQTTEvent{ Type: "close", } c.Client.Disconnect(250) close(c.Channel) }