From: Roger Light Date: Sat, 15 Jun 2013 21:20:20 +0000 (+0100) Subject: Start of reorganisation for Paho. X-Git-Tag: v0.9~80 X-Git-Url: https://git.michaelhowe.org/gitweb/?a=commitdiff_plain;h=806b36f2b9641d4821300a6628f51ddb29f4aa17;p=packages%2Fp%2Fpaho-mqtt.git Start of reorganisation for Paho. --HG-- rename : lib/python/sub.py => examples/sub.py rename : lib/python/setup.py => setup.py rename : lib/python/mosquitto.py => src/paho/mqtt/mosquitto.py --- diff --git a/examples/sub.py b/examples/sub.py new file mode 100755 index 0000000..7af946d --- /dev/null +++ b/examples/sub.py @@ -0,0 +1,62 @@ +#!/usr/bin/python + +# Copyright (c) 2010,2011 Roger Light +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of mosquitto nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import mosquitto + +def on_connect(mosq, obj, rc): + print("rc: "+str(rc)) + +def on_message(mosq, obj, msg): + print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) + +def on_publish(mosq, obj, mid): + print("mid: "+str(mid)) + +def on_subscribe(mosq, obj, mid, granted_qos): + print("Subscribed: "+str(mid)+" "+str(granted_qos)) + +def on_log(mosq, obj, level, string): + print(string) + +mqttc = mosquitto.Mosquitto() +mqttc.on_message = on_message +mqttc.on_connect = on_connect +mqttc.on_publish = on_publish +mqttc.on_subscribe = on_subscribe +# Uncomment to enable debug messages +#mqttc.on_log = on_log +mqttc.connect("test.mosquitto.org", 1883, 60) +mqttc.subscribe("$SYS/#", 0) + + +rc = 0 +while rc == 0: + rc = mqttc.loop() + +print("rc: "+str(rc)) diff --git a/lib/python/mosquitto.py b/lib/python/mosquitto.py deleted file mode 100755 index 86f6ce9..0000000 --- a/lib/python/mosquitto.py +++ /dev/null @@ -1,1781 +0,0 @@ -# Copyright (c) 2012 Roger Light -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of mosquitto nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# -# This product includes software developed by the OpenSSL Project for use in -# the OpenSSL Toolkit. (http://www.openssl.org/) -# This product includes cryptographic software written by Eric Young -# (eay@cryptsoft.com) -# This product includes software written by Tim Hudson (tjh@cryptsoft.com) - -""" -This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging -protocol that is easy to implement and suitable for low powered devices. -""" -import errno -import random -import select -import socket -import ssl -import struct -import sys -import threading -import time - -if sys.version_info[0] < 3: - PROTOCOL_NAME = "MQIsdp" -else: - PROTOCOL_NAME = b"MQIsdp" - -PROTOCOL_VERSION = 3 - -# Message types -CONNECT = 0x10 -CONNACK = 0x20 -PUBLISH = 0x30 -PUBACK = 0x40 -PUBREC = 0x50 -PUBREL = 0x60 -PUBCOMP = 0x70 -SUBSCRIBE = 0x80 -SUBACK = 0x90 -UNSUBSCRIBE = 0xA0 -UNSUBACK = 0xB0 -PINGREQ = 0xC0 -PINGRESP = 0xD0 -DISCONNECT = 0xE0 - -# Log levels -MOSQ_LOG_INFO = 0x01 -MOSQ_LOG_NOTICE = 0x02 -MOSQ_LOG_WARNING = 0x04 -MOSQ_LOG_ERR = 0x08 -MOSQ_LOG_DEBUG = 0x10 - -# CONNACK codes -CONNACK_ACCEPTED = 0 -CONNACK_REFUSED_PROTOCOL_VERSION = 1 -CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 -CONNACK_REFUSED_SERVER_UNAVAILABLE = 3 -CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 -CONNACK_REFUSED_NOT_AUTHORIZED = 5 - -# Connection state -mosq_cs_new = 0 -mosq_cs_connected = 1 -mosq_cs_disconnecting = 2 -mosq_cs_connect_async = 3 - -# Message direction -mosq_md_invalid = 0 -mosq_md_in = 1 -mosq_md_out = 2 - -# Message state -mosq_ms_invalid = 0, -mosq_ms_wait_puback = 1 -mosq_ms_wait_pubrec = 2 -mosq_ms_wait_pubrel = 3 -mosq_ms_wait_pubcomp = 4 - -# Error values -MOSQ_ERR_AGAIN = -1 -MOSQ_ERR_SUCCESS = 0 -MOSQ_ERR_NOMEM = 1 -MOSQ_ERR_PROTOCOL = 2 -MOSQ_ERR_INVAL = 3 -MOSQ_ERR_NO_CONN = 4 -MOSQ_ERR_CONN_REFUSED = 5 -MOSQ_ERR_NOT_FOUND = 6 -MOSQ_ERR_CONN_LOST = 7 -MOSQ_ERR_TLS = 8 -MOSQ_ERR_PAYLOAD_SIZE = 9 -MOSQ_ERR_NOT_SUPPORTED = 10 -MOSQ_ERR_AUTH = 11 -MOSQ_ERR_ACL_DENIED = 12 -MOSQ_ERR_UNKNOWN = 13 -MOSQ_ERR_ERRNO = 14 - -def _fix_sub_topic(subtopic): - # Convert ////some////over/slashed///topic/etc/etc// - # into some/over/slashed/topic/etc/etc - if subtopic[0] == '/': - return '/'+'/'.join(filter(None, subtopic.split('/'))) - else: - return '/'.join(filter(None, subtopic.split('/'))) - -def error_string(mosq_errno): - """Return the error string associated with a mosquitto error number.""" - if mosq_errno == MOSQ_ERR_SUCCESS: - return "No error." - elif mosq_errno == MOSQ_ERR_NOMEM: - return "Out of memory." - elif mosq_errno == MOSQ_ERR_PROTOCOL: - return "A network protocol error occurred when communicating with the broker." - elif mosq_errno == MOSQ_ERR_INVAL: - return "Invalid function arguments provided." - elif mosq_errno == MOSQ_ERR_NO_CONN: - return "The client is not currently connected." - elif mosq_errno == MOSQ_ERR_CONN_REFUSED: - return "The connection was refused." - elif mosq_errno == MOSQ_ERR_NOT_FOUND: - return "Message not found (internal error)." - elif mosq_errno == MOSQ_ERR_CONN_LOST: - return "The connection was lost." - elif mosq_errno == MOSQ_ERR_TLS: - return "A TLS error occurred." - elif mosq_errno == MOSQ_ERR_PAYLOAD_SIZE: - return "Payload too large." - elif mosq_errno == MOSQ_ERR_NOT_SUPPORTED: - return "This feature is not supported." - elif mosq_errno == MOSQ_ERR_AUTH: - return "Authorisation failed." - elif mosq_errno == MOSQ_ERR_ACL_DENIED: - return "Access denied by ACL." - elif mosq_errno == MOSQ_ERR_UNKNOWN: - return "Unknown error." - elif mosq_errno == MOSQ_ERR_ERRNO: - return "Error defined by errno." - else: - return "Unknown error." - -def connack_string(connack_code): - """Return the string associated with a CONNACK result.""" - if connack_code == 0: - return "Connection Accepted." - elif connack_code == 1: - return "Connection Refused: unacceptable protocol version." - elif connack_code == 2: - return "Connection Refused: identifier rejected." - elif connack_code == 3: - return "Connection Refused: broker unavailable." - elif connack_code == 4: - return "Connection Refused: bad user name or password." - elif connack_code == 5: - return "Connection Refused: not authorised." - else: - return "Connection Refused: unknown reason." - -def topic_matches_sub(sub, topic): - """Check whether a topic matches a subscription. - - For example: - - foo/bar would match the subscription foo/# or +/bar - non/matching would not match the subscription non/+/+ - """ - result = True - local_sub = _fix_sub_topic(sub) - local_topic = _fix_sub_topic(topic) - multilevel_wildcard = False - - slen = len(local_sub) - tlen = len(local_topic) - - spos = 0; - tpos = 0; - - while spos < slen and tpos < tlen: - if local_sub[spos] == local_topic[tpos]: - spos += 1 - tpos += 1 - else: - if local_sub[spos] == '+': - spos += 1 - while tpos < tlen and local_topic[tpos] != '/': - tpos += 1 - if tpos == tlen and spos == slen: - result = True - break - - elif local_sub[spos] == '#': - multilevel_wildcard = True - if spos+1 != slen: - result = False - break - else: - result = True - break - - else: - result = False - break - - if tpos == tlen-1: - # Check for e.g. foo matching foo/# - if spos == slen-3 and local_sub[spos+1] == '/' and local_sub[spos+2] == '#': - result = True - multilevel_wildcard = True - break - - if not multilevel_wildcard and (tpos < tlen or spos < slen): - result = False - - return result - - -class MosquittoMessage: - """ This is a class that describes an incoming message. It is passed to the - on_message callback as the message parameter. - - Members: - - topic : String. topic that the message was published on. - payload : String/bytes the message payload. - qos : Integer. The message Quality of Service 0, 1 or 2. - retain : Boolean. If true, the message is a retained message and not fresh. - mid : Integer. The message id. - """ - def __init__(self): - self.timestamp = 0 - self.direction = mosq_md_invalid - self.state = mosq_ms_invalid - self.dup = False - self.mid = 0 - self.topic = "" - self.payload = None - self.qos = 0 - self.retain = False - -class MosquittoInPacket: - """Internal datatype.""" - def __init__(self): - self.command = 0 - self.have_remaining = 0 - self.remaining_count = [] - self.remaining_mult = 1 - self.remaining_length = 0 - self.packet = b"" - self.to_process = 0 - self.pos = 0 - - def cleanup(self): - self.__init__() - -class MosquittoPacket: - """Internal datatype.""" - def __init__(self, command, packet, mid, qos): - self.command = command - self.mid = mid - self.qos = qos - self.pos = 0 - self.to_process = len(packet) - self.packet = packet - -class Mosquitto: - """MQTT version 3.1 client class. - - This is the main class for use communicating with an MQTT broker. - - General usage flow: - - * Use connect()/connect_async() to connect to a broker - * Call loop() frequently to maintain network traffic flow with the broker - * Or use loop_start() to set a thread running to call loop() for you. - * Or use loop_forever() to handle calling loop() for you in a blocking - * function. - * Use subscribe() to subscribe to a topic and receive messages - * Use publish() to send messages - * Use disconnect() to disconnect from the broker - - Data returned from the broker is made available with the use of callback - functions as described below. - - Callbacks - ========= - - A number of callback functions are available to receive data back from the - broker. To use a callback, define a function and then assign it to the - client: - - def on_connect(mosq, userdata, rc): - print("Connection returned " + str(rc)) - - client.on_connect = on_connect - - All of the callbacks as described below have a "mosq" and an "userdata" - argument. "mosq" is the Mosquitto instance that is calling the callback. - "userdata" is user data of any type and can be set when creating a new client - instance or with user_data_set(userdata). - - The callbacks: - - on_connect(mosq, userdata, rc): called when the broker responds to our connection - request. The value of rc determines success or not: - 0: Connection successful - 1: Connection refused - incorrect protocol version - 2: Connection refused - invalid client identifier - 3: Connection refused - server unavailable - 4: Connection refused - bad username or password - 5: Connection refused - not authorised - 6-255: Currently unused. - - on_disconnect(mosq, userdata, rc): called when the client disconnects from the broker. - The rc parameter indicates the disconnection state. If MOSQ_ERR_SUCCESS - (0), the callback was called in response to a disconnect() call. If any - other value the disconnection was unexpected, such as might be caused by - a network error. - - on_message(mosq, userdata, message): called when a message has been received on a - topic that the client subscribes to. The message variable is a - MosquittoMessage that describes all of the message parameters. - - on_publish(mosq, userdata, mid): called when a message that was to be sent using the - publish() call has completed transmission to the broker. For messages - with QoS levels 1 and 2, this means that the appropriate handshakes have - completed. For QoS 0, this simply means that the message has left the - client. The mid variable matches the mid variable returned from the - corresponding publish() call, to allow outgoing messages to be tracked. - This callback is important because even if the publish() call returns - success, it does not always mean that the message has been sent. - - on_subscribe(mosq, userdata, mid, granted_qos): called when the broker responds to a - subscribe request. The mid variable matches the mid variable returned - from the corresponding subscribe() call. The granted_qos variable is a - list of integers that give the QoS level the broker has granted for each - of the different subscription requests. - - on_unsubscribe(mosq, userdata, mid): called when the broker responds to an unsubscribe - request. The mid variable matches the mid variable returned from the - corresponding unsubscribe() call. - - on_log(mosq, userdata, level, buf): called when the client has log information. Define - to allow debugging. The level variable gives the severity of the message - and will be one of MOSQ_LOG_INFO, MOSQ_LOG_NOTICE, MOSQ_LOG_WARNING, - MOSQ_LOG_ERR, and MOSQ_LOG_DEBUG. The message itself is in buf. - - """ - def __init__(self, client_id="", clean_session=True, userdata=None): - """client_id is the unique client id string used when connecting to the - broker. If client_id is zero length or None, then one will be randomly - generated. In this case, clean_session must be True. If this is not the - case a ValueError will be raised. - - clean_session is a boolean that determines the client type. If True, - the broker will remove all information about this client when it - disconnects. If False, the client is a persistent client and - subscription information and queued messages will be retained when the - client disconnects. - Note that a client will never discard its own outgoing messages on - disconnect. Calling connect() or reconnect() will cause the messages to - be resent. Use reinitialise() to reset a client to its original state. - - userdata is user defined data of any type that is passed as the "userdata" - parameter to callbacks. It may be updated at a later point with the - user_data_set() function. - """ - if not clean_session and (client_id == "" or client_id == None): - raise ValueError('A client id must be provided if clean session is False.') - - self._userdata = userdata - self._sock = None - self._keepalive = 60 - self._message_retry = 20 - self._last_retry_check = 0 - self._clean_session = clean_session - if client_id == "": - self._client_id = "mosq/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5)) - else: - self._client_id = client_id - - self._username = "" - self._password = "" - self._in_packet = MosquittoInPacket() - self._out_packet = [] - self._current_out_packet = None - self._last_msg_in = time.time() - self._last_msg_out = time.time() - self._ping_t = 0 - self._last_mid = 0 - self._state = mosq_cs_new - self._messages = [] - self._will = False - self._will_topic = "" - self._will_payload = None - self._will_qos = 0 - self._will_retain = False - self.on_disconnect = None - self.on_connect = None - self.on_publish = None - self.on_message = None - self.on_subscribe = None - self.on_unsubscribe = None - self.on_log = None - self._host = "" - self._port = 1883 - self._in_callback = False - self._strict_protocol = False - self._callback_mutex = threading.Lock() - self._state_mutex = threading.Lock() - self._out_packet_mutex = threading.Lock() - self._current_out_packet_mutex = threading.Lock() - self._msgtime_mutex = threading.Lock() - self._thread = None - self._thread_terminate = False - self._ssl = None - self._tls_certfile = None - self._tls_keyfile = None - self._tls_ca_certs = None - self._tls_cert_reqs = None - self._tls_ciphers = None - - def __del__(self): - pass - - def reinitialise(self, client_id="", clean_session=True, userdata=None): - if self._ssl: - self._ssl.close() - self._ssl = None - self._sock = None - elif self._sock: - self._sock.close() - self._sock = None - self.__init__(client_id, clean_session, userdata) - - def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None): - """Configure network encryption and authentication options. Enables SSL/TLS support. - - ca_certs : a string path to the Certificate Authority certificate files - that are to be treated as trusted by this client. If this is the only - option given then the client will operate in a similar manner to a web - browser. That is to say it will require the broker to have a - certificate signed by the Certificate Authorities in ca_certs and will - communicate using TLS v1, but will not attempt any form of - authentication. This provides basic network encryption but may not be - sufficient depending on how the broker is configured. - - certfile and keyfile are strings pointing to the PEM encoded client - certificate and private keys respectively. If these arguments are not - None then they will be used as client information for TLS based - authentication. Support for this feature is broker dependent. Note - that if either of these files in encrypted and needs a password to - decrypt it, Python will ask for the password at the command line. It is - not currently possible to define a callback to provide the password. - - cert_reqs allows the certificate requirements that the client imposes - on the broker to be changed. By default this is ssl.CERT_REQUIRED, - which means that the broker must provide a certificate. See the ssl - pydoc for more information on this parameter. - - tls_version allows the version of the SSL/TLS protocol used to be - specified. By default TLS v1 is used. Previous versions (all versions - beginning with SSL) are possible but not recommended due to possible - security problems. - - ciphers is a string specifying which encryption ciphers are allowable - for this connection, or None to use the defaults. See the ssl pydoc for - more information. - - Must be called before connect() or connect_async().""" - if sys.version < '2.7': - raise ValueError('Python 2.7 is the minimum supported version for TLS.') - - if ca_certs == None: - raise ValueError('ca_certs must not be None.') - - try: - f = open(ca_certs, "r") - except IOError as err: - raise IOError(ca_certs+": "+err.strerror) - else: - f.close() - if certfile != None: - try: - f = open(certfile, "r") - except IOError as err: - raise IOError(certfile+": "+err.strerror) - else: - f.close() - if keyfile != None: - try: - f = open(keyfile, "r") - except IOError as err: - raise IOError(keyfile+": "+err.strerror) - else: - f.close() - - self._tls_ca_certs = ca_certs - self._tls_certfile = certfile - self._tls_keyfile = keyfile - self._tls_cert_reqs = cert_reqs - self._tls_version = tls_version - self._tls_ciphers = ciphers - - def connect(self, host, port=1883, keepalive=60): - """Connect to a remote broker. - - host is the hostname or IP address of the remote broker. - port is the network port of the server host to connect to. Defaults to - 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you - are using tls_set() the port may need providing. - keepalive: Maximum period in seconds between communications with the - broker. If no other messages are being exchanged, this controls the - rate at which the client will send ping messages to the broker. - """ - self.connect_async(host, port, keepalive) - return self.reconnect() - - def connect_async(self, host, port=1883, keepalive=60): - """Connect to a remote broker asynchronously. This is a non-blocking - connect call that can be used with loop_start() to provide very quick - start. - - host is the hostname or IP address of the remote broker. - port is the network port of the server host to connect to. Defaults to - 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you - are using tls_set() the port may need providing. - keepalive: Maximum period in seconds between communications with the - broker. If no other messages are being exchanged, this controls the - rate at which the client will send ping messages to the broker. - """ - if host == None or len(host) == 0: - raise ValueError('Invalid host.') - if port <= 0: - raise ValueError('Invalid port number.') - if keepalive < 0: - raise ValueError('Keepalive must be >=0.') - - self._host = host - self._port = port - self._keepalive = keepalive - - self._state_mutex.acquire() - self._state = mosq_cs_connect_async - self._state_mutex.release() - - def reconnect(self): - """Reconnect the client after a disconnect. Can only be called after - connect()/connect_async().""" - if len(self._host) == 0: - raise ValueError('Invalid host.') - if self._port <= 0: - raise ValueError('Invalid port number.') - - self._in_packet.cleanup() - self._out_packet_mutex.acquire() - self._out_packet = [] - self._out_packet_mutex.release() - - self._current_out_packet_mutex.acquire() - self._current_out_packet = None - self._current_out_packet_mutex.release() - - self._msgtime_mutex.acquire() - self._last_msg_in = time.time() - self._last_msg_out = time.time() - self._msgtime_mutex.release() - - self._ping_t = 0 - self._state_mutex.acquire() - self._state = mosq_cs_new - self._state_mutex.release() - if self._ssl: - self._ssl.close() - self._ssl = None - self._sock = None - elif self._sock: - self._sock.close() - self._sock = None - - # Put messages in progress in a valid state. - self._messages_reconnect_reset() - - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # FIXME use create_connection here - - - if self._tls_ca_certs != None: - self._ssl = ssl.wrap_socket(self._sock, - certfile=self._tls_certfile, - keyfile=self._tls_keyfile, - ca_certs=self._tls_ca_certs, - cert_reqs=self._tls_cert_reqs, - ssl_version=self._tls_version, - ciphers=self._tls_ciphers) - - try: - self.socket().connect((self._host, self._port)) - except socket.error as err: - (msg) = err - if msg.errno != errno.EINPROGRESS: - print(msg) - return 1 - - self._sock.setblocking(0) - - return self._send_connect(self._keepalive, self._clean_session) - - def loop(self, timeout=1.0, max_packets=1): - """Process network events. - - This function must be called regularly to ensure communication with the - broker is carried out. It calls select() on the network socket to wait - for network events. If incoming data is present it will then be - processed. Outgoing commands, from e.g. publish(), are normally sent - immediately that their function is called, but this is not always - possible. loop() will also attempt to send any remaining outgoing - messages, which also includes commands that are part of the flow for - messages with QoS>0. - - timeout: The time in seconds to wait for incoming/outgoing network - traffic before timing out and returning. - max_packets: Not currently used. - - Returns MOSQ_ERR_SUCCESS on success. - Returns >0 on error. - - A ValueError will be raised if timeout < 0""" - if timeout < 0.0: - raise ValueError('Invalid timeout.') - - self._current_out_packet_mutex.acquire() - self._out_packet_mutex.acquire() - if self._current_out_packet == None and len(self._out_packet) > 0: - self._current_out_packet = self._out_packet.pop(0) - - if self._current_out_packet: - wlist = [self.socket()] - else: - wlist = [] - self._out_packet_mutex.release() - self._current_out_packet_mutex.release() - - rlist = [self.socket()] - try: - socklist = select.select(rlist, wlist, [], timeout) - except TypeError: - # Socket isn't correct type, in likelihood connection is lost - return MOSQ_ERR_CONN_LOST - - if self.socket() in socklist[0]: - rc = self.loop_read(max_packets) - if rc or (self._ssl == None and self._sock == None): - return rc - - if self.socket() in socklist[1]: - rc = self.loop_write(max_packets) - if rc or (self._ssl == None and self._sock == None): - return rc - - return self.loop_misc() - - def publish(self, topic, payload=None, qos=0, retain=False): - """Publish a message on a topic. - - This causes a message to be sent to the broker and subsequently from - the broker to any clients subscribing to matching topics. - - topic: The topic that the message should be published on. - payload: The actual message to send. If not given, or set to None a - zero length message will be used. Passing an int or float will result - in the payload being converted to a string representing that number. If - you wish to send a true int/float, use struct.pack() to create the - payload you require. - qos: The quality of service level to use. - retain: If set to true, the message will be set as the "last known - good"/retained message for the topic. - - Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS to - indicate success or MOSQ_ERR_NO_CONN if the client is not currently - connected. mid is the message ID for the publish request. The mid - value can be used to track the publish request by checking against the - mid argument in the on_publish() callback if it is defined. - - A ValueError will be raised if topic == None, has zero length or is - invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if - the length of the payload is greater than 268435455 bytes.""" - if topic == None or len(topic) == 0: - raise ValueError('Invalid topic.') - if qos<0 or qos>2: - raise ValueError('Invalid QoS level.') - if isinstance(payload, str) or isinstance(payload, bytearray): - local_payload = payload - elif isinstance(payload, int) or isinstance(payload, float): - local_payload = str(payload) - elif payload == None: - local_payload = None - else: - raise TypeError('payload must be a string, bytearray, int, float or None.') - - if local_payload != None and len(local_payload) > 268435455: - raise ValueError('Payload too large.') - - if self._topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS: - raise ValueError('Publish topic cannot contain wildcards.') - - local_mid = self._mid_generate() - - if qos == 0: - rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False) - return (rc, local_mid) - else: - message = MosquittoMessage() - message.timestamp = time.time() - message.direction = mosq_md_out - if qos == 1: - message.state = mosq_ms_wait_puback - elif qos == 2: - message.state = mosq_ms_wait_pubrec - - message.mid = local_mid - message.topic = topic - if local_payload == None or len(local_payload) == 0: - message.payload = None - else: - message.payload = local_payload - - message.qos = qos - message.retain = retain - message.dup = False - - self._messages.append(message) - rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup) - return (rc, local_mid) - - def username_pw_set(self, username, password=None): - """Set a username and optionally a password for broker authentication. - - Must be called before connect() to have any effect. - Requires a broker that supports MQTT v3.1. - - username: The username to authenticate with. Need have no relationship to the client id. - password: The password to authenticate with. Optional, set to None if not required. - """ - self._username = username - self._password = password - - def disconnect(self): - """Disconnect a connected client from the broker.""" - if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN - - self._state_mutex.acquire() - self._state = mosq_cs_disconnecting - self._state_mutex.release() - - return self._send_disconnect() - - def subscribe(self, topic, qos=0): - """Subscribe the client to a topic. - - sub: The subscription topic to subscribe to. - qos: The desired quality of service level for the subscription. - - Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS - to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected. - mid is the message ID for the subscribe request. The mid value can be - used to track the subscribe request by checking against the mid - argument in the on_subscribe() callback if it is defined. - - Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has - zero string length. - """ - if qos<0 or qos>2: - raise ValueError('Invalid QoS level.') - if topic == None or len(topic) == 0: - raise ValueError('Invalid topic.') - topic = _fix_sub_topic(topic) - - if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN - - return self._send_subscribe(False, topic, qos) - - def unsubscribe(self, topic): - """Unsubscribe the client from a topic. - - sub: The subscription topic to unsubscribe from. - - Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS - to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected. - mid is the message ID for the unsubscribe request. The mid value can be - used to track the unsubscribe request by checking against the mid - argument in the on_unsubscribe() callback if it is defined. - - Raises a ValueError if topic is None or has zero string length. - """ - if topic == None or len(topic) == 0: - raise ValueError('Invalid topic.') - topic = _fix_sub_topic(topic) - if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN - - return self._send_unsubscribe(False, topic) - - def loop_read(self, max_packets=1): - """Process read network events. Use in place of calling loop() if you - wish to handle your client reads as part of your own application. - - Use socket() to obtain the client socket to call select() or equivalent - on. - - Do not use if you are using the threaded interface loop_start().""" - if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN - - max_packets = len(self._messages) - if max_packets < 1: - max_packets = 1 - - for i in range(0, max_packets): - rc = self._packet_read() - if rc > 0: - return self._loop_rc_handle(rc) - elif rc == MOSQ_ERR_AGAIN: - return MOSQ_ERR_SUCCESS - return MOSQ_ERR_SUCCESS - - def loop_write(self, max_packets=1): - """Process read network events. Use in place of calling loop() if you - wish to handle your client reads as part of your own application. - - Use socket() to obtain the client socket to call select() or equivalent - on. - - Use want_write() to determine if there is data waiting to be written. - - Do not use if you are using the threaded interface loop_start().""" - if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN - - max_packets = len(self._messages) - if max_packets < 1: - max_packets = 1 - - for i in range(0, max_packets): - rc = self._packet_write() - if rc > 0: - return self._loop_rc_handle(rc) - elif rc == MOSQ_ERR_AGAIN: - return MOSQ_ERR_SUCCESS - return MOSQ_ERR_SUCCESS - - def want_write(self): - """Call to determine if there is network data waiting to be written. - Useful if you are calling select() yourself rather than using loop(). - """ - if self._current_out_packet or len(self._out_packet) > 0: - return True - else: - return False - - def loop_misc(self): - """Process miscellaneous network events. Use in place of calling loop() if you - wish to call select() or equivalent on. - - Do not use if you are using the threaded interface loop_start().""" - if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN - - now = time.time() - self._check_keepalive() - if self._last_retry_check+1 < now: - # Only check once a second at most - self._message_retry_check() - self._last_retry_check = now - - if self._ping_t > 0 and now - self._ping_t >= self._keepalive: - # mosq->ping_t != 0 means we are waiting for a pingresp. - # This hasn't happened in the keepalive time so we should disconnect. - if self._ssl: - self._ssl.close() - self._ssl = None - elif self._sock: - self._sock.close() - self._sock = None - - self._callback_mutex.acquire() - if self._state == mosq_cs_disconnecting: - rc = MOSQ_ERR_SUCCESS - else: - rc = 1 - if self.on_disconnect: - self._in_callback = True - self.on_disconnect(self, self._userdata, rc) - self._in_callback = False - self._callback_mutex.release() - return MOSQ_ERR_CONN_LOST - - return MOSQ_ERR_SUCCESS - - def message_retry_set(self, retry): - """Set the timeout in seconds before a message with QoS>0 is retried. - 20 seconds by default.""" - if retry < 0: - raise ValueError('Invalid retry.') - - self._message_retry = retry - - def user_data_set(self, userdata): - """Set the user data variable passed to callbacks. May be any data type.""" - self._userdata = userdata - - def will_set(self, topic, payload=None, qos=0, retain=False): - """Set a Will to be sent by the broker in case the client disconnects unexpectedly. - - This must be called before connect() to have any effect. - - topic: The topic that the will message should be published on. - payload: The message to send as a will. If not given, or set to None a - zero length message will be used as the will. Passing an int or float - will result in the payload being converted to a string representing - that number. If you wish to send a true int/float, use struct.pack() to - create the payload you require. - qos: The quality of service level to use for the will. - retain: If set to true, the will message will be set as the "last known - good"/retained message for the topic. - - Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has - zero string length. - """ - if topic == None or len(topic) == 0: - raise ValueError('Invalid topic.') - if qos<0 or qos>2: - raise ValueError('Invalid QoS level.') - if isinstance(payload, str) or isinstance(payload, bytearray): - self._will_payload = payload - elif isinstance(payload, int) or isinstance(payload, float): - self._will_payload = str(payload) - elif payload == None: - self._will_payload = None - else: - raise TypeError('payload must be a string, bytearray, int, float or None.') - - self._will = True - self._will_topic = topic - self._will_qos = qos - self._will_retain = retain - - def will_clear(self): - """ Removes a will that was previously configured with will_set(). - - Must be called before connect() to have any effect.""" - self._will = False - self._will_topic = "" - self._will_payload = None - self._will_qos = 0 - self._will_retain = False - - def socket(self): - """Return the socket or ssl object for this client.""" - if self._ssl: - return self._ssl - else: - return self._sock - - def loop_forever(self, timeout=1.0, max_packets=1): - """This function call loop() for you in an infinite blocking loop. It - is useful for the case where you only want to run the MQTT client loop - in your program. - - loop_forever() will handle reconnecting for you. If you call - disconnect() in a callback it will return.""" - - run = True - if self._state == mosq_cs_connect_async: - self.reconnect() - - while run: - rc = MOSQ_ERR_SUCCESS - while rc == MOSQ_ERR_SUCCESS: - rc = self.loop(timeout, max_packets) - - if self._state == mosq_cs_disconnecting: - run = False - else: - time.sleep(1) - self.reconnect() - return rc - - def loop_start(self): - """This is part of the threaded client interface. Call this once to - start a new thread to process network traffic. This provides an - alternative to repeatedly calling loop() yourself. - """ - if self._thread != None: - return MOSQ_ERR_INVAL - - self._thread = threading.Thread(target=self._thread_main) - self._thread.daemon = True - self._thread.start() - - def loop_stop(self, force=False): - """This is part of the threaded client interface. Call this once to - stop the network thread previously created with loop_start(). This call - will block until the network thread finishes. - - The force parameter is currently ignored. - """ - if self._thread == None: - return MOSQ_ERR_INVAL - - self._thread_terminate = True - self._thread.join() - self._thread = None - - # ============================================================ - # Private functions - # ============================================================ - - def _loop_rc_handle(self, rc): - if rc: - if self._ssl: - self._ssl.close() - self._ssl = None - elif self._sock: - self._sock.close() - self._sock = None - - self._state_mutex.acquire() - if self._state == mosq_cs_disconnecting: - rc = MOSQ_ERR_SUCCESS - self._state_mutex.release() - self._callback_mutex.acquire() - if self.on_disconnect: - self._in_callback = True - self.on_disconnect(self, self._userdata, rc) - self._in_callback = False - - self._callback_mutex.release() - return rc - - def _packet_read(self): - # This gets called if pselect() indicates that there is network data - # available - ie. at least one byte. What we do depends on what data we - # already have. - # If we've not got a command, attempt to read one and save it. This should - # always work because it's only a single byte. - # Then try to read the remaining length. This may fail because it is may - # be more than one byte - will need to save data pending next read if it - # does fail. - # Then try to read the remaining payload, where 'payload' here means the - # combined variable header and actual payload. This is the most likely to - # fail due to longer length, so save current data and current position. - # After all data is read, send to _mosquitto_handle_packet() to deal with. - # Finally, free the memory and reset everything to starting conditions. - if self._in_packet.command == 0: - try: - if self._ssl: - command = self._ssl.read(1) - else: - command = self._sock.recv(1) - except socket.error as err: - (msg) = err - if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN - if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN - print(msg) - return 1 - else: - if len(command) == 0: - return 1 - command = struct.unpack("!B", command) - self._in_packet.command = command[0] - - if self._in_packet.have_remaining == 0: - # Read remaining - # Algorithm for decoding taken from pseudo code at - # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm - while True: - try: - if self._ssl: - byte = self._ssl.read(1) - else: - byte = self._sock.recv(1) - except socket.error as err: - (msg) = err - if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN - if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN - print(msg) - return 1 - else: - byte = struct.unpack("!B", byte) - byte = byte[0] - self._in_packet.remaining_count.append(byte) - # Max 4 bytes length for remaining length as defined by protocol. - # Anything more likely means a broken/malicious client. - if len(self._in_packet.remaining_count) > 4: - return MOSQ_ERR_PROTOCOL - - self._in_packet.remaining_length = self._in_packet.remaining_length + (byte & 127)*self._in_packet.remaining_mult - self._in_packet.remaining_mult = self._in_packet.remaining_mult * 128 - - if (byte & 128) == 0: - break - - self._in_packet.have_remaining = 1 - self._in_packet.to_process = self._in_packet.remaining_length - - while self._in_packet.to_process > 0: - try: - if self._ssl: - data = self._ssl.read(self._in_packet.to_process) - else: - data = self._sock.recv(self._in_packet.to_process) - except socket.error as err: - (msg) = err - if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN - if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN - print(msg) - return 1 - else: - self._in_packet.to_process = self._in_packet.to_process - len(data) - self._in_packet.packet = self._in_packet.packet + data - - # All data for this packet is read. - self._in_packet.pos = 0 - rc = self._packet_handle() - - # Free data and reset values - self._in_packet.cleanup() - - self._msgtime_mutex.acquire() - self._last_msg_in = time.time() - self._msgtime_mutex.release() - return rc - - def _packet_write(self): - self._current_out_packet_mutex.acquire() - - while self._current_out_packet: - packet = self._current_out_packet - - try: - if self._ssl: - write_length = self._ssl.write(packet.packet[packet.pos:]) - else: - write_length = self._sock.send(packet.packet[packet.pos:]) - except AttributeError: - self._current_out_packet_mutex.release() - return MOSQ_ERR_SUCCESS - except socket.error as err: - self._current_out_packet_mutex.release() - (msg) = err - if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN - if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN - print(msg) - return 1 - - if write_length > 0: - packet.to_process = packet.to_process - write_length - packet.pos = packet.pos + write_length - - if packet.to_process == 0: - if (packet.command & 0xF0) == PUBLISH and packet.qos == 0: - self._callback_mutex.acquire() - if self.on_publish: - self._in_callback = True - self.on_publish(self, self._userdata, packet.mid) - self._in_callback = False - - self._callback_mutex.release() - - self._out_packet_mutex.acquire() - if len(self._out_packet) > 0: - self._current_out_packet = self._out_packet.pop(0) - else: - self._current_out_packet = None - self._out_packet_mutex.release() - else: - pass # FIXME - - self._current_out_packet_mutex.release() - - self._msgtime_mutex.acquire() - self._last_msg_out = time.time() - self._msgtime_mutex.release() - - return MOSQ_ERR_SUCCESS - - def _easy_log(self, level, buf): - if self.on_log: - self.on_log(self, self._userdata, level, buf) - - def _check_keepalive(self): - now = time.time() - self._msgtime_mutex.acquire() - last_msg_out = self._last_msg_out - last_msg_in = self._last_msg_in - self._msgtime_mutex.release() - if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): - if self._state == mosq_cs_connected and self._ping_t == 0: - self._send_pingreq() - self._msgtime_mutex.acquire() - self._last_msg_out = now - self._last_msg_in = now - self._msgtime_mutex.release() - else: - if self._ssl: - self._ssl.close() - self._ssl = None - elif self._sock: - self._sock.close() - self._sock = None - - if self._state == mosq_cs_disconnecting: - rc = MOSQ_ERR_SUCCESS - else: - rc = 1 - self._callback_mutex.acquire() - if self.on_disconnect: - self._in_callback = True - self.on_disconnect(self, self._userdata, rc) - self._in_callback = False - self._callback_mutex.release() - - def _mid_generate(self): - self._last_mid = self._last_mid + 1 - if self._last_mid == 65536: - self._last_mid = 1 - return self._last_mid - - def _topic_wildcard_len_check(self, topic): - # Search for + or # in a topic. Return MOSQ_ERR_INVAL if found. - # Also returns MOSQ_ERR_INVAL if the topic string is too long. - # Returns MOSQ_ERR_SUCCESS if everything is fine. - if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535: - return MOSQ_ERR_INVAL - else: - return MOSQ_ERR_SUCCESS - - def _send_pingreq(self): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGREQ") - rc = self._send_simple_command(PINGREQ) - if rc == MOSQ_ERR_SUCCESS: - self._ping_t = time.time() - return rc - - def _send_pingresp(self): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGRESP") - return self._send_simple_command(PINGRESP) - - def _send_puback(self, mid): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")") - return self._send_command_with_mid(PUBACK, mid, False) - - def _send_pubcomp(self, mid): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")") - return self._send_command_with_mid(PUBCOMP, mid, False) - - def _pack_remaining_length(self, packet, remaining_length): - remaining_bytes = [] - while True: - byte = remaining_length % 128 - remaining_length = remaining_length // 128 - # If there are more digits to encode, set the top bit of this digit - if remaining_length > 0: - byte = byte | 0x80 - - remaining_bytes.append(byte) - packet.extend(struct.pack("!B", byte)) - if remaining_length == 0: - # FIXME - this doesn't deal with incorrectly large payloads - return packet - - def _pack_str16(self, packet, data): - if sys.version_info[0] < 3: - if isinstance(data, bytearray): - packet.extend(struct.pack("!H", len(data))) - packet.extend(data) - elif isinstance(data, str): - pack_format = "!H" + str(len(data)) + "s" - packet.extend(struct.pack(pack_format, len(data), data)) - elif isinstance(data, unicode): - udata = data.encode('utf-8') - pack_format = "!H" + str(len(udata)) + "s" - packet.extend(struct.pack(pack_format, len(udata), udata)) - else: - raise TypeError - else: - if isinstance(data, bytearray): - packet.extend(struct.pack("!H", len(data))) - packet.extend(data) - elif isinstance(data, str): - udata = data.encode('utf-8') - pack_format = "!H" + str(len(udata)) + "s" - packet.extend(struct.pack(pack_format, len(udata), udata)) - else: - raise TypeError - - def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False): - if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN - - command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain - packet = bytearray() - packet.extend(struct.pack("!B", command)) - if payload == None: - remaining_length = 2+len(topic) - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"' (NULL payload)") - else: - remaining_length = 2+len(topic) + len(payload) - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"', ... ("+str(len(payload))+" bytes)") - - if qos > 0: - # For message id - remaining_length = remaining_length + 2 - - self._pack_remaining_length(packet, remaining_length) - self._pack_str16(packet, topic) - - if qos > 0: - # For message id - packet.extend(struct.pack("!H", mid)) - - if payload != None: - if isinstance(payload, str): - if sys.version_info[0] < 3: - pack_format = str(len(payload)) + "s" - packet.extend(struct.pack(pack_format, payload)) - else: - upayload = payload.encode('utf-8') - pack_format = str(len(upayload)) + "s" - packet.extend(struct.pack(pack_format, upayload)) - elif isinstance(payload, bytearray): - packet.extend(payload) - elif isinstance(payload, unicode): - upayload = payload.encode('utf-8') - pack_format = str(len(upayload)) + "s" - packet.extend(struct.pack(pack_format, upayload)) - else: - raise TypeError('payload must be a string, unicode or a bytearray.') - - return self._packet_queue(PUBLISH, packet, mid, qos) - - def _send_pubrec(self, mid): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") - return self._send_command_with_mid(PUBREC, mid, False) - - def _send_pubrel(self, mid, dup=False): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")") - return self._send_command_with_mid(PUBREL|2, mid, dup) - - def _send_command_with_mid(self, command, mid, dup): - # For PUBACK, PUBCOMP, PUBREC, and PUBREL - if dup: - command = command | 8 - - remaining_length = 2 - packet = struct.pack('!BBH', command, remaining_length, mid) - return self._packet_queue(command, packet, mid, 1) - - def _send_simple_command(self, command): - # For DISCONNECT, PINGREQ and PINGRESP - remaining_length = 0 - packet = struct.pack('!BB', command, remaining_length) - return self._packet_queue(command, packet, 0, 0) - - def _send_connect(self, keepalive, clean_session): - remaining_length = 12 + 2+len(self._client_id) - connect_flags = 0 - if clean_session: - connect_flags = connect_flags | 0x02 - - if self._will: - remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload) - connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5) - - if self._username: - remaining_length = remaining_length + 2+len(self._username) - connect_flags = connect_flags | 0x80 - if self._password: - connect_flags = connect_flags | 0x40 - remaining_length = remaining_length + 2+len(self._password) - - command = CONNECT - packet = bytearray() - packet.extend(struct.pack("!B", command)) - self._pack_remaining_length(packet, remaining_length) - packet.extend(struct.pack("!H6sBBH", len(PROTOCOL_NAME), PROTOCOL_NAME, PROTOCOL_VERSION, connect_flags, keepalive)) - - self._pack_str16(packet, self._client_id) - - if self._will: - self._pack_str16(packet, self._will_topic) - if len(self._will_payload) > 0: - self._pack_str16(packet, self._will_payload) - else: - packet.extend(struct.pack("!H", 0)) - - if self._username: - self._pack_str16(packet, self._username) - - if self._password: - self._pack_str16(packet, self._password) - - self._keepalive = keepalive - return self._packet_queue(command, packet, 0, 0) - - def _send_disconnect(self): - return self._send_simple_command(DISCONNECT) - - def _send_subscribe(self, dup, topic, topic_qos): - remaining_length = 2 + 2+len(topic) + 1 - command = SUBSCRIBE | (dup<<3) | (1<<1) - packet = bytearray() - packet.extend(struct.pack("!B", command)) - self._pack_remaining_length(packet, remaining_length) - local_mid = self._mid_generate() - packet.extend(struct.pack("!H", local_mid)) - self._pack_str16(packet, topic) - packet.extend(struct.pack("B", topic_qos)) - return (self._packet_queue(command, packet, local_mid, 1), local_mid) - - def _send_unsubscribe(self, dup, topic): - remaining_length = 2 + 2+len(topic) - command = UNSUBSCRIBE | (dup<<3) | (1<<1) - packet = bytearray() - packet.extend(struct.pack("!B", command)) - self._pack_remaining_length(packet, remaining_length) - local_mid = self._mid_generate() - packet.extend(struct.pack("!H", local_mid)) - self._pack_str16(packet, topic) - return (self._packet_queue(command, packet, local_mid, 1), local_mid) - - def _message_update(self, mid, direction, state): - for m in self._messages: - if m.mid == mid and m.direction == direction: - m.state = state - m.timestamp = time.time() - return MOSQ_ERR_SUCCESS - - return MOSQ_ERR_NOT_FOUND - - def _message_retry_check(self): - now = time.time() - for m in self._messages: - if m.timestamp + self._message_retry < now: - if m.state == mosq_ms_wait_puback or m.state == mosq_ms_wait_pubrec: - m.timestamp = now - m.dup = True - self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) - elif m.state == mosq_ms_wait_pubrel: - m.timestamp = now - m.dup = True - self._send_pubrec(m.mid) - elif m.state == mosq_ms_wait_pubcomp: - m.timestamp = now - m.dup = True - self._send_pubrel(m.mid, True) - - def _messages_reconnect_reset(self): - for m in self._messages: - m.timestamp = 0 - if m.direction == mosq_md_out: - if m.qos == 1: - m.state = mosq_ms_wait_puback - elif m.qos == 2: - m.state = mosq_ms_wait_pubrec - else: - self._messages.pop(self._messages.index(m)) - - def _packet_queue(self, command, packet, mid, qos): - mpkt = MosquittoPacket(command, packet, mid, qos) - - self._out_packet_mutex.acquire() - self._out_packet.append(mpkt) - if self._current_out_packet_mutex.acquire(False): - if self._current_out_packet == None and len(self._out_packet) > 0: - self._current_out_packet = self._out_packet.pop(0) - self._current_out_packet_mutex.release() - self._out_packet_mutex.release() - - if not self._in_callback and self._thread == None: - return self.loop_write() - else: - return MOSQ_ERR_SUCCESS - - def _packet_handle(self): - cmd = self._in_packet.command&0xF0 - if cmd == PINGREQ: - return self._handle_pingreq() - elif cmd == PINGRESP: - return self._handle_pingresp() - elif cmd == PUBACK: - return self._handle_pubackcomp("PUBACK") - elif cmd == PUBCOMP: - return self._handle_pubackcomp("PUBCOMP") - elif cmd == PUBLISH: - return self._handle_publish() - elif cmd == PUBREC: - return self._handle_pubrec() - elif cmd == PUBREL: - return self._handle_pubrel() - elif cmd == CONNACK: - return self._handle_connack() - elif cmd == SUBACK: - return self._handle_suback() - elif cmd == UNSUBACK: - return self._handle_unsuback() - else: - # If we don't recognise the command, return an error straight away. - self._easy_log(MOSQ_LOG_ERR, "Error: Unrecognised command "+str(cmd)) - return MOSQ_ERR_PROTOCOL - - def _handle_pingreq(self): - if self._strict_protocol: - if self._in_packet.remaining_length != 0: - return MOSQ_ERR_PROTOCOL - - self._easy_log(MOSQ_LOG_DEBUG, "Received PINGREQ") - return self._send_pingresp() - - def _handle_pingresp(self): - if self._strict_protocol: - if self._in_packet.remaining_length != 0: - return MOSQ_ERR_PROTOCOL - - # No longer waiting for a PINGRESP. - self._ping_t = 0 - self._easy_log(MOSQ_LOG_DEBUG, "Received PINGRESP") - return MOSQ_ERR_SUCCESS - - def _handle_connack(self): - if self._strict_protocol: - if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL - - if len(self._in_packet.packet) != 2: - return MOSQ_ERR_PROTOCOL - - (resvd, result) = struct.unpack("!BB", self._in_packet.packet) - self._easy_log(MOSQ_LOG_DEBUG, "Received CONNACK ("+str(resvd)+", "+str(result)+")") - self._callback_mutex.acquire() - if self.on_connect: - self._in_callback = True - self.on_connect(self, self._userdata, result) - self._in_callback = False - self._callback_mutex.release() - if result == 0: - self._state = mosq_cs_connected - return MOSQ_ERR_SUCCESS - elif result > 0 and result < 6: - return MOSQ_ERR_CONN_REFUSED - else: - return MOSQ_ERR_PROTOCOL - - def _handle_suback(self): - self._easy_log(MOSQ_LOG_DEBUG, "Received SUBACK") - pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's' - (mid, packet) = struct.unpack(pack_format, self._in_packet.packet) - pack_format = "!" + "B"*len(packet) - granted_qos = struct.unpack(pack_format, packet) - - self._callback_mutex.acquire() - if self.on_subscribe: - self._in_callback = True - self.on_subscribe(self, self._userdata, mid, granted_qos) - self._in_callback = False - self._callback_mutex.release() - - return MOSQ_ERR_SUCCESS - - def _handle_publish(self): - rc = 0 - - header = self._in_packet.command - message = MosquittoMessage() - message.direction = mosq_md_in - message.dup = (header & 0x08)>>3 - message.qos = (header & 0x06)>>1 - message.retain = (header & 0x01) - - pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's' - (slen, packet) = struct.unpack(pack_format, self._in_packet.packet) - pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's' - (message.topic, packet) = struct.unpack(pack_format, packet) - - if len(message.topic) == 0: - return MOSQ_ERR_PROTOCOL - - if sys.version_info[0] >= 3: - message.topic = message.topic.decode('utf-8') - message.topic = _fix_sub_topic(message.topic) - - if message.qos > 0: - pack_format = "!H" + str(len(packet)-2) + 's' - (message.mid, packet) = struct.unpack(pack_format, packet) - - message.payload = packet - - self._easy_log(MOSQ_LOG_DEBUG, "Received PUBLISH (d"+str(message.dup)+ - ", q"+str(message.qos)+", r"+str(message.retain)+ - ", m"+str(message.mid)+", '"+message.topic+ - "', ... ("+str(len(message.payload))+" bytes)") - - message.timestamp = time.time() - if message.qos == 0: - self._callback_mutex.acquire() - if self.on_message: - self._in_callback = True - self.on_message(self, self._userdata, message) - self._in_callback = False - - self._callback_mutex.release() - return MOSQ_ERR_SUCCESS - elif message.qos == 1: - rc = self._send_puback(message.mid) - self._callback_mutex.acquire() - if self.on_message: - self._in_callback = True - self.on_message(self, self._userdata, message) - self._in_callback = False - - self._callback_mutex.release() - return rc - elif message.qos == 2: - rc = self._send_pubrec(message.mid) - message.state = mosq_ms_wait_pubrel - self._messages.append(message) - return rc - else: - return MOSQ_ERR_PROTOCOL - - def _handle_pubrel(self): - if self._strict_protocol: - if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL - - if len(self._in_packet.packet) != 2: - return MOSQ_ERR_PROTOCOL - - mid = struct.unpack("!H", self._in_packet.packet) - mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")") - - for i in range(len(self._messages)): - if self._messages[i].direction == mosq_md_in and self._messages[i].mid == mid: - - # Only pass the message on if we have removed it from the queue - this - # prevents multiple callbacks for the same message. - self._callback_mutex.acquire() - if self.on_message: - self._in_callback = True - self.on_message(self, self._userdata, self._messages[i]) - self._in_callback = False - self._callback_mutex.release() - self._messages.pop(i) - - return self._send_pubcomp(mid) - - return MOSQ_ERR_SUCCESS - - def _handle_pubrec(self): - if self._strict_protocol: - if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL - - mid = struct.unpack("!H", self._in_packet.packet) - mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")") - - for i in range(len(self._messages)): - if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid: - self._messages[i].state = mosq_ms_wait_pubcomp - self._messages[i].timestamp = time.time() - return self._send_pubrel(mid, False) - - return MOSQ_ERR_SUCCESS - - def _handle_unsuback(self): - if self._strict_protocol: - if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL - - mid = struct.unpack("!H", self._in_packet.packet) - mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")") - self._callback_mutex.acquire() - if self.on_unsubscribe: - self._in_callback = True - self.on_unsubscribe(self, self._userdata, mid) - self._in_callback = False - self._callback_mutex.release() - return MOSQ_ERR_SUCCESS - - def _handle_pubackcomp(self, cmd): - if self._strict_protocol: - if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL - - mid = struct.unpack("!H", self._in_packet.packet) - mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")") - - for i in range(len(self._messages)): - try: - if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid: - # Only inform the client the message has been sent once. - self._callback_mutex.acquire() - if self.on_publish: - self._in_callback = True - self.on_publish(self, self._userdata, mid) - self._in_callback = False - - self._callback_mutex.release() - self._messages.pop(i) - except IndexError: - # Have removed item so i>count. - # Not really an error. - pass - - return MOSQ_ERR_SUCCESS - - def _thread_main(self): - run = True - self._thread_terminate = False - self._state_mutex.acquire() - if self._state == mosq_cs_connect_async: - self._state_mutex.release() - self.reconnect() - else: - self._state_mutex.release() - - while run: - rc = MOSQ_ERR_SUCCESS - while rc == MOSQ_ERR_SUCCESS: - rc = self.loop() - if self._thread_terminate: - rc = 1 - run = False - - self._state_mutex.acquire() - if self._state == mosq_cs_disconnecting: - run = False - self._state_mutex.release() - else: - self._state_mutex.release() - time.sleep(1) - self.reconnect() - diff --git a/lib/python/setup.py b/lib/python/setup.py deleted file mode 100644 index 3d52d53..0000000 --- a/lib/python/setup.py +++ /dev/null @@ -1,28 +0,0 @@ -from sys import version - -from distutils.core import setup -setup(name='mosquitto', - version='1.1.1', - description='MQTT version 3.1 client class', - author='Roger Light', - author_email='roger@atchoo.org', - url='http://mosquitto.org/', - download_url='http://mosquitto.org/files/', - license='BSD License', - py_modules=['mosquitto'], - - classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: BSD License', - 'Operating System :: MacOS :: MacOS X', - 'Operating System :: Microsoft :: Windows', - 'Operating System :: POSIX', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2.6', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Topic :: Communications', - 'Topic :: Internet', - ] - ) diff --git a/lib/python/sub.py b/lib/python/sub.py deleted file mode 100755 index 7af946d..0000000 --- a/lib/python/sub.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/python - -# Copyright (c) 2010,2011 Roger Light -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of mosquitto nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. - -import mosquitto - -def on_connect(mosq, obj, rc): - print("rc: "+str(rc)) - -def on_message(mosq, obj, msg): - print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) - -def on_publish(mosq, obj, mid): - print("mid: "+str(mid)) - -def on_subscribe(mosq, obj, mid, granted_qos): - print("Subscribed: "+str(mid)+" "+str(granted_qos)) - -def on_log(mosq, obj, level, string): - print(string) - -mqttc = mosquitto.Mosquitto() -mqttc.on_message = on_message -mqttc.on_connect = on_connect -mqttc.on_publish = on_publish -mqttc.on_subscribe = on_subscribe -# Uncomment to enable debug messages -#mqttc.on_log = on_log -mqttc.connect("test.mosquitto.org", 1883, 60) -mqttc.subscribe("$SYS/#", 0) - - -rc = 0 -while rc == 0: - rc = mqttc.loop() - -print("rc: "+str(rc)) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..3d52d53 --- /dev/null +++ b/setup.py @@ -0,0 +1,28 @@ +from sys import version + +from distutils.core import setup +setup(name='mosquitto', + version='1.1.1', + description='MQTT version 3.1 client class', + author='Roger Light', + author_email='roger@atchoo.org', + url='http://mosquitto.org/', + download_url='http://mosquitto.org/files/', + license='BSD License', + py_modules=['mosquitto'], + + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: BSD License', + 'Operating System :: MacOS :: MacOS X', + 'Operating System :: Microsoft :: Windows', + 'Operating System :: POSIX', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Topic :: Communications', + 'Topic :: Internet', + ] + ) diff --git a/src/paho/mqtt/mosquitto.py b/src/paho/mqtt/mosquitto.py new file mode 100755 index 0000000..86f6ce9 --- /dev/null +++ b/src/paho/mqtt/mosquitto.py @@ -0,0 +1,1781 @@ +# Copyright (c) 2012 Roger Light +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of mosquitto nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# +# This product includes software developed by the OpenSSL Project for use in +# the OpenSSL Toolkit. (http://www.openssl.org/) +# This product includes cryptographic software written by Eric Young +# (eay@cryptsoft.com) +# This product includes software written by Tim Hudson (tjh@cryptsoft.com) + +""" +This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging +protocol that is easy to implement and suitable for low powered devices. +""" +import errno +import random +import select +import socket +import ssl +import struct +import sys +import threading +import time + +if sys.version_info[0] < 3: + PROTOCOL_NAME = "MQIsdp" +else: + PROTOCOL_NAME = b"MQIsdp" + +PROTOCOL_VERSION = 3 + +# Message types +CONNECT = 0x10 +CONNACK = 0x20 +PUBLISH = 0x30 +PUBACK = 0x40 +PUBREC = 0x50 +PUBREL = 0x60 +PUBCOMP = 0x70 +SUBSCRIBE = 0x80 +SUBACK = 0x90 +UNSUBSCRIBE = 0xA0 +UNSUBACK = 0xB0 +PINGREQ = 0xC0 +PINGRESP = 0xD0 +DISCONNECT = 0xE0 + +# Log levels +MOSQ_LOG_INFO = 0x01 +MOSQ_LOG_NOTICE = 0x02 +MOSQ_LOG_WARNING = 0x04 +MOSQ_LOG_ERR = 0x08 +MOSQ_LOG_DEBUG = 0x10 + +# CONNACK codes +CONNACK_ACCEPTED = 0 +CONNACK_REFUSED_PROTOCOL_VERSION = 1 +CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 +CONNACK_REFUSED_SERVER_UNAVAILABLE = 3 +CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 +CONNACK_REFUSED_NOT_AUTHORIZED = 5 + +# Connection state +mosq_cs_new = 0 +mosq_cs_connected = 1 +mosq_cs_disconnecting = 2 +mosq_cs_connect_async = 3 + +# Message direction +mosq_md_invalid = 0 +mosq_md_in = 1 +mosq_md_out = 2 + +# Message state +mosq_ms_invalid = 0, +mosq_ms_wait_puback = 1 +mosq_ms_wait_pubrec = 2 +mosq_ms_wait_pubrel = 3 +mosq_ms_wait_pubcomp = 4 + +# Error values +MOSQ_ERR_AGAIN = -1 +MOSQ_ERR_SUCCESS = 0 +MOSQ_ERR_NOMEM = 1 +MOSQ_ERR_PROTOCOL = 2 +MOSQ_ERR_INVAL = 3 +MOSQ_ERR_NO_CONN = 4 +MOSQ_ERR_CONN_REFUSED = 5 +MOSQ_ERR_NOT_FOUND = 6 +MOSQ_ERR_CONN_LOST = 7 +MOSQ_ERR_TLS = 8 +MOSQ_ERR_PAYLOAD_SIZE = 9 +MOSQ_ERR_NOT_SUPPORTED = 10 +MOSQ_ERR_AUTH = 11 +MOSQ_ERR_ACL_DENIED = 12 +MOSQ_ERR_UNKNOWN = 13 +MOSQ_ERR_ERRNO = 14 + +def _fix_sub_topic(subtopic): + # Convert ////some////over/slashed///topic/etc/etc// + # into some/over/slashed/topic/etc/etc + if subtopic[0] == '/': + return '/'+'/'.join(filter(None, subtopic.split('/'))) + else: + return '/'.join(filter(None, subtopic.split('/'))) + +def error_string(mosq_errno): + """Return the error string associated with a mosquitto error number.""" + if mosq_errno == MOSQ_ERR_SUCCESS: + return "No error." + elif mosq_errno == MOSQ_ERR_NOMEM: + return "Out of memory." + elif mosq_errno == MOSQ_ERR_PROTOCOL: + return "A network protocol error occurred when communicating with the broker." + elif mosq_errno == MOSQ_ERR_INVAL: + return "Invalid function arguments provided." + elif mosq_errno == MOSQ_ERR_NO_CONN: + return "The client is not currently connected." + elif mosq_errno == MOSQ_ERR_CONN_REFUSED: + return "The connection was refused." + elif mosq_errno == MOSQ_ERR_NOT_FOUND: + return "Message not found (internal error)." + elif mosq_errno == MOSQ_ERR_CONN_LOST: + return "The connection was lost." + elif mosq_errno == MOSQ_ERR_TLS: + return "A TLS error occurred." + elif mosq_errno == MOSQ_ERR_PAYLOAD_SIZE: + return "Payload too large." + elif mosq_errno == MOSQ_ERR_NOT_SUPPORTED: + return "This feature is not supported." + elif mosq_errno == MOSQ_ERR_AUTH: + return "Authorisation failed." + elif mosq_errno == MOSQ_ERR_ACL_DENIED: + return "Access denied by ACL." + elif mosq_errno == MOSQ_ERR_UNKNOWN: + return "Unknown error." + elif mosq_errno == MOSQ_ERR_ERRNO: + return "Error defined by errno." + else: + return "Unknown error." + +def connack_string(connack_code): + """Return the string associated with a CONNACK result.""" + if connack_code == 0: + return "Connection Accepted." + elif connack_code == 1: + return "Connection Refused: unacceptable protocol version." + elif connack_code == 2: + return "Connection Refused: identifier rejected." + elif connack_code == 3: + return "Connection Refused: broker unavailable." + elif connack_code == 4: + return "Connection Refused: bad user name or password." + elif connack_code == 5: + return "Connection Refused: not authorised." + else: + return "Connection Refused: unknown reason." + +def topic_matches_sub(sub, topic): + """Check whether a topic matches a subscription. + + For example: + + foo/bar would match the subscription foo/# or +/bar + non/matching would not match the subscription non/+/+ + """ + result = True + local_sub = _fix_sub_topic(sub) + local_topic = _fix_sub_topic(topic) + multilevel_wildcard = False + + slen = len(local_sub) + tlen = len(local_topic) + + spos = 0; + tpos = 0; + + while spos < slen and tpos < tlen: + if local_sub[spos] == local_topic[tpos]: + spos += 1 + tpos += 1 + else: + if local_sub[spos] == '+': + spos += 1 + while tpos < tlen and local_topic[tpos] != '/': + tpos += 1 + if tpos == tlen and spos == slen: + result = True + break + + elif local_sub[spos] == '#': + multilevel_wildcard = True + if spos+1 != slen: + result = False + break + else: + result = True + break + + else: + result = False + break + + if tpos == tlen-1: + # Check for e.g. foo matching foo/# + if spos == slen-3 and local_sub[spos+1] == '/' and local_sub[spos+2] == '#': + result = True + multilevel_wildcard = True + break + + if not multilevel_wildcard and (tpos < tlen or spos < slen): + result = False + + return result + + +class MosquittoMessage: + """ This is a class that describes an incoming message. It is passed to the + on_message callback as the message parameter. + + Members: + + topic : String. topic that the message was published on. + payload : String/bytes the message payload. + qos : Integer. The message Quality of Service 0, 1 or 2. + retain : Boolean. If true, the message is a retained message and not fresh. + mid : Integer. The message id. + """ + def __init__(self): + self.timestamp = 0 + self.direction = mosq_md_invalid + self.state = mosq_ms_invalid + self.dup = False + self.mid = 0 + self.topic = "" + self.payload = None + self.qos = 0 + self.retain = False + +class MosquittoInPacket: + """Internal datatype.""" + def __init__(self): + self.command = 0 + self.have_remaining = 0 + self.remaining_count = [] + self.remaining_mult = 1 + self.remaining_length = 0 + self.packet = b"" + self.to_process = 0 + self.pos = 0 + + def cleanup(self): + self.__init__() + +class MosquittoPacket: + """Internal datatype.""" + def __init__(self, command, packet, mid, qos): + self.command = command + self.mid = mid + self.qos = qos + self.pos = 0 + self.to_process = len(packet) + self.packet = packet + +class Mosquitto: + """MQTT version 3.1 client class. + + This is the main class for use communicating with an MQTT broker. + + General usage flow: + + * Use connect()/connect_async() to connect to a broker + * Call loop() frequently to maintain network traffic flow with the broker + * Or use loop_start() to set a thread running to call loop() for you. + * Or use loop_forever() to handle calling loop() for you in a blocking + * function. + * Use subscribe() to subscribe to a topic and receive messages + * Use publish() to send messages + * Use disconnect() to disconnect from the broker + + Data returned from the broker is made available with the use of callback + functions as described below. + + Callbacks + ========= + + A number of callback functions are available to receive data back from the + broker. To use a callback, define a function and then assign it to the + client: + + def on_connect(mosq, userdata, rc): + print("Connection returned " + str(rc)) + + client.on_connect = on_connect + + All of the callbacks as described below have a "mosq" and an "userdata" + argument. "mosq" is the Mosquitto instance that is calling the callback. + "userdata" is user data of any type and can be set when creating a new client + instance or with user_data_set(userdata). + + The callbacks: + + on_connect(mosq, userdata, rc): called when the broker responds to our connection + request. The value of rc determines success or not: + 0: Connection successful + 1: Connection refused - incorrect protocol version + 2: Connection refused - invalid client identifier + 3: Connection refused - server unavailable + 4: Connection refused - bad username or password + 5: Connection refused - not authorised + 6-255: Currently unused. + + on_disconnect(mosq, userdata, rc): called when the client disconnects from the broker. + The rc parameter indicates the disconnection state. If MOSQ_ERR_SUCCESS + (0), the callback was called in response to a disconnect() call. If any + other value the disconnection was unexpected, such as might be caused by + a network error. + + on_message(mosq, userdata, message): called when a message has been received on a + topic that the client subscribes to. The message variable is a + MosquittoMessage that describes all of the message parameters. + + on_publish(mosq, userdata, mid): called when a message that was to be sent using the + publish() call has completed transmission to the broker. For messages + with QoS levels 1 and 2, this means that the appropriate handshakes have + completed. For QoS 0, this simply means that the message has left the + client. The mid variable matches the mid variable returned from the + corresponding publish() call, to allow outgoing messages to be tracked. + This callback is important because even if the publish() call returns + success, it does not always mean that the message has been sent. + + on_subscribe(mosq, userdata, mid, granted_qos): called when the broker responds to a + subscribe request. The mid variable matches the mid variable returned + from the corresponding subscribe() call. The granted_qos variable is a + list of integers that give the QoS level the broker has granted for each + of the different subscription requests. + + on_unsubscribe(mosq, userdata, mid): called when the broker responds to an unsubscribe + request. The mid variable matches the mid variable returned from the + corresponding unsubscribe() call. + + on_log(mosq, userdata, level, buf): called when the client has log information. Define + to allow debugging. The level variable gives the severity of the message + and will be one of MOSQ_LOG_INFO, MOSQ_LOG_NOTICE, MOSQ_LOG_WARNING, + MOSQ_LOG_ERR, and MOSQ_LOG_DEBUG. The message itself is in buf. + + """ + def __init__(self, client_id="", clean_session=True, userdata=None): + """client_id is the unique client id string used when connecting to the + broker. If client_id is zero length or None, then one will be randomly + generated. In this case, clean_session must be True. If this is not the + case a ValueError will be raised. + + clean_session is a boolean that determines the client type. If True, + the broker will remove all information about this client when it + disconnects. If False, the client is a persistent client and + subscription information and queued messages will be retained when the + client disconnects. + Note that a client will never discard its own outgoing messages on + disconnect. Calling connect() or reconnect() will cause the messages to + be resent. Use reinitialise() to reset a client to its original state. + + userdata is user defined data of any type that is passed as the "userdata" + parameter to callbacks. It may be updated at a later point with the + user_data_set() function. + """ + if not clean_session and (client_id == "" or client_id == None): + raise ValueError('A client id must be provided if clean session is False.') + + self._userdata = userdata + self._sock = None + self._keepalive = 60 + self._message_retry = 20 + self._last_retry_check = 0 + self._clean_session = clean_session + if client_id == "": + self._client_id = "mosq/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5)) + else: + self._client_id = client_id + + self._username = "" + self._password = "" + self._in_packet = MosquittoInPacket() + self._out_packet = [] + self._current_out_packet = None + self._last_msg_in = time.time() + self._last_msg_out = time.time() + self._ping_t = 0 + self._last_mid = 0 + self._state = mosq_cs_new + self._messages = [] + self._will = False + self._will_topic = "" + self._will_payload = None + self._will_qos = 0 + self._will_retain = False + self.on_disconnect = None + self.on_connect = None + self.on_publish = None + self.on_message = None + self.on_subscribe = None + self.on_unsubscribe = None + self.on_log = None + self._host = "" + self._port = 1883 + self._in_callback = False + self._strict_protocol = False + self._callback_mutex = threading.Lock() + self._state_mutex = threading.Lock() + self._out_packet_mutex = threading.Lock() + self._current_out_packet_mutex = threading.Lock() + self._msgtime_mutex = threading.Lock() + self._thread = None + self._thread_terminate = False + self._ssl = None + self._tls_certfile = None + self._tls_keyfile = None + self._tls_ca_certs = None + self._tls_cert_reqs = None + self._tls_ciphers = None + + def __del__(self): + pass + + def reinitialise(self, client_id="", clean_session=True, userdata=None): + if self._ssl: + self._ssl.close() + self._ssl = None + self._sock = None + elif self._sock: + self._sock.close() + self._sock = None + self.__init__(client_id, clean_session, userdata) + + def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None): + """Configure network encryption and authentication options. Enables SSL/TLS support. + + ca_certs : a string path to the Certificate Authority certificate files + that are to be treated as trusted by this client. If this is the only + option given then the client will operate in a similar manner to a web + browser. That is to say it will require the broker to have a + certificate signed by the Certificate Authorities in ca_certs and will + communicate using TLS v1, but will not attempt any form of + authentication. This provides basic network encryption but may not be + sufficient depending on how the broker is configured. + + certfile and keyfile are strings pointing to the PEM encoded client + certificate and private keys respectively. If these arguments are not + None then they will be used as client information for TLS based + authentication. Support for this feature is broker dependent. Note + that if either of these files in encrypted and needs a password to + decrypt it, Python will ask for the password at the command line. It is + not currently possible to define a callback to provide the password. + + cert_reqs allows the certificate requirements that the client imposes + on the broker to be changed. By default this is ssl.CERT_REQUIRED, + which means that the broker must provide a certificate. See the ssl + pydoc for more information on this parameter. + + tls_version allows the version of the SSL/TLS protocol used to be + specified. By default TLS v1 is used. Previous versions (all versions + beginning with SSL) are possible but not recommended due to possible + security problems. + + ciphers is a string specifying which encryption ciphers are allowable + for this connection, or None to use the defaults. See the ssl pydoc for + more information. + + Must be called before connect() or connect_async().""" + if sys.version < '2.7': + raise ValueError('Python 2.7 is the minimum supported version for TLS.') + + if ca_certs == None: + raise ValueError('ca_certs must not be None.') + + try: + f = open(ca_certs, "r") + except IOError as err: + raise IOError(ca_certs+": "+err.strerror) + else: + f.close() + if certfile != None: + try: + f = open(certfile, "r") + except IOError as err: + raise IOError(certfile+": "+err.strerror) + else: + f.close() + if keyfile != None: + try: + f = open(keyfile, "r") + except IOError as err: + raise IOError(keyfile+": "+err.strerror) + else: + f.close() + + self._tls_ca_certs = ca_certs + self._tls_certfile = certfile + self._tls_keyfile = keyfile + self._tls_cert_reqs = cert_reqs + self._tls_version = tls_version + self._tls_ciphers = ciphers + + def connect(self, host, port=1883, keepalive=60): + """Connect to a remote broker. + + host is the hostname or IP address of the remote broker. + port is the network port of the server host to connect to. Defaults to + 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you + are using tls_set() the port may need providing. + keepalive: Maximum period in seconds between communications with the + broker. If no other messages are being exchanged, this controls the + rate at which the client will send ping messages to the broker. + """ + self.connect_async(host, port, keepalive) + return self.reconnect() + + def connect_async(self, host, port=1883, keepalive=60): + """Connect to a remote broker asynchronously. This is a non-blocking + connect call that can be used with loop_start() to provide very quick + start. + + host is the hostname or IP address of the remote broker. + port is the network port of the server host to connect to. Defaults to + 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you + are using tls_set() the port may need providing. + keepalive: Maximum period in seconds between communications with the + broker. If no other messages are being exchanged, this controls the + rate at which the client will send ping messages to the broker. + """ + if host == None or len(host) == 0: + raise ValueError('Invalid host.') + if port <= 0: + raise ValueError('Invalid port number.') + if keepalive < 0: + raise ValueError('Keepalive must be >=0.') + + self._host = host + self._port = port + self._keepalive = keepalive + + self._state_mutex.acquire() + self._state = mosq_cs_connect_async + self._state_mutex.release() + + def reconnect(self): + """Reconnect the client after a disconnect. Can only be called after + connect()/connect_async().""" + if len(self._host) == 0: + raise ValueError('Invalid host.') + if self._port <= 0: + raise ValueError('Invalid port number.') + + self._in_packet.cleanup() + self._out_packet_mutex.acquire() + self._out_packet = [] + self._out_packet_mutex.release() + + self._current_out_packet_mutex.acquire() + self._current_out_packet = None + self._current_out_packet_mutex.release() + + self._msgtime_mutex.acquire() + self._last_msg_in = time.time() + self._last_msg_out = time.time() + self._msgtime_mutex.release() + + self._ping_t = 0 + self._state_mutex.acquire() + self._state = mosq_cs_new + self._state_mutex.release() + if self._ssl: + self._ssl.close() + self._ssl = None + self._sock = None + elif self._sock: + self._sock.close() + self._sock = None + + # Put messages in progress in a valid state. + self._messages_reconnect_reset() + + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # FIXME use create_connection here + + + if self._tls_ca_certs != None: + self._ssl = ssl.wrap_socket(self._sock, + certfile=self._tls_certfile, + keyfile=self._tls_keyfile, + ca_certs=self._tls_ca_certs, + cert_reqs=self._tls_cert_reqs, + ssl_version=self._tls_version, + ciphers=self._tls_ciphers) + + try: + self.socket().connect((self._host, self._port)) + except socket.error as err: + (msg) = err + if msg.errno != errno.EINPROGRESS: + print(msg) + return 1 + + self._sock.setblocking(0) + + return self._send_connect(self._keepalive, self._clean_session) + + def loop(self, timeout=1.0, max_packets=1): + """Process network events. + + This function must be called regularly to ensure communication with the + broker is carried out. It calls select() on the network socket to wait + for network events. If incoming data is present it will then be + processed. Outgoing commands, from e.g. publish(), are normally sent + immediately that their function is called, but this is not always + possible. loop() will also attempt to send any remaining outgoing + messages, which also includes commands that are part of the flow for + messages with QoS>0. + + timeout: The time in seconds to wait for incoming/outgoing network + traffic before timing out and returning. + max_packets: Not currently used. + + Returns MOSQ_ERR_SUCCESS on success. + Returns >0 on error. + + A ValueError will be raised if timeout < 0""" + if timeout < 0.0: + raise ValueError('Invalid timeout.') + + self._current_out_packet_mutex.acquire() + self._out_packet_mutex.acquire() + if self._current_out_packet == None and len(self._out_packet) > 0: + self._current_out_packet = self._out_packet.pop(0) + + if self._current_out_packet: + wlist = [self.socket()] + else: + wlist = [] + self._out_packet_mutex.release() + self._current_out_packet_mutex.release() + + rlist = [self.socket()] + try: + socklist = select.select(rlist, wlist, [], timeout) + except TypeError: + # Socket isn't correct type, in likelihood connection is lost + return MOSQ_ERR_CONN_LOST + + if self.socket() in socklist[0]: + rc = self.loop_read(max_packets) + if rc or (self._ssl == None and self._sock == None): + return rc + + if self.socket() in socklist[1]: + rc = self.loop_write(max_packets) + if rc or (self._ssl == None and self._sock == None): + return rc + + return self.loop_misc() + + def publish(self, topic, payload=None, qos=0, retain=False): + """Publish a message on a topic. + + This causes a message to be sent to the broker and subsequently from + the broker to any clients subscribing to matching topics. + + topic: The topic that the message should be published on. + payload: The actual message to send. If not given, or set to None a + zero length message will be used. Passing an int or float will result + in the payload being converted to a string representing that number. If + you wish to send a true int/float, use struct.pack() to create the + payload you require. + qos: The quality of service level to use. + retain: If set to true, the message will be set as the "last known + good"/retained message for the topic. + + Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS to + indicate success or MOSQ_ERR_NO_CONN if the client is not currently + connected. mid is the message ID for the publish request. The mid + value can be used to track the publish request by checking against the + mid argument in the on_publish() callback if it is defined. + + A ValueError will be raised if topic == None, has zero length or is + invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if + the length of the payload is greater than 268435455 bytes.""" + if topic == None or len(topic) == 0: + raise ValueError('Invalid topic.') + if qos<0 or qos>2: + raise ValueError('Invalid QoS level.') + if isinstance(payload, str) or isinstance(payload, bytearray): + local_payload = payload + elif isinstance(payload, int) or isinstance(payload, float): + local_payload = str(payload) + elif payload == None: + local_payload = None + else: + raise TypeError('payload must be a string, bytearray, int, float or None.') + + if local_payload != None and len(local_payload) > 268435455: + raise ValueError('Payload too large.') + + if self._topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS: + raise ValueError('Publish topic cannot contain wildcards.') + + local_mid = self._mid_generate() + + if qos == 0: + rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False) + return (rc, local_mid) + else: + message = MosquittoMessage() + message.timestamp = time.time() + message.direction = mosq_md_out + if qos == 1: + message.state = mosq_ms_wait_puback + elif qos == 2: + message.state = mosq_ms_wait_pubrec + + message.mid = local_mid + message.topic = topic + if local_payload == None or len(local_payload) == 0: + message.payload = None + else: + message.payload = local_payload + + message.qos = qos + message.retain = retain + message.dup = False + + self._messages.append(message) + rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup) + return (rc, local_mid) + + def username_pw_set(self, username, password=None): + """Set a username and optionally a password for broker authentication. + + Must be called before connect() to have any effect. + Requires a broker that supports MQTT v3.1. + + username: The username to authenticate with. Need have no relationship to the client id. + password: The password to authenticate with. Optional, set to None if not required. + """ + self._username = username + self._password = password + + def disconnect(self): + """Disconnect a connected client from the broker.""" + if self._sock == None and self._ssl == None: + return MOSQ_ERR_NO_CONN + + self._state_mutex.acquire() + self._state = mosq_cs_disconnecting + self._state_mutex.release() + + return self._send_disconnect() + + def subscribe(self, topic, qos=0): + """Subscribe the client to a topic. + + sub: The subscription topic to subscribe to. + qos: The desired quality of service level for the subscription. + + Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS + to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected. + mid is the message ID for the subscribe request. The mid value can be + used to track the subscribe request by checking against the mid + argument in the on_subscribe() callback if it is defined. + + Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has + zero string length. + """ + if qos<0 or qos>2: + raise ValueError('Invalid QoS level.') + if topic == None or len(topic) == 0: + raise ValueError('Invalid topic.') + topic = _fix_sub_topic(topic) + + if self._sock == None and self._ssl == None: + return MOSQ_ERR_NO_CONN + + return self._send_subscribe(False, topic, qos) + + def unsubscribe(self, topic): + """Unsubscribe the client from a topic. + + sub: The subscription topic to unsubscribe from. + + Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS + to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected. + mid is the message ID for the unsubscribe request. The mid value can be + used to track the unsubscribe request by checking against the mid + argument in the on_unsubscribe() callback if it is defined. + + Raises a ValueError if topic is None or has zero string length. + """ + if topic == None or len(topic) == 0: + raise ValueError('Invalid topic.') + topic = _fix_sub_topic(topic) + if self._sock == None and self._ssl == None: + return MOSQ_ERR_NO_CONN + + return self._send_unsubscribe(False, topic) + + def loop_read(self, max_packets=1): + """Process read network events. Use in place of calling loop() if you + wish to handle your client reads as part of your own application. + + Use socket() to obtain the client socket to call select() or equivalent + on. + + Do not use if you are using the threaded interface loop_start().""" + if self._sock == None and self._ssl == None: + return MOSQ_ERR_NO_CONN + + max_packets = len(self._messages) + if max_packets < 1: + max_packets = 1 + + for i in range(0, max_packets): + rc = self._packet_read() + if rc > 0: + return self._loop_rc_handle(rc) + elif rc == MOSQ_ERR_AGAIN: + return MOSQ_ERR_SUCCESS + return MOSQ_ERR_SUCCESS + + def loop_write(self, max_packets=1): + """Process read network events. Use in place of calling loop() if you + wish to handle your client reads as part of your own application. + + Use socket() to obtain the client socket to call select() or equivalent + on. + + Use want_write() to determine if there is data waiting to be written. + + Do not use if you are using the threaded interface loop_start().""" + if self._sock == None and self._ssl == None: + return MOSQ_ERR_NO_CONN + + max_packets = len(self._messages) + if max_packets < 1: + max_packets = 1 + + for i in range(0, max_packets): + rc = self._packet_write() + if rc > 0: + return self._loop_rc_handle(rc) + elif rc == MOSQ_ERR_AGAIN: + return MOSQ_ERR_SUCCESS + return MOSQ_ERR_SUCCESS + + def want_write(self): + """Call to determine if there is network data waiting to be written. + Useful if you are calling select() yourself rather than using loop(). + """ + if self._current_out_packet or len(self._out_packet) > 0: + return True + else: + return False + + def loop_misc(self): + """Process miscellaneous network events. Use in place of calling loop() if you + wish to call select() or equivalent on. + + Do not use if you are using the threaded interface loop_start().""" + if self._sock == None and self._ssl == None: + return MOSQ_ERR_NO_CONN + + now = time.time() + self._check_keepalive() + if self._last_retry_check+1 < now: + # Only check once a second at most + self._message_retry_check() + self._last_retry_check = now + + if self._ping_t > 0 and now - self._ping_t >= self._keepalive: + # mosq->ping_t != 0 means we are waiting for a pingresp. + # This hasn't happened in the keepalive time so we should disconnect. + if self._ssl: + self._ssl.close() + self._ssl = None + elif self._sock: + self._sock.close() + self._sock = None + + self._callback_mutex.acquire() + if self._state == mosq_cs_disconnecting: + rc = MOSQ_ERR_SUCCESS + else: + rc = 1 + if self.on_disconnect: + self._in_callback = True + self.on_disconnect(self, self._userdata, rc) + self._in_callback = False + self._callback_mutex.release() + return MOSQ_ERR_CONN_LOST + + return MOSQ_ERR_SUCCESS + + def message_retry_set(self, retry): + """Set the timeout in seconds before a message with QoS>0 is retried. + 20 seconds by default.""" + if retry < 0: + raise ValueError('Invalid retry.') + + self._message_retry = retry + + def user_data_set(self, userdata): + """Set the user data variable passed to callbacks. May be any data type.""" + self._userdata = userdata + + def will_set(self, topic, payload=None, qos=0, retain=False): + """Set a Will to be sent by the broker in case the client disconnects unexpectedly. + + This must be called before connect() to have any effect. + + topic: The topic that the will message should be published on. + payload: The message to send as a will. If not given, or set to None a + zero length message will be used as the will. Passing an int or float + will result in the payload being converted to a string representing + that number. If you wish to send a true int/float, use struct.pack() to + create the payload you require. + qos: The quality of service level to use for the will. + retain: If set to true, the will message will be set as the "last known + good"/retained message for the topic. + + Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has + zero string length. + """ + if topic == None or len(topic) == 0: + raise ValueError('Invalid topic.') + if qos<0 or qos>2: + raise ValueError('Invalid QoS level.') + if isinstance(payload, str) or isinstance(payload, bytearray): + self._will_payload = payload + elif isinstance(payload, int) or isinstance(payload, float): + self._will_payload = str(payload) + elif payload == None: + self._will_payload = None + else: + raise TypeError('payload must be a string, bytearray, int, float or None.') + + self._will = True + self._will_topic = topic + self._will_qos = qos + self._will_retain = retain + + def will_clear(self): + """ Removes a will that was previously configured with will_set(). + + Must be called before connect() to have any effect.""" + self._will = False + self._will_topic = "" + self._will_payload = None + self._will_qos = 0 + self._will_retain = False + + def socket(self): + """Return the socket or ssl object for this client.""" + if self._ssl: + return self._ssl + else: + return self._sock + + def loop_forever(self, timeout=1.0, max_packets=1): + """This function call loop() for you in an infinite blocking loop. It + is useful for the case where you only want to run the MQTT client loop + in your program. + + loop_forever() will handle reconnecting for you. If you call + disconnect() in a callback it will return.""" + + run = True + if self._state == mosq_cs_connect_async: + self.reconnect() + + while run: + rc = MOSQ_ERR_SUCCESS + while rc == MOSQ_ERR_SUCCESS: + rc = self.loop(timeout, max_packets) + + if self._state == mosq_cs_disconnecting: + run = False + else: + time.sleep(1) + self.reconnect() + return rc + + def loop_start(self): + """This is part of the threaded client interface. Call this once to + start a new thread to process network traffic. This provides an + alternative to repeatedly calling loop() yourself. + """ + if self._thread != None: + return MOSQ_ERR_INVAL + + self._thread = threading.Thread(target=self._thread_main) + self._thread.daemon = True + self._thread.start() + + def loop_stop(self, force=False): + """This is part of the threaded client interface. Call this once to + stop the network thread previously created with loop_start(). This call + will block until the network thread finishes. + + The force parameter is currently ignored. + """ + if self._thread == None: + return MOSQ_ERR_INVAL + + self._thread_terminate = True + self._thread.join() + self._thread = None + + # ============================================================ + # Private functions + # ============================================================ + + def _loop_rc_handle(self, rc): + if rc: + if self._ssl: + self._ssl.close() + self._ssl = None + elif self._sock: + self._sock.close() + self._sock = None + + self._state_mutex.acquire() + if self._state == mosq_cs_disconnecting: + rc = MOSQ_ERR_SUCCESS + self._state_mutex.release() + self._callback_mutex.acquire() + if self.on_disconnect: + self._in_callback = True + self.on_disconnect(self, self._userdata, rc) + self._in_callback = False + + self._callback_mutex.release() + return rc + + def _packet_read(self): + # This gets called if pselect() indicates that there is network data + # available - ie. at least one byte. What we do depends on what data we + # already have. + # If we've not got a command, attempt to read one and save it. This should + # always work because it's only a single byte. + # Then try to read the remaining length. This may fail because it is may + # be more than one byte - will need to save data pending next read if it + # does fail. + # Then try to read the remaining payload, where 'payload' here means the + # combined variable header and actual payload. This is the most likely to + # fail due to longer length, so save current data and current position. + # After all data is read, send to _mosquitto_handle_packet() to deal with. + # Finally, free the memory and reset everything to starting conditions. + if self._in_packet.command == 0: + try: + if self._ssl: + command = self._ssl.read(1) + else: + command = self._sock.recv(1) + except socket.error as err: + (msg) = err + if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): + return MOSQ_ERR_AGAIN + if msg.errno == errno.EAGAIN: + return MOSQ_ERR_AGAIN + print(msg) + return 1 + else: + if len(command) == 0: + return 1 + command = struct.unpack("!B", command) + self._in_packet.command = command[0] + + if self._in_packet.have_remaining == 0: + # Read remaining + # Algorithm for decoding taken from pseudo code at + # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm + while True: + try: + if self._ssl: + byte = self._ssl.read(1) + else: + byte = self._sock.recv(1) + except socket.error as err: + (msg) = err + if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): + return MOSQ_ERR_AGAIN + if msg.errno == errno.EAGAIN: + return MOSQ_ERR_AGAIN + print(msg) + return 1 + else: + byte = struct.unpack("!B", byte) + byte = byte[0] + self._in_packet.remaining_count.append(byte) + # Max 4 bytes length for remaining length as defined by protocol. + # Anything more likely means a broken/malicious client. + if len(self._in_packet.remaining_count) > 4: + return MOSQ_ERR_PROTOCOL + + self._in_packet.remaining_length = self._in_packet.remaining_length + (byte & 127)*self._in_packet.remaining_mult + self._in_packet.remaining_mult = self._in_packet.remaining_mult * 128 + + if (byte & 128) == 0: + break + + self._in_packet.have_remaining = 1 + self._in_packet.to_process = self._in_packet.remaining_length + + while self._in_packet.to_process > 0: + try: + if self._ssl: + data = self._ssl.read(self._in_packet.to_process) + else: + data = self._sock.recv(self._in_packet.to_process) + except socket.error as err: + (msg) = err + if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): + return MOSQ_ERR_AGAIN + if msg.errno == errno.EAGAIN: + return MOSQ_ERR_AGAIN + print(msg) + return 1 + else: + self._in_packet.to_process = self._in_packet.to_process - len(data) + self._in_packet.packet = self._in_packet.packet + data + + # All data for this packet is read. + self._in_packet.pos = 0 + rc = self._packet_handle() + + # Free data and reset values + self._in_packet.cleanup() + + self._msgtime_mutex.acquire() + self._last_msg_in = time.time() + self._msgtime_mutex.release() + return rc + + def _packet_write(self): + self._current_out_packet_mutex.acquire() + + while self._current_out_packet: + packet = self._current_out_packet + + try: + if self._ssl: + write_length = self._ssl.write(packet.packet[packet.pos:]) + else: + write_length = self._sock.send(packet.packet[packet.pos:]) + except AttributeError: + self._current_out_packet_mutex.release() + return MOSQ_ERR_SUCCESS + except socket.error as err: + self._current_out_packet_mutex.release() + (msg) = err + if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): + return MOSQ_ERR_AGAIN + if msg.errno == errno.EAGAIN: + return MOSQ_ERR_AGAIN + print(msg) + return 1 + + if write_length > 0: + packet.to_process = packet.to_process - write_length + packet.pos = packet.pos + write_length + + if packet.to_process == 0: + if (packet.command & 0xF0) == PUBLISH and packet.qos == 0: + self._callback_mutex.acquire() + if self.on_publish: + self._in_callback = True + self.on_publish(self, self._userdata, packet.mid) + self._in_callback = False + + self._callback_mutex.release() + + self._out_packet_mutex.acquire() + if len(self._out_packet) > 0: + self._current_out_packet = self._out_packet.pop(0) + else: + self._current_out_packet = None + self._out_packet_mutex.release() + else: + pass # FIXME + + self._current_out_packet_mutex.release() + + self._msgtime_mutex.acquire() + self._last_msg_out = time.time() + self._msgtime_mutex.release() + + return MOSQ_ERR_SUCCESS + + def _easy_log(self, level, buf): + if self.on_log: + self.on_log(self, self._userdata, level, buf) + + def _check_keepalive(self): + now = time.time() + self._msgtime_mutex.acquire() + last_msg_out = self._last_msg_out + last_msg_in = self._last_msg_in + self._msgtime_mutex.release() + if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): + if self._state == mosq_cs_connected and self._ping_t == 0: + self._send_pingreq() + self._msgtime_mutex.acquire() + self._last_msg_out = now + self._last_msg_in = now + self._msgtime_mutex.release() + else: + if self._ssl: + self._ssl.close() + self._ssl = None + elif self._sock: + self._sock.close() + self._sock = None + + if self._state == mosq_cs_disconnecting: + rc = MOSQ_ERR_SUCCESS + else: + rc = 1 + self._callback_mutex.acquire() + if self.on_disconnect: + self._in_callback = True + self.on_disconnect(self, self._userdata, rc) + self._in_callback = False + self._callback_mutex.release() + + def _mid_generate(self): + self._last_mid = self._last_mid + 1 + if self._last_mid == 65536: + self._last_mid = 1 + return self._last_mid + + def _topic_wildcard_len_check(self, topic): + # Search for + or # in a topic. Return MOSQ_ERR_INVAL if found. + # Also returns MOSQ_ERR_INVAL if the topic string is too long. + # Returns MOSQ_ERR_SUCCESS if everything is fine. + if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535: + return MOSQ_ERR_INVAL + else: + return MOSQ_ERR_SUCCESS + + def _send_pingreq(self): + self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGREQ") + rc = self._send_simple_command(PINGREQ) + if rc == MOSQ_ERR_SUCCESS: + self._ping_t = time.time() + return rc + + def _send_pingresp(self): + self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGRESP") + return self._send_simple_command(PINGRESP) + + def _send_puback(self, mid): + self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBACK, mid, False) + + def _send_pubcomp(self, mid): + self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBCOMP, mid, False) + + def _pack_remaining_length(self, packet, remaining_length): + remaining_bytes = [] + while True: + byte = remaining_length % 128 + remaining_length = remaining_length // 128 + # If there are more digits to encode, set the top bit of this digit + if remaining_length > 0: + byte = byte | 0x80 + + remaining_bytes.append(byte) + packet.extend(struct.pack("!B", byte)) + if remaining_length == 0: + # FIXME - this doesn't deal with incorrectly large payloads + return packet + + def _pack_str16(self, packet, data): + if sys.version_info[0] < 3: + if isinstance(data, bytearray): + packet.extend(struct.pack("!H", len(data))) + packet.extend(data) + elif isinstance(data, str): + pack_format = "!H" + str(len(data)) + "s" + packet.extend(struct.pack(pack_format, len(data), data)) + elif isinstance(data, unicode): + udata = data.encode('utf-8') + pack_format = "!H" + str(len(udata)) + "s" + packet.extend(struct.pack(pack_format, len(udata), udata)) + else: + raise TypeError + else: + if isinstance(data, bytearray): + packet.extend(struct.pack("!H", len(data))) + packet.extend(data) + elif isinstance(data, str): + udata = data.encode('utf-8') + pack_format = "!H" + str(len(udata)) + "s" + packet.extend(struct.pack(pack_format, len(udata), udata)) + else: + raise TypeError + + def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False): + if self._sock == None and self._ssl == None: + return MOSQ_ERR_NO_CONN + + command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain + packet = bytearray() + packet.extend(struct.pack("!B", command)) + if payload == None: + remaining_length = 2+len(topic) + self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"' (NULL payload)") + else: + remaining_length = 2+len(topic) + len(payload) + self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"', ... ("+str(len(payload))+" bytes)") + + if qos > 0: + # For message id + remaining_length = remaining_length + 2 + + self._pack_remaining_length(packet, remaining_length) + self._pack_str16(packet, topic) + + if qos > 0: + # For message id + packet.extend(struct.pack("!H", mid)) + + if payload != None: + if isinstance(payload, str): + if sys.version_info[0] < 3: + pack_format = str(len(payload)) + "s" + packet.extend(struct.pack(pack_format, payload)) + else: + upayload = payload.encode('utf-8') + pack_format = str(len(upayload)) + "s" + packet.extend(struct.pack(pack_format, upayload)) + elif isinstance(payload, bytearray): + packet.extend(payload) + elif isinstance(payload, unicode): + upayload = payload.encode('utf-8') + pack_format = str(len(upayload)) + "s" + packet.extend(struct.pack(pack_format, upayload)) + else: + raise TypeError('payload must be a string, unicode or a bytearray.') + + return self._packet_queue(PUBLISH, packet, mid, qos) + + def _send_pubrec(self, mid): + self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBREC, mid, False) + + def _send_pubrel(self, mid, dup=False): + self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBREL|2, mid, dup) + + def _send_command_with_mid(self, command, mid, dup): + # For PUBACK, PUBCOMP, PUBREC, and PUBREL + if dup: + command = command | 8 + + remaining_length = 2 + packet = struct.pack('!BBH', command, remaining_length, mid) + return self._packet_queue(command, packet, mid, 1) + + def _send_simple_command(self, command): + # For DISCONNECT, PINGREQ and PINGRESP + remaining_length = 0 + packet = struct.pack('!BB', command, remaining_length) + return self._packet_queue(command, packet, 0, 0) + + def _send_connect(self, keepalive, clean_session): + remaining_length = 12 + 2+len(self._client_id) + connect_flags = 0 + if clean_session: + connect_flags = connect_flags | 0x02 + + if self._will: + remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload) + connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5) + + if self._username: + remaining_length = remaining_length + 2+len(self._username) + connect_flags = connect_flags | 0x80 + if self._password: + connect_flags = connect_flags | 0x40 + remaining_length = remaining_length + 2+len(self._password) + + command = CONNECT + packet = bytearray() + packet.extend(struct.pack("!B", command)) + self._pack_remaining_length(packet, remaining_length) + packet.extend(struct.pack("!H6sBBH", len(PROTOCOL_NAME), PROTOCOL_NAME, PROTOCOL_VERSION, connect_flags, keepalive)) + + self._pack_str16(packet, self._client_id) + + if self._will: + self._pack_str16(packet, self._will_topic) + if len(self._will_payload) > 0: + self._pack_str16(packet, self._will_payload) + else: + packet.extend(struct.pack("!H", 0)) + + if self._username: + self._pack_str16(packet, self._username) + + if self._password: + self._pack_str16(packet, self._password) + + self._keepalive = keepalive + return self._packet_queue(command, packet, 0, 0) + + def _send_disconnect(self): + return self._send_simple_command(DISCONNECT) + + def _send_subscribe(self, dup, topic, topic_qos): + remaining_length = 2 + 2+len(topic) + 1 + command = SUBSCRIBE | (dup<<3) | (1<<1) + packet = bytearray() + packet.extend(struct.pack("!B", command)) + self._pack_remaining_length(packet, remaining_length) + local_mid = self._mid_generate() + packet.extend(struct.pack("!H", local_mid)) + self._pack_str16(packet, topic) + packet.extend(struct.pack("B", topic_qos)) + return (self._packet_queue(command, packet, local_mid, 1), local_mid) + + def _send_unsubscribe(self, dup, topic): + remaining_length = 2 + 2+len(topic) + command = UNSUBSCRIBE | (dup<<3) | (1<<1) + packet = bytearray() + packet.extend(struct.pack("!B", command)) + self._pack_remaining_length(packet, remaining_length) + local_mid = self._mid_generate() + packet.extend(struct.pack("!H", local_mid)) + self._pack_str16(packet, topic) + return (self._packet_queue(command, packet, local_mid, 1), local_mid) + + def _message_update(self, mid, direction, state): + for m in self._messages: + if m.mid == mid and m.direction == direction: + m.state = state + m.timestamp = time.time() + return MOSQ_ERR_SUCCESS + + return MOSQ_ERR_NOT_FOUND + + def _message_retry_check(self): + now = time.time() + for m in self._messages: + if m.timestamp + self._message_retry < now: + if m.state == mosq_ms_wait_puback or m.state == mosq_ms_wait_pubrec: + m.timestamp = now + m.dup = True + self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + elif m.state == mosq_ms_wait_pubrel: + m.timestamp = now + m.dup = True + self._send_pubrec(m.mid) + elif m.state == mosq_ms_wait_pubcomp: + m.timestamp = now + m.dup = True + self._send_pubrel(m.mid, True) + + def _messages_reconnect_reset(self): + for m in self._messages: + m.timestamp = 0 + if m.direction == mosq_md_out: + if m.qos == 1: + m.state = mosq_ms_wait_puback + elif m.qos == 2: + m.state = mosq_ms_wait_pubrec + else: + self._messages.pop(self._messages.index(m)) + + def _packet_queue(self, command, packet, mid, qos): + mpkt = MosquittoPacket(command, packet, mid, qos) + + self._out_packet_mutex.acquire() + self._out_packet.append(mpkt) + if self._current_out_packet_mutex.acquire(False): + if self._current_out_packet == None and len(self._out_packet) > 0: + self._current_out_packet = self._out_packet.pop(0) + self._current_out_packet_mutex.release() + self._out_packet_mutex.release() + + if not self._in_callback and self._thread == None: + return self.loop_write() + else: + return MOSQ_ERR_SUCCESS + + def _packet_handle(self): + cmd = self._in_packet.command&0xF0 + if cmd == PINGREQ: + return self._handle_pingreq() + elif cmd == PINGRESP: + return self._handle_pingresp() + elif cmd == PUBACK: + return self._handle_pubackcomp("PUBACK") + elif cmd == PUBCOMP: + return self._handle_pubackcomp("PUBCOMP") + elif cmd == PUBLISH: + return self._handle_publish() + elif cmd == PUBREC: + return self._handle_pubrec() + elif cmd == PUBREL: + return self._handle_pubrel() + elif cmd == CONNACK: + return self._handle_connack() + elif cmd == SUBACK: + return self._handle_suback() + elif cmd == UNSUBACK: + return self._handle_unsuback() + else: + # If we don't recognise the command, return an error straight away. + self._easy_log(MOSQ_LOG_ERR, "Error: Unrecognised command "+str(cmd)) + return MOSQ_ERR_PROTOCOL + + def _handle_pingreq(self): + if self._strict_protocol: + if self._in_packet.remaining_length != 0: + return MOSQ_ERR_PROTOCOL + + self._easy_log(MOSQ_LOG_DEBUG, "Received PINGREQ") + return self._send_pingresp() + + def _handle_pingresp(self): + if self._strict_protocol: + if self._in_packet.remaining_length != 0: + return MOSQ_ERR_PROTOCOL + + # No longer waiting for a PINGRESP. + self._ping_t = 0 + self._easy_log(MOSQ_LOG_DEBUG, "Received PINGRESP") + return MOSQ_ERR_SUCCESS + + def _handle_connack(self): + if self._strict_protocol: + if self._in_packet.remaining_length != 2: + return MOSQ_ERR_PROTOCOL + + if len(self._in_packet.packet) != 2: + return MOSQ_ERR_PROTOCOL + + (resvd, result) = struct.unpack("!BB", self._in_packet.packet) + self._easy_log(MOSQ_LOG_DEBUG, "Received CONNACK ("+str(resvd)+", "+str(result)+")") + self._callback_mutex.acquire() + if self.on_connect: + self._in_callback = True + self.on_connect(self, self._userdata, result) + self._in_callback = False + self._callback_mutex.release() + if result == 0: + self._state = mosq_cs_connected + return MOSQ_ERR_SUCCESS + elif result > 0 and result < 6: + return MOSQ_ERR_CONN_REFUSED + else: + return MOSQ_ERR_PROTOCOL + + def _handle_suback(self): + self._easy_log(MOSQ_LOG_DEBUG, "Received SUBACK") + pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's' + (mid, packet) = struct.unpack(pack_format, self._in_packet.packet) + pack_format = "!" + "B"*len(packet) + granted_qos = struct.unpack(pack_format, packet) + + self._callback_mutex.acquire() + if self.on_subscribe: + self._in_callback = True + self.on_subscribe(self, self._userdata, mid, granted_qos) + self._in_callback = False + self._callback_mutex.release() + + return MOSQ_ERR_SUCCESS + + def _handle_publish(self): + rc = 0 + + header = self._in_packet.command + message = MosquittoMessage() + message.direction = mosq_md_in + message.dup = (header & 0x08)>>3 + message.qos = (header & 0x06)>>1 + message.retain = (header & 0x01) + + pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's' + (slen, packet) = struct.unpack(pack_format, self._in_packet.packet) + pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's' + (message.topic, packet) = struct.unpack(pack_format, packet) + + if len(message.topic) == 0: + return MOSQ_ERR_PROTOCOL + + if sys.version_info[0] >= 3: + message.topic = message.topic.decode('utf-8') + message.topic = _fix_sub_topic(message.topic) + + if message.qos > 0: + pack_format = "!H" + str(len(packet)-2) + 's' + (message.mid, packet) = struct.unpack(pack_format, packet) + + message.payload = packet + + self._easy_log(MOSQ_LOG_DEBUG, "Received PUBLISH (d"+str(message.dup)+ + ", q"+str(message.qos)+", r"+str(message.retain)+ + ", m"+str(message.mid)+", '"+message.topic+ + "', ... ("+str(len(message.payload))+" bytes)") + + message.timestamp = time.time() + if message.qos == 0: + self._callback_mutex.acquire() + if self.on_message: + self._in_callback = True + self.on_message(self, self._userdata, message) + self._in_callback = False + + self._callback_mutex.release() + return MOSQ_ERR_SUCCESS + elif message.qos == 1: + rc = self._send_puback(message.mid) + self._callback_mutex.acquire() + if self.on_message: + self._in_callback = True + self.on_message(self, self._userdata, message) + self._in_callback = False + + self._callback_mutex.release() + return rc + elif message.qos == 2: + rc = self._send_pubrec(message.mid) + message.state = mosq_ms_wait_pubrel + self._messages.append(message) + return rc + else: + return MOSQ_ERR_PROTOCOL + + def _handle_pubrel(self): + if self._strict_protocol: + if self._in_packet.remaining_length != 2: + return MOSQ_ERR_PROTOCOL + + if len(self._in_packet.packet) != 2: + return MOSQ_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet.packet) + mid = mid[0] + self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")") + + for i in range(len(self._messages)): + if self._messages[i].direction == mosq_md_in and self._messages[i].mid == mid: + + # Only pass the message on if we have removed it from the queue - this + # prevents multiple callbacks for the same message. + self._callback_mutex.acquire() + if self.on_message: + self._in_callback = True + self.on_message(self, self._userdata, self._messages[i]) + self._in_callback = False + self._callback_mutex.release() + self._messages.pop(i) + + return self._send_pubcomp(mid) + + return MOSQ_ERR_SUCCESS + + def _handle_pubrec(self): + if self._strict_protocol: + if self._in_packet.remaining_length != 2: + return MOSQ_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet.packet) + mid = mid[0] + self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")") + + for i in range(len(self._messages)): + if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid: + self._messages[i].state = mosq_ms_wait_pubcomp + self._messages[i].timestamp = time.time() + return self._send_pubrel(mid, False) + + return MOSQ_ERR_SUCCESS + + def _handle_unsuback(self): + if self._strict_protocol: + if self._in_packet.remaining_length != 2: + return MOSQ_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet.packet) + mid = mid[0] + self._easy_log(MOSQ_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")") + self._callback_mutex.acquire() + if self.on_unsubscribe: + self._in_callback = True + self.on_unsubscribe(self, self._userdata, mid) + self._in_callback = False + self._callback_mutex.release() + return MOSQ_ERR_SUCCESS + + def _handle_pubackcomp(self, cmd): + if self._strict_protocol: + if self._in_packet.remaining_length != 2: + return MOSQ_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet.packet) + mid = mid[0] + self._easy_log(MOSQ_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")") + + for i in range(len(self._messages)): + try: + if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid: + # Only inform the client the message has been sent once. + self._callback_mutex.acquire() + if self.on_publish: + self._in_callback = True + self.on_publish(self, self._userdata, mid) + self._in_callback = False + + self._callback_mutex.release() + self._messages.pop(i) + except IndexError: + # Have removed item so i>count. + # Not really an error. + pass + + return MOSQ_ERR_SUCCESS + + def _thread_main(self): + run = True + self._thread_terminate = False + self._state_mutex.acquire() + if self._state == mosq_cs_connect_async: + self._state_mutex.release() + self.reconnect() + else: + self._state_mutex.release() + + while run: + rc = MOSQ_ERR_SUCCESS + while rc == MOSQ_ERR_SUCCESS: + rc = self.loop() + if self._thread_terminate: + rc = 1 + run = False + + self._state_mutex.acquire() + if self._state == mosq_cs_disconnecting: + run = False + self._state_mutex.release() + else: + self._state_mutex.release() + time.sleep(1) + self.reconnect() +