From: Roger A. Light Date: Fri, 12 Sep 2014 20:13:32 +0000 (+0100) Subject: [443935] Fix reconnecting with lots of inflight messages. X-Git-Tag: v1.1~9 X-Git-Url: https://git.michaelhowe.org/gitweb/?a=commitdiff_plain;h=2b32ef45c48bc3f332c6e1030a513aa5db373f9e;p=packages%2Fp%2Fpaho-mqtt.git [443935] Fix reconnecting with lots of inflight messages. Fix reconnecting after sending more QoS>0 messages than inflight messages is set to, whilst connecting. Closes #443935. Thanks to Hiram van Paassen. Bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=443935 Change-Id: I31ddf2ffcbd6a7efa1e5b51c163d112ce3cf61ae --- diff --git a/ChangeLog.txt b/ChangeLog.txt index b34b91b..5bec8cc 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -6,6 +6,8 @@ v1.0.2 Closes #439277. - Don't attempt to encode topic to utf-8 twice. Thanks to Luc Milland. - Handle "unicode" type payloads on Python 2.7. Thanks to Luc Milland. +- Fix reconnecting after sending more QoS>0 messages than inflight messages is + set to, whilst connecting. Closes #443935. Thanks to Hiram van Paassen. v1.0.1 ====== diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index a28c81f..6b6afed 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -101,10 +101,17 @@ mqtt_cs_connect_async = 3 # Message state mqtt_ms_invalid = 0, -mqtt_ms_wait_puback = 1 -mqtt_ms_wait_pubrec = 2 -mqtt_ms_wait_pubrel = 3 -mqtt_ms_wait_pubcomp = 4 +mqtt_ms_publish_qos0 = 1 +mqtt_ms_publish_qos1 = 2 +mqtt_ms_wait_for_puback = 3 +mqtt_ms_publish_qos2 = 4 +mqtt_ms_wait_for_pubrec = 5 +mqtt_ms_resend_pubrel = 6 +mqtt_ms_wait_for_pubrel = 7 +mqtt_ms_resend_pubcomp = 8 +mqtt_ms_wait_for_pubcomp = 9 +mqtt_ms_send_pubrec = 10 +mqtt_ms_queued = 11 # Error values MQTT_ERR_AGAIN = -1 @@ -882,15 +889,17 @@ class Client(object): if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: self._inflight_messages = self._inflight_messages+1 if qos == 1: - message.state = mqtt_ms_wait_puback + message.state = mqtt_ms_wait_for_puback elif qos == 2: - message.state = mqtt_ms_wait_pubrec + message.state = mqtt_ms_wait_for_pubrec self._out_message_mutex.release() rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup) return (rc, local_mid) - self._out_message_mutex.release() - return (MQTT_ERR_SUCCESS, local_mid) + else: + message.state = mqtt_ms_queued; + self._out_message_mutex.release() + return (MQTT_ERR_SUCCESS, local_mid) def username_pw_set(self, username, password=None): """Set a username and optionally a password for broker authentication. @@ -1792,15 +1801,15 @@ class Client(object): now = time.time() for m in messages: if m.timestamp + self._message_retry < now: - if m.state == mqtt_ms_wait_puback or m.state == mqtt_ms_wait_pubrec: + if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec: m.timestamp = now m.dup = True self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) - elif m.state == mqtt_ms_wait_pubrel: + elif m.state == mqtt_ms_wait_for_pubrel: m.timestamp = now m.dup = True self._send_pubrec(m.mid) - elif m.state == mqtt_ms_wait_pubcomp: + elif m.state == mqtt_ms_wait_for_pubcomp: m.timestamp = now m.dup = True self._send_pubrel(m.mid, True) @@ -1812,16 +1821,28 @@ class Client(object): def _messages_reconnect_reset_out(self): self._out_message_mutex.acquire() + self._inflight_messages = 0 for m in self._out_messages: m.timestamp = 0 if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: - if m.qos == 1: - m.state = mqtt_ms_wait_puback + if m.qos == 0: + m.state = mqtt_ms_publish_qos0 + elif m.qos == 1: + #self._inflight_messages = self._inflight_messages + 1 + if m.state == mqtt_ms_wait_for_puback: + m.dup = True + m.state = mqtt_ms_publish_qos1 elif m.qos == 2: - # Preserve current state - pass + #self._inflight_messages = self._inflight_messages + 1 + if m.state == mqtt_ms_wait_for_pubcomp: + m.state = mqtt_ms_resend_pubrel + m.dup = True + else: + if m.state == mqtt_ms_wait_for_pubrec: + m.dup = True + m.state = mqtt_ms_publish_qos2 else: - m.state = mqtt_ms_invalid + m.state = mqtt_ms_queued self._out_message_mutex.release() def _messages_reconnect_reset_in(self): @@ -1951,7 +1972,44 @@ class Client(object): self._in_callback = False self._callback_mutex.release() if result == 0: - return MQTT_ERR_SUCCESS + rc = 0 + self._out_message_mutex.acquire() + for m in self._out_messages: + m.timestamp = time.time() + if m.state == mqtt_ms_queued: + self._out_message_mutex.release() + return MQTT_ERR_SUCCESS + + if m.qos == 0: + rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + if rc != 0: + self._out_message_mutex.release() + return rc + elif m.qos == 1: + if m.state == mqtt_ms_publish_qos1: + self._inflight_messages = self._inflight_messages + 1 + m.state = mqtt_ms_wait_for_puback + rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + if rc != 0: + self._out_message_mutex.release() + return rc + elif m.qos == 2: + if m.state == mqtt_ms_publish_qos2: + self._inflight_messages = self._inflight_messages + 1 + m.state = mqtt_ms_wait_for_pubrec + rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + if rc != 0: + self._out_message_mutex.release() + return rc + elif m.state == mqtt_ms_resend_pubrel: + self._inflight_messages = self._inflight_messages + 1 + m.state = mqtt_ms_wait_for_pubcomp + rc = self._send_pubrel(m.mid, m.dup) + if rc != 0: + self._out_message_mutex.release() + return rc + self._out_message_mutex.release() + return rc elif result > 0 and result < 6: return MQTT_ERR_CONN_REFUSED else: @@ -2016,7 +2074,7 @@ class Client(object): return rc elif message.qos == 2: rc = self._send_pubrec(message.mid) - message.state = mqtt_ms_wait_pubrel + message.state = mqtt_ms_wait_for_pubrel self._in_message_mutex.acquire() self._in_messages.append(message) self._in_message_mutex.release() @@ -2063,7 +2121,7 @@ class Client(object): # Dont lock message_mutex here for m in self._out_messages: if self._inflight_messages < self._max_inflight_messages: - if m.qos > 0 and m.state == mqtt_ms_invalid: + if m.qos > 0 and m.state == mqtt_ms_queued: self._inflight_messages = self._inflight_messages + 1 if m.qos == 1: m.state = mqtt_ms_wait_puback @@ -2088,7 +2146,7 @@ class Client(object): self._out_message_mutex.acquire() for m in self._out_messages: if m.mid == mid: - m.state = mqtt_ms_wait_pubcomp + m.state = mqtt_ms_wait_for_pubcomp m.timestamp = time.time() self._out_message_mutex.release() return self._send_pubrel(mid, False)