MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14
+if sys.version_info[0] < 3:
+ sockpair_data = "0"
+else:
+ sockpair_data = b"0"
def error_string(mqtt_errno):
"""Return the error string associated with an mqtt error number."""
return result
+def _socketpair_compat():
+ """TCP/IP socketpair including Windows support"""
+ listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
+ listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ listensock.bind(("localhost", 0))
+ listensock.listen(1)
+
+ iface, port = listensock.getsockname()
+ sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
+ sock1.setblocking(0)
+ try:
+ sock1.connect(("localhost", port))
+ except socket.error as err:
+ (msg) = err
+ if msg.errno != errno.EINPROGRESS:
+ raise
+ sock2, address = listensock.accept()
+ sock2.setblocking(0)
+ listensock.close()
+ return (sock1, sock2)
+
+
class MQTTMessage:
""" This is a class that describes an incoming message. It is passed to the
on_message callback as the message parameter.
self._userdata = userdata
self._sock = None
+ self._sockpairR, self._sockpairW = _socketpair_compat()
self._keepalive = 60
self._message_retry = 20
self._last_retry_check = 0
elif self._sock:
self._sock.close()
self._sock = None
+ if self._sockpairR:
+ self._sockpairR.close()
+ self._sockpairR = None
+ if self._sockpairW:
+ self._sockpairW.close()
+ self._sockpairW = None
+
self.__init__(client_id, clean_session, userdata)
def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=cert_reqs, tls_version=tls_version, ciphers=None):
self._out_packet_mutex.release()
self._current_out_packet_mutex.release()
- rlist = [self.socket()]
+ # sockpairR is used to break out of select() before the timeout, on a
+ # call to publish() etc.
+ rlist = [self.socket(), self._sockpairR]
try:
socklist = select.select(rlist, wlist, [], timeout)
except TypeError:
if rc or (self._ssl is None and self._sock is None):
return rc
- if self.socket() in socklist[1]:
+ if self.socket() in socklist[1] or self._sockpairR in socklist[0]:
rc = self.loop_write(max_packets)
if rc or (self._ssl is None and self._sock is None):
return rc
+ # Clear sockpairR - only ever a single byte written.
+ self._sockpairR.recv(1)
return self.loop_misc()
self._current_out_packet_mutex.release()
self._out_packet_mutex.release()
+ # Write a single byte to sockpairW (connected to sockpairR) to break
+ # out of select() if in threaded mode.
+ self._sockpairW.send(sockpair_data)
+
if not self._in_callback and self._thread is None:
return self.loop_write()
else: