From 5b8ab0b3fc21ddbf31639513daa8de3b0a216661 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 29 Jan 2014 22:19:56 +0000 Subject: [PATCH] [452672] Fix message handling after reconnecting. Thanks to William Boerendans. Bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=452672 Change-Id: I96e4bb0b5fa7e040605ff38db40550bcc68d20e9 --- ChangeLog.txt | 2 ++ src/paho/mqtt/client.py | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index b805fb4..2af8bb3 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -6,6 +6,8 @@ v1.1 - 2015-01-30 v3.1.1. There is as yet insufficient support for v3.1.1 to rely on, and current v3.1 implementations do not return the correct CONNACK code to allow detection of the fault. Closes #451735. +- Fix incorrect handling of queued messages after reconnecting. Closes + #452672. v1.0.2 - 2014-09-13 diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index ffb5022..4f86f91 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -1090,7 +1090,7 @@ class Client(object): if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN - max_packets = len(self._out_messages) + len(self._in_messages) + max_packets = len(self._out_packet) + 1 if max_packets < 1: max_packets = 1 @@ -2015,11 +2015,14 @@ class Client(object): for m in self._out_messages: m.timestamp = time.time() if m.state == mqtt_ms_queued: + self.loop_write() # Process outgoing messages that have just been queued up self._out_message_mutex.release() return MQTT_ERR_SUCCESS if m.qos == 0: + self._in_callback = True # Don't call loop_write after _send_publish() rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc @@ -2027,7 +2030,9 @@ class Client(object): if m.state == mqtt_ms_publish: self._inflight_messages = self._inflight_messages + 1 m.state = mqtt_ms_wait_for_puback + self._in_callback = True # Don't call loop_write after _send_publish() rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc @@ -2035,17 +2040,22 @@ class Client(object): if m.state == mqtt_ms_publish: self._inflight_messages = self._inflight_messages + 1 m.state = mqtt_ms_wait_for_pubrec + self._in_callback = True # Don't call loop_write after _send_publish() rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc elif m.state == mqtt_ms_resend_pubrel: self._inflight_messages = self._inflight_messages + 1 m.state = mqtt_ms_wait_for_pubcomp + self._in_callback = True # Don't call loop_write after _send_pubrel() rc = self._send_pubrel(m.mid, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc + self.loop_write() # Process outgoing messages that have just been queued up self._out_message_mutex.release() return rc elif result > 0 and result < 6: -- 2.39.5