]> git.michaelhowe.org Git - packages/p/paho-mqtt.git/commitdiff
Break out of select() on publish()/subscribe() etc.
authorRoger Light <roger@atchoo.org>
Wed, 5 Feb 2014 20:15:01 +0000 (20:15 +0000)
committerRoger Light <roger@atchoo.org>
Wed, 5 Feb 2014 20:15:01 +0000 (20:15 +0000)
When a call to publish()/subscribe()/unsubscribe()/disconnect() is made
and the client is using the threaded mode, the outgoing command will be
delayed by 0.5 seconds on average. This commit breaks out of select() to
remove the delay.

Change-Id: I7a669604c767abb0bcc399bdf1fe3b82c7ef9539

src/paho/mqtt/client.py
test/lib/python/08-ssl-connect-cert-auth.test
test/lib/python3/08-ssl-connect-cert-auth.test

index c04ddb83d52358227f99d1e171ed5624f4821553..f3f8858c537d5b445860d8dcf2eb35c3783ae34f 100755 (executable)
@@ -119,6 +119,10 @@ MQTT_ERR_ACL_DENIED = 12
 MQTT_ERR_UNKNOWN = 13
 MQTT_ERR_ERRNO = 14
 
+if sys.version_info[0] < 3:
+    sockpair_data = "0"
+else:
+    sockpair_data = b"0"
 
 def error_string(mqtt_errno):
     """Return the error string associated with an mqtt error number."""
@@ -235,6 +239,28 @@ def topic_matches_sub(sub, topic):
     return result
 
 
+def _socketpair_compat():
+    """TCP/IP socketpair including Windows support"""
+    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
+    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    listensock.bind(("localhost", 0))
+    listensock.listen(1)
+
+    iface, port = listensock.getsockname()
+    sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
+    sock1.setblocking(0)
+    try:
+        sock1.connect(("localhost", port))
+    except socket.error as err:
+        (msg) = err
+        if msg.errno != errno.EINPROGRESS:
+            raise
+    sock2, address = listensock.accept()
+    sock2.setblocking(0)
+    listensock.close()
+    return (sock1, sock2)
+
+
 class MQTTMessage:
     """ This is a class that describes an incoming message. It is passed to the
     on_message callback as the message parameter.
@@ -365,6 +391,7 @@ class Client(object):
 
         self._userdata = userdata
         self._sock = None
+        self._sockpairR, self._sockpairW = _socketpair_compat()
         self._keepalive = 60
         self._message_retry = 20
         self._last_retry_check = 0
@@ -441,6 +468,13 @@ class Client(object):
         elif self._sock:
             self._sock.close()
             self._sock = None
+        if self._sockpairR:
+            self._sockpairR.close()
+            self._sockpairR = None
+        if self._sockpairW:
+            self._sockpairW.close()
+            self._sockpairW = None
+
         self.__init__(client_id, clean_session, userdata)
 
     def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=cert_reqs, tls_version=tls_version, ciphers=None):
@@ -728,7 +762,9 @@ class Client(object):
         self._out_packet_mutex.release()
         self._current_out_packet_mutex.release()
 
-        rlist = [self.socket()]
+        # sockpairR is used to break out of select() before the timeout, on a
+        # call to publish() etc.
+        rlist = [self.socket(), self._sockpairR]
         try:
             socklist = select.select(rlist, wlist, [], timeout)
         except TypeError:
@@ -740,10 +776,12 @@ class Client(object):
             if rc or (self._ssl is None and self._sock is None):
                 return rc
 
-        if self.socket() in socklist[1]:
+        if self.socket() in socklist[1] or self._sockpairR in socklist[0]:
             rc = self.loop_write(max_packets)
             if rc or (self._ssl is None and self._sock is None):
                 return rc
+            # Clear sockpairR - only ever a single byte written.
+            self._sockpairR.recv(1)
 
         return self.loop_misc()
 
@@ -1739,6 +1777,10 @@ class Client(object):
             self._current_out_packet_mutex.release()
         self._out_packet_mutex.release()
 
+        # Write a single byte to sockpairW (connected to sockpairR) to break
+        # out of select() if in threaded mode.
+        self._sockpairW.send(sockpair_data)
+
         if not self._in_callback and self._thread is None:
             return self.loop_write()
         else:
index 8493c5adbec5255b0f3d11f5281114e49ebffc7e..f4b9b3fea6915a75dc2873c2f454b5e477025f05 100755 (executable)
@@ -20,7 +20,6 @@ def on_connect(mqttc, obj, rc):
         mqttc.disconnect()
 
 def on_disconnect(mqttc, obj, rc):
-    mqttc.loop()
     obj = rc
 
 
index acb1dcd292535b4f126f6b84f3bfa331c659139a..84ccf026bb5c8f53c6176fc300da6086f435bc64 100755 (executable)
@@ -17,7 +17,6 @@ def on_connect(mqttc, obj, rc):
         mqttc.disconnect()
 
 def on_disconnect(mqttc, obj, rc):
-    mqttc.loop()
     obj = rc