From: Roger A. Light Date: Wed, 29 Jan 2014 22:19:56 +0000 (+0000) Subject: [452672] Fix message handling after reconnecting. X-Git-Tag: v1.1^2~1 X-Git-Url: https://git.michaelhowe.org/gitweb/?a=commitdiff_plain;h=5b8ab0b3fc21ddbf31639513daa8de3b0a216661;p=packages%2Fp%2Fpaho-mqtt.git [452672] Fix message handling after reconnecting. Thanks to William Boerendans. Bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=452672 Change-Id: I96e4bb0b5fa7e040605ff38db40550bcc68d20e9 --- diff --git a/ChangeLog.txt b/ChangeLog.txt index b805fb4..2af8bb3 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -6,6 +6,8 @@ v1.1 - 2015-01-30 v3.1.1. There is as yet insufficient support for v3.1.1 to rely on, and current v3.1 implementations do not return the correct CONNACK code to allow detection of the fault. Closes #451735. +- Fix incorrect handling of queued messages after reconnecting. Closes + #452672. v1.0.2 - 2014-09-13 diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index ffb5022..4f86f91 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -1090,7 +1090,7 @@ class Client(object): if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN - max_packets = len(self._out_messages) + len(self._in_messages) + max_packets = len(self._out_packet) + 1 if max_packets < 1: max_packets = 1 @@ -2015,11 +2015,14 @@ class Client(object): for m in self._out_messages: m.timestamp = time.time() if m.state == mqtt_ms_queued: + self.loop_write() # Process outgoing messages that have just been queued up self._out_message_mutex.release() return MQTT_ERR_SUCCESS if m.qos == 0: + self._in_callback = True # Don't call loop_write after _send_publish() rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc @@ -2027,7 +2030,9 @@ class Client(object): if m.state == mqtt_ms_publish: self._inflight_messages = self._inflight_messages + 1 m.state = mqtt_ms_wait_for_puback + self._in_callback = True # Don't call loop_write after _send_publish() rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc @@ -2035,17 +2040,22 @@ class Client(object): if m.state == mqtt_ms_publish: self._inflight_messages = self._inflight_messages + 1 m.state = mqtt_ms_wait_for_pubrec + self._in_callback = True # Don't call loop_write after _send_publish() rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc elif m.state == mqtt_ms_resend_pubrel: self._inflight_messages = self._inflight_messages + 1 m.state = mqtt_ms_wait_for_pubcomp + self._in_callback = True # Don't call loop_write after _send_pubrel() rc = self._send_pubrel(m.mid, m.dup) + self._in_callback = False if rc != 0: self._out_message_mutex.release() return rc + self.loop_write() # Process outgoing messages that have just been queued up self._out_message_mutex.release() return rc elif result > 0 and result < 6: