]> git.michaelhowe.org Git - packages/p/paho-mqtt.git/commitdiff
[443935] Fix reconnecting with lots of inflight messages.
authorRoger A. Light <roger@atchoo.org>
Fri, 12 Sep 2014 20:13:32 +0000 (21:13 +0100)
committerRoger A. Light <roger@atchoo.org>
Fri, 12 Sep 2014 20:13:32 +0000 (21:13 +0100)
Fix reconnecting after sending more QoS>0 messages than inflight messages is
set to, whilst connecting.  Closes #443935. Thanks to Hiram van Paassen.

Bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=443935
Change-Id: I31ddf2ffcbd6a7efa1e5b51c163d112ce3cf61ae

ChangeLog.txt
src/paho/mqtt/client.py

index b34b91bb62060903af87b5669ed270512f4254f8..5bec8cc979d7dec496c5fcb73ce40b4410b1df34 100644 (file)
@@ -6,6 +6,8 @@ v1.0.2
   Closes #439277.
 - Don't attempt to encode topic to utf-8 twice. Thanks to Luc Milland.
 - Handle "unicode" type payloads on Python 2.7. Thanks to Luc Milland.
+- Fix reconnecting after sending more QoS>0 messages than inflight messages is
+  set to, whilst connecting.  Closes #443935. Thanks to Hiram van Paassen.
 
 v1.0.1
 ======
index a28c81f971b47ac5807ba9d15befb7d9ea8a898a..6b6afedc4a7d04947c8ce14d76859b26451c8505 100755 (executable)
@@ -101,10 +101,17 @@ mqtt_cs_connect_async = 3
 
 # Message state
 mqtt_ms_invalid = 0,
-mqtt_ms_wait_puback = 1
-mqtt_ms_wait_pubrec = 2
-mqtt_ms_wait_pubrel = 3
-mqtt_ms_wait_pubcomp = 4
+mqtt_ms_publish_qos0 = 1
+mqtt_ms_publish_qos1 = 2
+mqtt_ms_wait_for_puback = 3
+mqtt_ms_publish_qos2 = 4
+mqtt_ms_wait_for_pubrec = 5
+mqtt_ms_resend_pubrel = 6
+mqtt_ms_wait_for_pubrel = 7
+mqtt_ms_resend_pubcomp = 8
+mqtt_ms_wait_for_pubcomp = 9
+mqtt_ms_send_pubrec = 10
+mqtt_ms_queued = 11
 
 # Error values
 MQTT_ERR_AGAIN = -1
@@ -882,15 +889,17 @@ class Client(object):
             if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
                 self._inflight_messages = self._inflight_messages+1
                 if qos == 1:
-                    message.state = mqtt_ms_wait_puback
+                    message.state = mqtt_ms_wait_for_puback
                 elif qos == 2:
-                    message.state = mqtt_ms_wait_pubrec
+                    message.state = mqtt_ms_wait_for_pubrec
                 self._out_message_mutex.release()
 
                 rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
                 return (rc, local_mid)
-            self._out_message_mutex.release()
-            return (MQTT_ERR_SUCCESS, local_mid)
+            else:
+                message.state = mqtt_ms_queued;
+                self._out_message_mutex.release()
+                return (MQTT_ERR_SUCCESS, local_mid)
 
     def username_pw_set(self, username, password=None):
         """Set a username and optionally a password for broker authentication.
@@ -1792,15 +1801,15 @@ class Client(object):
         now = time.time()
         for m in messages:
             if m.timestamp + self._message_retry < now:
-                if m.state == mqtt_ms_wait_puback or m.state == mqtt_ms_wait_pubrec:
+                if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec:
                     m.timestamp = now
                     m.dup = True
                     self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
-                elif m.state == mqtt_ms_wait_pubrel:
+                elif m.state == mqtt_ms_wait_for_pubrel:
                     m.timestamp = now
                     m.dup = True
                     self._send_pubrec(m.mid)
-                elif m.state == mqtt_ms_wait_pubcomp:
+                elif m.state == mqtt_ms_wait_for_pubcomp:
                     m.timestamp = now
                     m.dup = True
                     self._send_pubrel(m.mid, True)
@@ -1812,16 +1821,28 @@ class Client(object):
 
     def _messages_reconnect_reset_out(self):
         self._out_message_mutex.acquire()
+        self._inflight_messages = 0
         for m in self._out_messages:
             m.timestamp = 0
             if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
-                if m.qos == 1:
-                    m.state = mqtt_ms_wait_puback
+                if m.qos == 0:
+                    m.state = mqtt_ms_publish_qos0
+                elif m.qos == 1:
+                    #self._inflight_messages = self._inflight_messages + 1
+                    if m.state == mqtt_ms_wait_for_puback:
+                        m.dup = True
+                    m.state = mqtt_ms_publish_qos1
                 elif m.qos == 2:
-                    # Preserve current state
-                    pass
+                    #self._inflight_messages = self._inflight_messages + 1
+                    if m.state == mqtt_ms_wait_for_pubcomp:
+                        m.state = mqtt_ms_resend_pubrel
+                        m.dup = True
+                    else:
+                        if m.state == mqtt_ms_wait_for_pubrec:
+                            m.dup = True
+                        m.state = mqtt_ms_publish_qos2
             else:
-                m.state = mqtt_ms_invalid
+                m.state = mqtt_ms_queued
         self._out_message_mutex.release()
 
     def _messages_reconnect_reset_in(self):
@@ -1951,7 +1972,44 @@ class Client(object):
             self._in_callback = False
         self._callback_mutex.release()
         if result == 0:
-            return MQTT_ERR_SUCCESS
+            rc = 0
+            self._out_message_mutex.acquire()
+            for m in self._out_messages:
+                m.timestamp = time.time()
+                if m.state == mqtt_ms_queued:
+                    self._out_message_mutex.release()
+                    return MQTT_ERR_SUCCESS
+
+                if m.qos == 0:
+                    rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+                    if rc != 0:
+                        self._out_message_mutex.release()
+                        return rc
+                elif m.qos == 1:
+                    if m.state == mqtt_ms_publish_qos1:
+                        self._inflight_messages = self._inflight_messages + 1
+                        m.state = mqtt_ms_wait_for_puback
+                        rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+                        if rc != 0:
+                            self._out_message_mutex.release()
+                            return rc
+                elif m.qos == 2:
+                    if m.state == mqtt_ms_publish_qos2:
+                        self._inflight_messages = self._inflight_messages + 1
+                        m.state = mqtt_ms_wait_for_pubrec
+                        rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+                        if rc != 0:
+                            self._out_message_mutex.release()
+                            return rc
+                    elif m.state == mqtt_ms_resend_pubrel:
+                        self._inflight_messages = self._inflight_messages + 1
+                        m.state = mqtt_ms_wait_for_pubcomp
+                        rc = self._send_pubrel(m.mid, m.dup)
+                        if rc != 0:
+                            self._out_message_mutex.release()
+                            return rc
+            self._out_message_mutex.release()
+            return rc
         elif result > 0 and result < 6:
             return MQTT_ERR_CONN_REFUSED
         else:
@@ -2016,7 +2074,7 @@ class Client(object):
             return rc
         elif message.qos == 2:
             rc = self._send_pubrec(message.mid)
-            message.state = mqtt_ms_wait_pubrel
+            message.state = mqtt_ms_wait_for_pubrel
             self._in_message_mutex.acquire()
             self._in_messages.append(message)
             self._in_message_mutex.release()
@@ -2063,7 +2121,7 @@ class Client(object):
         # Dont lock message_mutex here
         for m in self._out_messages:
             if self._inflight_messages < self._max_inflight_messages:
-                if m.qos > 0 and m.state == mqtt_ms_invalid:
+                if m.qos > 0 and m.state == mqtt_ms_queued:
                     self._inflight_messages = self._inflight_messages + 1
                     if m.qos == 1:
                         m.state = mqtt_ms_wait_puback
@@ -2088,7 +2146,7 @@ class Client(object):
         self._out_message_mutex.acquire()
         for m in self._out_messages:
             if m.mid == mid:
-                m.state = mqtt_ms_wait_pubcomp
+                m.state = mqtt_ms_wait_for_pubcomp
                 m.timestamp = time.time()
                 self._out_message_mutex.release()
                 return self._send_pubrel(mid, False)