diff --git a/server/services/mqtt_brokers_service.go b/server/services/mqtt_brokers_service.go index d674bf8..093de8b 100644 --- a/server/services/mqtt_brokers_service.go +++ b/server/services/mqtt_brokers_service.go @@ -166,11 +166,7 @@ func (s *MQTTBrokersService) PublishTopic(brokerId int64, topic string, message func (s *MQTTBrokersService) StopListeners() { if s.MqttWaitGroup != nil { for _, client := range s.MqttClients { - if client.Client == nil { - continue - } - - client.Client.Disconnect(250) + client.Close() } s.MqttWaitGroup.Wait() @@ -181,11 +177,7 @@ func (s *MQTTBrokersService) EnsureListeners() { if s.MqttWaitGroup != nil { // Disconnect all existing clients for _, client := range s.MqttClients { - if client.Client == nil { - continue - } - - client.Client.Disconnect(250) + client.Close() } // Wait for all channels to clear @@ -264,7 +256,7 @@ func (s *MQTTBrokersService) EnsureListeners() { for { data, ok := <-client.Channel if !ok { - log.Println("WARN: MQTT channel closed") + log.Println("WARN: MQTT channel closed unexpectedly") break }