From 4376b3763c63f7ac7d31defa3fbc4ed8947b4be9 Mon Sep 17 00:00:00 2001 From: Roger Light Date: Wed, 5 Feb 2014 20:15:01 +0000 Subject: [PATCH] Break out of select() on publish()/subscribe() etc. When a call to publish()/subscribe()/unsubscribe()/disconnect() is made and the client is using the threaded mode, the outgoing command will be delayed by 0.5 seconds on average. This commit breaks out of select() to remove the delay. Change-Id: I7a669604c767abb0bcc399bdf1fe3b82c7ef9539 --- src/paho/mqtt/client.py | 46 ++++++++++++++++++- test/lib/python/08-ssl-connect-cert-auth.test | 1 - .../lib/python3/08-ssl-connect-cert-auth.test | 1 - 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index c04ddb8..f3f8858 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -119,6 +119,10 @@ MQTT_ERR_ACL_DENIED = 12 MQTT_ERR_UNKNOWN = 13 MQTT_ERR_ERRNO = 14 +if sys.version_info[0] < 3: + sockpair_data = "0" +else: + sockpair_data = b"0" def error_string(mqtt_errno): """Return the error string associated with an mqtt error number.""" @@ -235,6 +239,28 @@ def topic_matches_sub(sub, topic): return result +def _socketpair_compat(): + """TCP/IP socketpair including Windows support""" + listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) + listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listensock.bind(("localhost", 0)) + listensock.listen(1) + + iface, port = listensock.getsockname() + sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) + sock1.setblocking(0) + try: + sock1.connect(("localhost", port)) + except socket.error as err: + (msg) = err + if msg.errno != errno.EINPROGRESS: + raise + sock2, address = listensock.accept() + sock2.setblocking(0) + listensock.close() + return (sock1, sock2) + + class MQTTMessage: """ This is a class that describes an incoming message. It is passed to the on_message callback as the message parameter. @@ -365,6 +391,7 @@ class Client(object): self._userdata = userdata self._sock = None + self._sockpairR, self._sockpairW = _socketpair_compat() self._keepalive = 60 self._message_retry = 20 self._last_retry_check = 0 @@ -441,6 +468,13 @@ class Client(object): elif self._sock: self._sock.close() self._sock = None + if self._sockpairR: + self._sockpairR.close() + self._sockpairR = None + if self._sockpairW: + self._sockpairW.close() + self._sockpairW = None + self.__init__(client_id, clean_session, userdata) def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=cert_reqs, tls_version=tls_version, ciphers=None): @@ -728,7 +762,9 @@ class Client(object): self._out_packet_mutex.release() self._current_out_packet_mutex.release() - rlist = [self.socket()] + # sockpairR is used to break out of select() before the timeout, on a + # call to publish() etc. + rlist = [self.socket(), self._sockpairR] try: socklist = select.select(rlist, wlist, [], timeout) except TypeError: @@ -740,10 +776,12 @@ class Client(object): if rc or (self._ssl is None and self._sock is None): return rc - if self.socket() in socklist[1]: + if self.socket() in socklist[1] or self._sockpairR in socklist[0]: rc = self.loop_write(max_packets) if rc or (self._ssl is None and self._sock is None): return rc + # Clear sockpairR - only ever a single byte written. + self._sockpairR.recv(1) return self.loop_misc() @@ -1739,6 +1777,10 @@ class Client(object): self._current_out_packet_mutex.release() self._out_packet_mutex.release() + # Write a single byte to sockpairW (connected to sockpairR) to break + # out of select() if in threaded mode. + self._sockpairW.send(sockpair_data) + if not self._in_callback and self._thread is None: return self.loop_write() else: diff --git a/test/lib/python/08-ssl-connect-cert-auth.test b/test/lib/python/08-ssl-connect-cert-auth.test index 8493c5a..f4b9b3f 100755 --- a/test/lib/python/08-ssl-connect-cert-auth.test +++ b/test/lib/python/08-ssl-connect-cert-auth.test @@ -20,7 +20,6 @@ def on_connect(mqttc, obj, rc): mqttc.disconnect() def on_disconnect(mqttc, obj, rc): - mqttc.loop() obj = rc diff --git a/test/lib/python3/08-ssl-connect-cert-auth.test b/test/lib/python3/08-ssl-connect-cert-auth.test index acb1dcd..84ccf02 100755 --- a/test/lib/python3/08-ssl-connect-cert-auth.test +++ b/test/lib/python3/08-ssl-connect-cert-auth.test @@ -17,7 +17,6 @@ def on_connect(mqttc, obj, rc): mqttc.disconnect() def on_disconnect(mqttc, obj, rc): - mqttc.loop() obj = rc -- 2.39.5