]> git.michaelhowe.org Git - packages/p/paho-mqtt.git/commitdiff
Start of reorganisation for Paho.
authorRoger Light <roger@atchoo.org>
Sat, 15 Jun 2013 21:20:20 +0000 (22:20 +0100)
committerRoger Light <roger@atchoo.org>
Mon, 3 Feb 2014 21:15:27 +0000 (21:15 +0000)
--HG--
rename : lib/python/sub.py => examples/sub.py
rename : lib/python/setup.py => setup.py
rename : lib/python/mosquitto.py => src/paho/mqtt/mosquitto.py

examples/sub.py [new file with mode: 0755]
lib/python/mosquitto.py [deleted file]
lib/python/setup.py [deleted file]
lib/python/sub.py [deleted file]
setup.py [new file with mode: 0644]
src/paho/mqtt/mosquitto.py [new file with mode: 0755]

diff --git a/examples/sub.py b/examples/sub.py
new file mode 100755 (executable)
index 0000000..7af946d
--- /dev/null
@@ -0,0 +1,62 @@
+#!/usr/bin/python
+
+# Copyright (c) 2010,2011 Roger Light <roger@atchoo.org>
+# All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+# 
+# 1. Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+#   notice, this list of conditions and the following disclaimer in the
+#   documentation and/or other materials provided with the distribution.
+# 3. Neither the name of mosquitto nor the names of its
+#   contributors may be used to endorse or promote products derived from
+#   this software without specific prior written permission.
+# 
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import mosquitto
+
+def on_connect(mosq, obj, rc):
+    print("rc: "+str(rc))
+
+def on_message(mosq, obj, msg):
+    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
+
+def on_publish(mosq, obj, mid):
+    print("mid: "+str(mid))
+
+def on_subscribe(mosq, obj, mid, granted_qos):
+    print("Subscribed: "+str(mid)+" "+str(granted_qos))
+
+def on_log(mosq, obj, level, string):
+    print(string)
+
+mqttc = mosquitto.Mosquitto()
+mqttc.on_message = on_message
+mqttc.on_connect = on_connect
+mqttc.on_publish = on_publish
+mqttc.on_subscribe = on_subscribe
+# Uncomment to enable debug messages
+#mqttc.on_log = on_log
+mqttc.connect("test.mosquitto.org", 1883, 60)
+mqttc.subscribe("$SYS/#", 0)
+
+
+rc = 0
+while rc == 0:
+    rc = mqttc.loop()
+
+print("rc: "+str(rc))
diff --git a/lib/python/mosquitto.py b/lib/python/mosquitto.py
deleted file mode 100755 (executable)
index 86f6ce9..0000000
+++ /dev/null
@@ -1,1781 +0,0 @@
-# Copyright (c) 2012 Roger Light <roger@atchoo.org>
-# All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-# 
-# 1. Redistributions of source code must retain the above copyright notice,
-#    this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions and the following disclaimer in the
-#    documentation and/or other materials provided with the distribution.
-# 3. Neither the name of mosquitto nor the names of its
-#    contributors may be used to endorse or promote products derived from
-#    this software without specific prior written permission.
-# 
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#
-#
-# This product includes software developed by the OpenSSL Project for use in
-# the OpenSSL Toolkit. (http://www.openssl.org/)
-# This product includes cryptographic software written by Eric Young
-# (eay@cryptsoft.com)
-# This product includes software written by Tim Hudson (tjh@cryptsoft.com)
-
-"""
-This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging
-protocol that is easy to implement and suitable for low powered devices.
-"""
-import errno
-import random
-import select
-import socket
-import ssl
-import struct
-import sys
-import threading
-import time
-
-if sys.version_info[0] < 3:
-    PROTOCOL_NAME = "MQIsdp"
-else:
-    PROTOCOL_NAME = b"MQIsdp"
-
-PROTOCOL_VERSION = 3
-
-# Message types 
-CONNECT = 0x10
-CONNACK = 0x20
-PUBLISH = 0x30
-PUBACK = 0x40
-PUBREC = 0x50
-PUBREL = 0x60
-PUBCOMP = 0x70
-SUBSCRIBE = 0x80
-SUBACK = 0x90
-UNSUBSCRIBE = 0xA0
-UNSUBACK = 0xB0
-PINGREQ = 0xC0
-PINGRESP = 0xD0
-DISCONNECT = 0xE0
-
-# Log levels
-MOSQ_LOG_INFO = 0x01
-MOSQ_LOG_NOTICE = 0x02
-MOSQ_LOG_WARNING = 0x04
-MOSQ_LOG_ERR = 0x08
-MOSQ_LOG_DEBUG = 0x10
-
-# CONNACK codes
-CONNACK_ACCEPTED = 0
-CONNACK_REFUSED_PROTOCOL_VERSION = 1
-CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
-CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
-CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
-CONNACK_REFUSED_NOT_AUTHORIZED = 5
-
-# Connection state
-mosq_cs_new = 0
-mosq_cs_connected = 1
-mosq_cs_disconnecting = 2
-mosq_cs_connect_async = 3
-
-# Message direction
-mosq_md_invalid = 0
-mosq_md_in = 1
-mosq_md_out = 2
-
-# Message state
-mosq_ms_invalid = 0,
-mosq_ms_wait_puback = 1
-mosq_ms_wait_pubrec = 2
-mosq_ms_wait_pubrel = 3
-mosq_ms_wait_pubcomp = 4
-
-# Error values
-MOSQ_ERR_AGAIN = -1
-MOSQ_ERR_SUCCESS = 0
-MOSQ_ERR_NOMEM = 1
-MOSQ_ERR_PROTOCOL = 2
-MOSQ_ERR_INVAL = 3
-MOSQ_ERR_NO_CONN = 4
-MOSQ_ERR_CONN_REFUSED = 5
-MOSQ_ERR_NOT_FOUND = 6
-MOSQ_ERR_CONN_LOST = 7
-MOSQ_ERR_TLS = 8
-MOSQ_ERR_PAYLOAD_SIZE = 9
-MOSQ_ERR_NOT_SUPPORTED = 10
-MOSQ_ERR_AUTH = 11
-MOSQ_ERR_ACL_DENIED = 12
-MOSQ_ERR_UNKNOWN = 13
-MOSQ_ERR_ERRNO = 14
-
-def _fix_sub_topic(subtopic):
-    # Convert ////some////over/slashed///topic/etc/etc//
-    # into some/over/slashed/topic/etc/etc
-    if subtopic[0] == '/':
-        return '/'+'/'.join(filter(None, subtopic.split('/')))
-    else:
-        return '/'.join(filter(None, subtopic.split('/')))
-
-def error_string(mosq_errno):
-    """Return the error string associated with a mosquitto error number."""
-    if mosq_errno == MOSQ_ERR_SUCCESS:
-        return "No error."
-    elif mosq_errno == MOSQ_ERR_NOMEM:
-        return "Out of memory."
-    elif mosq_errno == MOSQ_ERR_PROTOCOL:
-        return "A network protocol error occurred when communicating with the broker."
-    elif mosq_errno == MOSQ_ERR_INVAL:
-        return "Invalid function arguments provided."
-    elif mosq_errno == MOSQ_ERR_NO_CONN:
-        return "The client is not currently connected."
-    elif mosq_errno == MOSQ_ERR_CONN_REFUSED:
-        return "The connection was refused."
-    elif mosq_errno == MOSQ_ERR_NOT_FOUND:
-        return "Message not found (internal error)."
-    elif mosq_errno == MOSQ_ERR_CONN_LOST:
-        return "The connection was lost."
-    elif mosq_errno == MOSQ_ERR_TLS:
-        return "A TLS error occurred."
-    elif mosq_errno == MOSQ_ERR_PAYLOAD_SIZE:
-        return "Payload too large."
-    elif mosq_errno == MOSQ_ERR_NOT_SUPPORTED:
-        return "This feature is not supported."
-    elif mosq_errno == MOSQ_ERR_AUTH:
-        return "Authorisation failed."
-    elif mosq_errno == MOSQ_ERR_ACL_DENIED:
-        return "Access denied by ACL."
-    elif mosq_errno == MOSQ_ERR_UNKNOWN:
-        return "Unknown error."
-    elif mosq_errno == MOSQ_ERR_ERRNO:
-        return "Error defined by errno."
-    else:
-        return "Unknown error."
-
-def connack_string(connack_code):
-    """Return the string associated with a CONNACK result."""
-    if connack_code == 0:
-        return "Connection Accepted."
-    elif connack_code == 1:
-        return "Connection Refused: unacceptable protocol version."
-    elif connack_code == 2:
-        return "Connection Refused: identifier rejected."
-    elif connack_code == 3:
-        return "Connection Refused: broker unavailable."
-    elif connack_code == 4:
-        return "Connection Refused: bad user name or password."
-    elif connack_code == 5:
-        return "Connection Refused: not authorised."
-    else:
-        return "Connection Refused: unknown reason."
-
-def topic_matches_sub(sub, topic):
-    """Check whether a topic matches a subscription.
-    
-    For example:
-    
-    foo/bar would match the subscription foo/# or +/bar
-    non/matching would not match the subscription non/+/+
-    """
-    result = True
-    local_sub = _fix_sub_topic(sub)
-    local_topic = _fix_sub_topic(topic)
-    multilevel_wildcard = False
-
-    slen = len(local_sub)
-    tlen = len(local_topic)
-
-    spos = 0;
-    tpos = 0;
-
-    while spos < slen and tpos < tlen:
-        if local_sub[spos] == local_topic[tpos]:
-            spos += 1
-            tpos += 1
-        else:
-            if local_sub[spos] == '+':
-                spos += 1
-                while tpos < tlen and local_topic[tpos] != '/':
-                    tpos += 1
-                if tpos == tlen and spos == slen:
-                    result = True
-                    break
-
-            elif local_sub[spos] == '#':
-                multilevel_wildcard = True
-                if spos+1 != slen:
-                    result = False
-                    break
-                else:
-                    result = True
-                    break
-
-            else:
-                result = False
-                break
-
-        if tpos == tlen-1:
-            # Check for e.g. foo matching foo/#
-            if spos == slen-3 and local_sub[spos+1] == '/' and local_sub[spos+2] == '#':
-                result = True
-                multilevel_wildcard = True
-                break
-
-    if not multilevel_wildcard and (tpos < tlen or spos < slen):
-        result = False
-
-    return result
-
-
-class MosquittoMessage:
-    """ This is a class that describes an incoming message. It is passed to the
-    on_message callback as the message parameter.
-    
-    Members:
-
-    topic : String. topic that the message was published on.
-    payload : String/bytes the message payload.
-    qos : Integer. The message Quality of Service 0, 1 or 2.
-    retain : Boolean. If true, the message is a retained message and not fresh.
-    mid : Integer. The message id.
-    """
-    def __init__(self):
-        self.timestamp = 0
-        self.direction = mosq_md_invalid
-        self.state = mosq_ms_invalid
-        self.dup = False
-        self.mid = 0
-        self.topic = ""
-        self.payload = None
-        self.qos = 0
-        self.retain = False
-
-class MosquittoInPacket:
-    """Internal datatype."""
-    def __init__(self):
-        self.command = 0
-        self.have_remaining = 0
-        self.remaining_count = []
-        self.remaining_mult = 1
-        self.remaining_length = 0
-        self.packet = b""
-        self.to_process = 0
-        self.pos = 0
-
-    def cleanup(self):
-        self.__init__()
-
-class MosquittoPacket:
-    """Internal datatype."""
-    def __init__(self, command, packet, mid, qos):
-        self.command = command
-        self.mid = mid
-        self.qos = qos
-        self.pos = 0
-        self.to_process = len(packet)
-        self.packet = packet
-
-class Mosquitto:
-    """MQTT version 3.1 client class.
-    
-    This is the main class for use communicating with an MQTT broker.
-
-    General usage flow:
-
-    * Use connect()/connect_async() to connect to a broker
-    * Call loop() frequently to maintain network traffic flow with the broker
-    * Or use loop_start() to set a thread running to call loop() for you.
-    * Or use loop_forever() to handle calling loop() for you in a blocking
-    * function.
-    * Use subscribe() to subscribe to a topic and receive messages
-    * Use publish() to send messages
-    * Use disconnect() to disconnect from the broker
-
-    Data returned from the broker is made available with the use of callback
-    functions as described below.
-
-    Callbacks
-    =========
-
-    A number of callback functions are available to receive data back from the
-    broker. To use a callback, define a function and then assign it to the
-    client:
-    
-    def on_connect(mosq, userdata, rc):
-        print("Connection returned " + str(rc))
-
-    client.on_connect = on_connect
-
-    All of the callbacks as described below have a "mosq" and an "userdata"
-    argument. "mosq" is the Mosquitto instance that is calling the callback.
-    "userdata" is user data of any type and can be set when creating a new client
-    instance or with user_data_set(userdata).
-    
-    The callbacks:
-
-    on_connect(mosq, userdata, rc): called when the broker responds to our connection
-      request. The value of rc determines success or not:
-      0: Connection successful
-      1: Connection refused - incorrect protocol version
-      2: Connection refused - invalid client identifier
-      3: Connection refused - server unavailable
-      4: Connection refused - bad username or password
-      5: Connection refused - not authorised
-      6-255: Currently unused.
-
-    on_disconnect(mosq, userdata, rc): called when the client disconnects from the broker.
-      The rc parameter indicates the disconnection state. If MOSQ_ERR_SUCCESS
-      (0), the callback was called in response to a disconnect() call. If any
-      other value the disconnection was unexpected, such as might be caused by
-      a network error.
-
-    on_message(mosq, userdata, message): called when a message has been received on a
-      topic that the client subscribes to. The message variable is a
-      MosquittoMessage that describes all of the message parameters.
-
-    on_publish(mosq, userdata, mid): called when a message that was to be sent using the
-      publish() call has completed transmission to the broker. For messages
-      with QoS levels 1 and 2, this means that the appropriate handshakes have
-      completed. For QoS 0, this simply means that the message has left the
-      client. The mid variable matches the mid variable returned from the
-      corresponding publish() call, to allow outgoing messages to be tracked.
-      This callback is important because even if the publish() call returns
-      success, it does not always mean that the message has been sent.
-
-    on_subscribe(mosq, userdata, mid, granted_qos): called when the broker responds to a
-      subscribe request. The mid variable matches the mid variable returned
-      from the corresponding subscribe() call. The granted_qos variable is a
-      list of integers that give the QoS level the broker has granted for each
-      of the different subscription requests.
-
-    on_unsubscribe(mosq, userdata, mid): called when the broker responds to an unsubscribe
-      request. The mid variable matches the mid variable returned from the
-      corresponding unsubscribe() call.
-
-    on_log(mosq, userdata, level, buf): called when the client has log information. Define
-      to allow debugging. The level variable gives the severity of the message
-      and will be one of MOSQ_LOG_INFO, MOSQ_LOG_NOTICE, MOSQ_LOG_WARNING,
-      MOSQ_LOG_ERR, and MOSQ_LOG_DEBUG. The message itself is in buf.
-
-    """
-    def __init__(self, client_id="", clean_session=True, userdata=None):
-        """client_id is the unique client id string used when connecting to the
-        broker. If client_id is zero length or None, then one will be randomly
-        generated. In this case, clean_session must be True. If this is not the
-        case a ValueError will be raised.
-
-        clean_session is a boolean that determines the client type. If True,
-        the broker will remove all information about this client when it
-        disconnects. If False, the client is a persistent client and
-        subscription information and queued messages will be retained when the
-        client disconnects. 
-        Note that a client will never discard its own outgoing messages on
-        disconnect. Calling connect() or reconnect() will cause the messages to
-        be resent.  Use reinitialise() to reset a client to its original state.
-
-        userdata is user defined data of any type that is passed as the "userdata"
-        parameter to callbacks. It may be updated at a later point with the
-        user_data_set() function.
-        """
-        if not clean_session and (client_id == "" or client_id == None):
-            raise ValueError('A client id must be provided if clean session is False.')
-
-        self._userdata = userdata
-        self._sock = None
-        self._keepalive = 60
-        self._message_retry = 20
-        self._last_retry_check = 0
-        self._clean_session = clean_session
-        if client_id == "":
-            self._client_id = "mosq/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5))
-        else:
-            self._client_id = client_id
-
-        self._username = ""
-        self._password = ""
-        self._in_packet = MosquittoInPacket()
-        self._out_packet = []
-        self._current_out_packet = None
-        self._last_msg_in = time.time()
-        self._last_msg_out = time.time()
-        self._ping_t = 0
-        self._last_mid = 0
-        self._state = mosq_cs_new
-        self._messages = []
-        self._will = False
-        self._will_topic = ""
-        self._will_payload = None
-        self._will_qos = 0
-        self._will_retain = False
-        self.on_disconnect = None
-        self.on_connect = None
-        self.on_publish = None
-        self.on_message = None
-        self.on_subscribe = None
-        self.on_unsubscribe = None
-        self.on_log = None
-        self._host = ""
-        self._port = 1883
-        self._in_callback = False
-        self._strict_protocol = False
-        self._callback_mutex = threading.Lock()
-        self._state_mutex = threading.Lock()
-        self._out_packet_mutex = threading.Lock()
-        self._current_out_packet_mutex = threading.Lock()
-        self._msgtime_mutex = threading.Lock()
-        self._thread = None
-        self._thread_terminate = False
-        self._ssl = None
-        self._tls_certfile = None
-        self._tls_keyfile = None
-        self._tls_ca_certs = None
-        self._tls_cert_reqs = None
-        self._tls_ciphers = None
-
-    def __del__(self):
-        pass
-
-    def reinitialise(self, client_id="", clean_session=True, userdata=None):
-        if self._ssl:
-            self._ssl.close()
-            self._ssl = None
-            self._sock = None
-        elif self._sock:
-            self._sock.close()
-            self._sock = None
-        self.__init__(client_id, clean_session, userdata)
-
-    def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None):
-        """Configure network encryption and authentication options. Enables SSL/TLS support.
-
-        ca_certs : a string path to the Certificate Authority certificate files
-        that are to be treated as trusted by this client. If this is the only
-        option given then the client will operate in a similar manner to a web
-        browser. That is to say it will require the broker to have a
-        certificate signed by the Certificate Authorities in ca_certs and will
-        communicate using TLS v1, but will not attempt any form of
-        authentication. This provides basic network encryption but may not be
-        sufficient depending on how the broker is configured.
-
-        certfile and keyfile are strings pointing to the PEM encoded client
-        certificate and private keys respectively. If these arguments are not
-        None then they will be used as client information for TLS based
-        authentication.  Support for this feature is broker dependent. Note
-        that if either of these files in encrypted and needs a password to
-        decrypt it, Python will ask for the password at the command line. It is
-        not currently possible to define a callback to provide the password.
-        
-        cert_reqs allows the certificate requirements that the client imposes
-        on the broker to be changed. By default this is ssl.CERT_REQUIRED,
-        which means that the broker must provide a certificate. See the ssl
-        pydoc for more information on this parameter.
-        
-        tls_version allows the version of the SSL/TLS protocol used to be
-        specified. By default TLS v1 is used. Previous versions (all versions
-        beginning with SSL) are possible but not recommended due to possible
-        security problems.
-
-        ciphers is a string specifying which encryption ciphers are allowable
-        for this connection, or None to use the defaults. See the ssl pydoc for
-        more information.
-
-        Must be called before connect() or connect_async()."""
-        if sys.version < '2.7':
-            raise ValueError('Python 2.7 is the minimum supported version for TLS.')
-
-        if ca_certs == None:
-            raise ValueError('ca_certs must not be None.')
-
-        try:
-            f = open(ca_certs, "r")
-        except IOError as err:
-            raise IOError(ca_certs+": "+err.strerror)
-        else:
-            f.close()
-        if certfile != None:
-            try:
-                f = open(certfile, "r")
-            except IOError as err:
-                raise IOError(certfile+": "+err.strerror)
-            else:
-                f.close()
-        if keyfile != None:
-            try:
-                f = open(keyfile, "r")
-            except IOError as err:
-                raise IOError(keyfile+": "+err.strerror)
-            else:
-                f.close()
-
-        self._tls_ca_certs = ca_certs
-        self._tls_certfile = certfile
-        self._tls_keyfile = keyfile
-        self._tls_cert_reqs = cert_reqs
-        self._tls_version = tls_version
-        self._tls_ciphers = ciphers
-
-    def connect(self, host, port=1883, keepalive=60):
-        """Connect to a remote broker.
-
-        host is the hostname or IP address of the remote broker.
-        port is the network port of the server host to connect to. Defaults to
-        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
-        are using tls_set() the port may need providing.
-        keepalive: Maximum period in seconds between communications with the
-        broker. If no other messages are being exchanged, this controls the
-        rate at which the client will send ping messages to the broker.
-        """
-        self.connect_async(host, port, keepalive)
-        return self.reconnect()
-
-    def connect_async(self, host, port=1883, keepalive=60):
-        """Connect to a remote broker asynchronously. This is a non-blocking
-        connect call that can be used with loop_start() to provide very quick
-        start.
-
-        host is the hostname or IP address of the remote broker.
-        port is the network port of the server host to connect to. Defaults to
-        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
-        are using tls_set() the port may need providing.
-        keepalive: Maximum period in seconds between communications with the
-        broker. If no other messages are being exchanged, this controls the
-        rate at which the client will send ping messages to the broker.
-        """
-        if host == None or len(host) == 0:
-            raise ValueError('Invalid host.')
-        if port <= 0:
-            raise ValueError('Invalid port number.')
-        if keepalive < 0:
-            raise ValueError('Keepalive must be >=0.')
-
-        self._host = host
-        self._port = port
-        self._keepalive = keepalive
-
-        self._state_mutex.acquire()
-        self._state = mosq_cs_connect_async
-        self._state_mutex.release()
-
-    def reconnect(self):
-        """Reconnect the client after a disconnect. Can only be called after
-        connect()/connect_async()."""
-        if len(self._host) == 0:
-            raise ValueError('Invalid host.')
-        if self._port <= 0:
-            raise ValueError('Invalid port number.')
-
-        self._in_packet.cleanup()
-        self._out_packet_mutex.acquire()
-        self._out_packet = []
-        self._out_packet_mutex.release()
-
-        self._current_out_packet_mutex.acquire()
-        self._current_out_packet = None
-        self._current_out_packet_mutex.release()
-
-        self._msgtime_mutex.acquire()
-        self._last_msg_in = time.time()
-        self._last_msg_out = time.time()
-        self._msgtime_mutex.release()
-
-        self._ping_t = 0
-        self._state_mutex.acquire()
-        self._state = mosq_cs_new
-        self._state_mutex.release()
-        if self._ssl:
-            self._ssl.close()
-            self._ssl = None
-            self._sock = None
-        elif self._sock:
-            self._sock.close()
-            self._sock = None
-
-        # Put messages in progress in a valid state.
-        self._messages_reconnect_reset()
-
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        # FIXME use create_connection here
-
-
-        if self._tls_ca_certs != None:
-            self._ssl = ssl.wrap_socket(self._sock,
-                    certfile=self._tls_certfile,
-                    keyfile=self._tls_keyfile,
-                    ca_certs=self._tls_ca_certs,
-                    cert_reqs=self._tls_cert_reqs,
-                    ssl_version=self._tls_version,
-                    ciphers=self._tls_ciphers)
-
-        try:
-            self.socket().connect((self._host, self._port))
-        except socket.error as err:
-            (msg) = err
-            if msg.errno != errno.EINPROGRESS:
-                print(msg)
-                return 1
-
-        self._sock.setblocking(0)
-
-        return self._send_connect(self._keepalive, self._clean_session)
-
-    def loop(self, timeout=1.0, max_packets=1):
-        """Process network events.
-
-        This function must be called regularly to ensure communication with the
-        broker is carried out. It calls select() on the network socket to wait
-        for network events. If incoming data is present it will then be
-        processed. Outgoing commands, from e.g. publish(), are normally sent
-        immediately that their function is called, but this is not always
-        possible. loop() will also attempt to send any remaining outgoing
-        messages, which also includes commands that are part of the flow for
-        messages with QoS>0.
-
-        timeout: The time in seconds to wait for incoming/outgoing network
-          traffic before timing out and returning. 
-        max_packets: Not currently used.
-
-        Returns MOSQ_ERR_SUCCESS on success.
-        Returns >0 on error.
-
-        A ValueError will be raised if timeout < 0"""
-        if timeout < 0.0:
-            raise ValueError('Invalid timeout.')
-
-        self._current_out_packet_mutex.acquire()
-        self._out_packet_mutex.acquire()
-        if self._current_out_packet == None and len(self._out_packet) > 0:
-            self._current_out_packet = self._out_packet.pop(0)
-
-        if self._current_out_packet:
-            wlist = [self.socket()]
-        else:
-            wlist = []
-        self._out_packet_mutex.release()
-        self._current_out_packet_mutex.release()
-
-        rlist = [self.socket()]
-        try:
-            socklist = select.select(rlist, wlist, [], timeout)
-        except TypeError:
-            # Socket isn't correct type, in likelihood connection is lost
-            return MOSQ_ERR_CONN_LOST
-
-        if self.socket() in socklist[0]:
-            rc = self.loop_read(max_packets)
-            if rc or (self._ssl == None and self._sock == None):
-                return rc
-
-        if self.socket() in socklist[1]:
-            rc = self.loop_write(max_packets)
-            if rc or (self._ssl == None and self._sock == None):
-                return rc
-
-        return self.loop_misc()
-
-    def publish(self, topic, payload=None, qos=0, retain=False):
-        """Publish a message on a topic.
-
-        This causes a message to be sent to the broker and subsequently from
-        the broker to any clients subscribing to matching topics.
-
-        topic: The topic that the message should be published on.
-        payload: The actual message to send. If not given, or set to None a
-        zero length message will be used. Passing an int or float will result
-        in the payload being converted to a string representing that number. If
-        you wish to send a true int/float, use struct.pack() to create the
-        payload you require.
-        qos: The quality of service level to use.
-        retain: If set to true, the message will be set as the "last known
-        good"/retained message for the topic.
-
-        Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS to
-        indicate success or MOSQ_ERR_NO_CONN if the client is not currently
-        connected.  mid is the message ID for the publish request. The mid
-        value can be used to track the publish request by checking against the
-        mid argument in the on_publish() callback if it is defined.
-
-        A ValueError will be raised if topic == None, has zero length or is
-        invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if
-        the length of the payload is greater than 268435455 bytes."""
-        if topic == None or len(topic) == 0:
-            raise ValueError('Invalid topic.')
-        if qos<0 or qos>2:
-            raise ValueError('Invalid QoS level.')
-        if isinstance(payload, str) or isinstance(payload, bytearray):
-            local_payload = payload
-        elif isinstance(payload, int) or isinstance(payload, float):
-            local_payload = str(payload)
-        elif payload == None:
-            local_payload = None
-        else:
-            raise TypeError('payload must be a string, bytearray, int, float or None.')
-
-        if local_payload != None and len(local_payload) > 268435455:
-            raise ValueError('Payload too large.')
-
-        if self._topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS:
-            raise ValueError('Publish topic cannot contain wildcards.')
-
-        local_mid = self._mid_generate()
-
-        if qos == 0:
-            rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False)
-            return (rc, local_mid)
-        else:
-            message = MosquittoMessage()
-            message.timestamp = time.time()
-            message.direction = mosq_md_out
-            if qos == 1:
-                message.state = mosq_ms_wait_puback
-            elif qos == 2:
-                message.state = mosq_ms_wait_pubrec
-
-            message.mid = local_mid
-            message.topic = topic
-            if local_payload == None or len(local_payload) == 0:
-                message.payload = None
-            else:
-                message.payload = local_payload
-
-            message.qos = qos
-            message.retain = retain
-            message.dup = False
-
-            self._messages.append(message)
-            rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
-            return (rc, local_mid)
-
-    def username_pw_set(self, username, password=None):
-        """Set a username and optionally a password for broker authentication.
-
-        Must be called before connect() to have any effect.
-        Requires a broker that supports MQTT v3.1.
-
-        username: The username to authenticate with. Need have no relationship to the client id.
-        password: The password to authenticate with. Optional, set to None if not required.
-        """
-        self._username = username
-        self._password = password
-
-    def disconnect(self):
-        """Disconnect a connected client from the broker."""
-        if self._sock == None and self._ssl == None:
-            return MOSQ_ERR_NO_CONN
-
-        self._state_mutex.acquire()
-        self._state = mosq_cs_disconnecting
-        self._state_mutex.release()
-
-        return self._send_disconnect()
-
-    def subscribe(self, topic, qos=0):
-        """Subscribe the client to a topic.
-
-        sub: The subscription topic to subscribe to.
-        qos: The desired quality of service level for the subscription.
-
-        Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS
-        to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected.
-        mid is the message ID for the subscribe request. The mid value can be
-        used to track the subscribe request by checking against the mid
-        argument in the on_subscribe() callback if it is defined.
-        
-        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
-        zero string length.
-        """
-        if qos<0 or qos>2:
-            raise ValueError('Invalid QoS level.')
-        if topic == None or len(topic) == 0:
-            raise ValueError('Invalid topic.')
-        topic = _fix_sub_topic(topic)
-
-        if self._sock == None and self._ssl == None:
-            return MOSQ_ERR_NO_CONN
-
-        return self._send_subscribe(False, topic, qos)
-
-    def unsubscribe(self, topic):
-        """Unsubscribe the client from a topic.
-
-        sub: The subscription topic to unsubscribe from.
-
-        Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS
-        to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected.
-        mid is the message ID for the unsubscribe request. The mid value can be
-        used to track the unsubscribe request by checking against the mid
-        argument in the on_unsubscribe() callback if it is defined.
-
-        Raises a ValueError if topic is None or has zero string length.
-        """
-        if topic == None or len(topic) == 0:
-            raise ValueError('Invalid topic.')
-        topic = _fix_sub_topic(topic)
-        if self._sock == None and self._ssl == None:
-            return MOSQ_ERR_NO_CONN
-
-        return self._send_unsubscribe(False, topic)
-
-    def loop_read(self, max_packets=1):
-        """Process read network events. Use in place of calling loop() if you
-        wish to handle your client reads as part of your own application.
-
-        Use socket() to obtain the client socket to call select() or equivalent
-        on.
-        
-        Do not use if you are using the threaded interface loop_start()."""
-        if self._sock == None and self._ssl == None:
-            return MOSQ_ERR_NO_CONN
-
-        max_packets = len(self._messages)
-        if max_packets < 1:
-            max_packets = 1
-
-        for i in range(0, max_packets):
-            rc = self._packet_read()
-            if rc > 0:
-                return self._loop_rc_handle(rc)
-            elif rc == MOSQ_ERR_AGAIN:
-                return MOSQ_ERR_SUCCESS
-        return MOSQ_ERR_SUCCESS
-
-    def loop_write(self, max_packets=1):
-        """Process read network events. Use in place of calling loop() if you
-        wish to handle your client reads as part of your own application.
-        
-        Use socket() to obtain the client socket to call select() or equivalent
-        on.
-
-        Use want_write() to determine if there is data waiting to be written.
-
-        Do not use if you are using the threaded interface loop_start()."""
-        if self._sock == None and self._ssl == None:
-            return MOSQ_ERR_NO_CONN
-
-        max_packets = len(self._messages)
-        if max_packets < 1:
-            max_packets = 1
-
-        for i in range(0, max_packets):
-            rc = self._packet_write()
-            if rc > 0:
-                return self._loop_rc_handle(rc)
-            elif rc == MOSQ_ERR_AGAIN:
-                return MOSQ_ERR_SUCCESS
-        return MOSQ_ERR_SUCCESS
-
-    def want_write(self):
-        """Call to determine if there is network data waiting to be written.
-        Useful if you are calling select() yourself rather than using loop().
-        """
-        if self._current_out_packet or len(self._out_packet) > 0:
-            return True
-        else:
-            return False
-
-    def loop_misc(self):
-        """Process miscellaneous network events. Use in place of calling loop() if you
-        wish to call select() or equivalent on.
-
-        Do not use if you are using the threaded interface loop_start()."""
-        if self._sock == None and self._ssl == None:
-            return MOSQ_ERR_NO_CONN
-
-        now = time.time()
-        self._check_keepalive()
-        if self._last_retry_check+1 < now:
-            # Only check once a second at most
-            self._message_retry_check()
-            self._last_retry_check = now
-
-        if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
-            # mosq->ping_t != 0 means we are waiting for a pingresp.
-            # This hasn't happened in the keepalive time so we should disconnect.
-            if self._ssl:
-                self._ssl.close()
-                self._ssl = None
-            elif self._sock:
-                self._sock.close()
-                self._sock = None
-
-            self._callback_mutex.acquire()
-            if self._state == mosq_cs_disconnecting:
-                rc = MOSQ_ERR_SUCCESS
-            else:
-                rc = 1
-            if self.on_disconnect:
-                self._in_callback = True
-                self.on_disconnect(self, self._userdata, rc)
-                self._in_callback = False
-            self._callback_mutex.release()
-            return MOSQ_ERR_CONN_LOST
-
-        return MOSQ_ERR_SUCCESS
-
-    def message_retry_set(self, retry):
-        """Set the timeout in seconds before a message with QoS>0 is retried.
-        20 seconds by default."""
-        if retry < 0:
-            raise ValueError('Invalid retry.')
-
-        self._message_retry = retry
-
-    def user_data_set(self, userdata):
-        """Set the user data variable passed to callbacks. May be any data type."""
-        self._userdata = userdata
-
-    def will_set(self, topic, payload=None, qos=0, retain=False):
-        """Set a Will to be sent by the broker in case the client disconnects unexpectedly.
-
-        This must be called before connect() to have any effect.
-
-        topic: The topic that the will message should be published on.
-        payload: The message to send as a will. If not given, or set to None a
-        zero length message will be used as the will. Passing an int or float
-        will result in the payload being converted to a string representing
-        that number. If you wish to send a true int/float, use struct.pack() to
-        create the payload you require.
-        qos: The quality of service level to use for the will.
-        retain: If set to true, the will message will be set as the "last known
-        good"/retained message for the topic.
-
-        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
-        zero string length.
-        """
-        if topic == None or len(topic) == 0:
-            raise ValueError('Invalid topic.')
-        if qos<0 or qos>2:
-            raise ValueError('Invalid QoS level.')
-        if isinstance(payload, str) or isinstance(payload, bytearray):
-            self._will_payload = payload
-        elif isinstance(payload, int) or isinstance(payload, float):
-            self._will_payload = str(payload)
-        elif payload == None:
-            self._will_payload = None
-        else:
-            raise TypeError('payload must be a string, bytearray, int, float or None.')
-
-        self._will = True
-        self._will_topic = topic
-        self._will_qos = qos
-        self._will_retain = retain
-
-    def will_clear(self):
-        """ Removes a will that was previously configured with will_set().
-        
-        Must be called before connect() to have any effect."""
-        self._will = False
-        self._will_topic = ""
-        self._will_payload = None
-        self._will_qos = 0
-        self._will_retain = False
-
-    def socket(self):
-        """Return the socket or ssl object for this client."""
-        if self._ssl:
-            return self._ssl
-        else:
-            return self._sock
-
-    def loop_forever(self, timeout=1.0, max_packets=1):
-        """This function call loop() for you in an infinite blocking loop. It
-        is useful for the case where you only want to run the MQTT client loop
-        in your program.
-
-        loop_forever() will handle reconnecting for you. If you call
-        disconnect() in a callback it will return."""
-
-        run = True
-        if self._state == mosq_cs_connect_async:
-            self.reconnect()
-
-        while run:
-            rc = MOSQ_ERR_SUCCESS
-            while rc == MOSQ_ERR_SUCCESS:
-                rc = self.loop(timeout, max_packets)
-
-            if self._state == mosq_cs_disconnecting:
-                run = False
-            else:
-                time.sleep(1)
-                self.reconnect()
-        return rc
-
-    def loop_start(self):
-        """This is part of the threaded client interface. Call this once to
-        start a new thread to process network traffic. This provides an
-        alternative to repeatedly calling loop() yourself.
-        """
-        if self._thread != None:
-            return MOSQ_ERR_INVAL
-
-        self._thread = threading.Thread(target=self._thread_main)
-        self._thread.daemon = True
-        self._thread.start()
-
-    def loop_stop(self, force=False):
-        """This is part of the threaded client interface. Call this once to
-        stop the network thread previously created with loop_start(). This call
-        will block until the network thread finishes.
-
-        The force parameter is currently ignored.
-        """
-        if self._thread == None:
-            return MOSQ_ERR_INVAL
-
-        self._thread_terminate = True
-        self._thread.join()
-        self._thread = None
-
-    # ============================================================
-    # Private functions
-    # ============================================================
-
-    def _loop_rc_handle(self, rc):
-        if rc:
-            if self._ssl:
-                self._ssl.close()
-                self._ssl = None
-            elif self._sock:
-                self._sock.close()
-                self._sock = None
-
-            self._state_mutex.acquire()
-            if self._state == mosq_cs_disconnecting:
-                rc = MOSQ_ERR_SUCCESS
-            self._state_mutex.release()
-            self._callback_mutex.acquire()
-            if self.on_disconnect:
-                self._in_callback = True
-                self.on_disconnect(self, self._userdata, rc)
-                self._in_callback = False
-
-            self._callback_mutex.release()
-        return rc
-
-    def _packet_read(self):
-        # This gets called if pselect() indicates that there is network data
-        # available - ie. at least one byte.  What we do depends on what data we
-        # already have.
-        # If we've not got a command, attempt to read one and save it. This should
-        # always work because it's only a single byte.
-        # Then try to read the remaining length. This may fail because it is may
-        # be more than one byte - will need to save data pending next read if it
-        # does fail.
-        # Then try to read the remaining payload, where 'payload' here means the
-        # combined variable header and actual payload. This is the most likely to
-        # fail due to longer length, so save current data and current position.
-        # After all data is read, send to _mosquitto_handle_packet() to deal with.
-        # Finally, free the memory and reset everything to starting conditions.
-        if self._in_packet.command == 0:
-            try:
-                if self._ssl:
-                    command = self._ssl.read(1)
-                else:
-                    command = self._sock.recv(1)
-            except socket.error as err:
-                (msg) = err
-                if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
-                    return MOSQ_ERR_AGAIN
-                if msg.errno == errno.EAGAIN:
-                    return MOSQ_ERR_AGAIN
-                print(msg)
-                return 1
-            else:
-                if len(command) == 0:
-                    return 1
-                command = struct.unpack("!B", command)
-                self._in_packet.command = command[0]
-
-        if self._in_packet.have_remaining == 0:
-            # Read remaining
-            # Algorithm for decoding taken from pseudo code at
-            # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
-            while True:
-                try:
-                    if self._ssl:
-                        byte = self._ssl.read(1)
-                    else:
-                        byte = self._sock.recv(1)
-                except socket.error as err:
-                    (msg) = err
-                    if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
-                        return MOSQ_ERR_AGAIN
-                    if msg.errno == errno.EAGAIN:
-                        return MOSQ_ERR_AGAIN
-                    print(msg)
-                    return 1
-                else:
-                    byte = struct.unpack("!B", byte)
-                    byte = byte[0]
-                    self._in_packet.remaining_count.append(byte)
-                    # Max 4 bytes length for remaining length as defined by protocol.
-                     # Anything more likely means a broken/malicious client.
-                    if len(self._in_packet.remaining_count) > 4:
-                        return MOSQ_ERR_PROTOCOL
-
-                    self._in_packet.remaining_length = self._in_packet.remaining_length + (byte & 127)*self._in_packet.remaining_mult
-                    self._in_packet.remaining_mult = self._in_packet.remaining_mult * 128
-
-                if (byte & 128) == 0:
-                    break
-
-            self._in_packet.have_remaining = 1
-            self._in_packet.to_process = self._in_packet.remaining_length
-
-        while self._in_packet.to_process > 0:
-            try:
-                if self._ssl:
-                    data = self._ssl.read(self._in_packet.to_process)
-                else:
-                    data = self._sock.recv(self._in_packet.to_process)
-            except socket.error as err:
-                (msg) = err
-                if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
-                    return MOSQ_ERR_AGAIN
-                if msg.errno == errno.EAGAIN:
-                    return MOSQ_ERR_AGAIN
-                print(msg)
-                return 1
-            else:
-                self._in_packet.to_process = self._in_packet.to_process - len(data)
-                self._in_packet.packet = self._in_packet.packet + data
-
-        # All data for this packet is read.
-        self._in_packet.pos = 0
-        rc = self._packet_handle()
-
-        # Free data and reset values 
-        self._in_packet.cleanup()
-
-        self._msgtime_mutex.acquire()
-        self._last_msg_in = time.time()
-        self._msgtime_mutex.release()
-        return rc
-
-    def _packet_write(self):
-        self._current_out_packet_mutex.acquire()
-
-        while self._current_out_packet:
-            packet = self._current_out_packet
-
-            try:
-                if self._ssl:
-                    write_length = self._ssl.write(packet.packet[packet.pos:])
-                else:
-                    write_length = self._sock.send(packet.packet[packet.pos:])
-            except AttributeError:
-                self._current_out_packet_mutex.release()
-                return MOSQ_ERR_SUCCESS
-            except socket.error as err:
-                self._current_out_packet_mutex.release()
-                (msg) = err
-                if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
-                    return MOSQ_ERR_AGAIN
-                if msg.errno == errno.EAGAIN:
-                    return MOSQ_ERR_AGAIN
-                print(msg)
-                return 1
-
-            if write_length > 0:
-                packet.to_process = packet.to_process - write_length
-                packet.pos = packet.pos + write_length
-
-                if packet.to_process == 0:
-                    if (packet.command & 0xF0) == PUBLISH and packet.qos == 0:
-                        self._callback_mutex.acquire()
-                        if self.on_publish:
-                            self._in_callback = True
-                            self.on_publish(self, self._userdata, packet.mid)
-                            self._in_callback = False
-
-                        self._callback_mutex.release()
-
-                    self._out_packet_mutex.acquire()
-                    if len(self._out_packet) > 0:
-                        self._current_out_packet = self._out_packet.pop(0)
-                    else:
-                        self._current_out_packet = None
-                    self._out_packet_mutex.release()
-            else:
-                pass # FIXME
-        
-        self._current_out_packet_mutex.release()
-
-        self._msgtime_mutex.acquire()
-        self._last_msg_out = time.time()
-        self._msgtime_mutex.release()
-
-        return MOSQ_ERR_SUCCESS
-
-    def _easy_log(self, level, buf):
-        if self.on_log:
-            self.on_log(self, self._userdata, level, buf)
-
-    def _check_keepalive(self):
-        now = time.time()
-        self._msgtime_mutex.acquire()
-        last_msg_out = self._last_msg_out
-        last_msg_in = self._last_msg_in
-        self._msgtime_mutex.release()
-        if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
-            if self._state == mosq_cs_connected and self._ping_t == 0:
-                self._send_pingreq()
-                self._msgtime_mutex.acquire()
-                self._last_msg_out = now
-                self._last_msg_in = now
-                self._msgtime_mutex.release()
-            else:
-                if self._ssl:
-                    self._ssl.close()
-                    self._ssl = None
-                elif self._sock:
-                    self._sock.close()
-                    self._sock = None
-
-                if self._state == mosq_cs_disconnecting:
-                    rc = MOSQ_ERR_SUCCESS
-                else:
-                    rc = 1
-                self._callback_mutex.acquire()
-                if self.on_disconnect:
-                    self._in_callback = True
-                    self.on_disconnect(self, self._userdata, rc)
-                    self._in_callback = False
-                self._callback_mutex.release()
-
-    def _mid_generate(self):
-        self._last_mid = self._last_mid + 1
-        if self._last_mid == 65536:
-            self._last_mid = 1
-        return self._last_mid
-
-    def _topic_wildcard_len_check(self, topic):
-        # Search for + or # in a topic. Return MOSQ_ERR_INVAL if found.
-         # Also returns MOSQ_ERR_INVAL if the topic string is too long.
-         # Returns MOSQ_ERR_SUCCESS if everything is fine.
-        if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535:
-            return MOSQ_ERR_INVAL
-        else:
-            return MOSQ_ERR_SUCCESS
-
-    def _send_pingreq(self):
-        self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGREQ")
-        rc = self._send_simple_command(PINGREQ)
-        if rc == MOSQ_ERR_SUCCESS:
-            self._ping_t = time.time()
-        return rc
-
-    def _send_pingresp(self):
-        self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGRESP")
-        return self._send_simple_command(PINGRESP)
-
-    def _send_puback(self, mid):
-        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")")
-        return self._send_command_with_mid(PUBACK, mid, False)
-
-    def _send_pubcomp(self, mid):
-        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")")
-        return self._send_command_with_mid(PUBCOMP, mid, False)
-
-    def _pack_remaining_length(self, packet, remaining_length):
-        remaining_bytes = []
-        while True:
-            byte = remaining_length % 128
-            remaining_length = remaining_length // 128
-            # If there are more digits to encode, set the top bit of this digit
-            if remaining_length > 0:
-                byte = byte | 0x80
-
-            remaining_bytes.append(byte)
-            packet.extend(struct.pack("!B", byte))
-            if remaining_length == 0:
-                # FIXME - this doesn't deal with incorrectly large payloads
-                return packet
-
-    def _pack_str16(self, packet, data):
-        if sys.version_info[0] < 3:
-            if isinstance(data, bytearray):
-                packet.extend(struct.pack("!H", len(data)))
-                packet.extend(data)
-            elif isinstance(data, str):
-                pack_format = "!H" + str(len(data)) + "s"
-                packet.extend(struct.pack(pack_format, len(data), data))
-            elif isinstance(data, unicode):
-                udata = data.encode('utf-8')
-                pack_format = "!H" + str(len(udata)) + "s"
-                packet.extend(struct.pack(pack_format, len(udata), udata))
-            else:
-                raise TypeError
-        else:
-            if isinstance(data, bytearray):
-                packet.extend(struct.pack("!H", len(data)))
-                packet.extend(data)
-            elif isinstance(data, str):
-                udata = data.encode('utf-8')
-                pack_format = "!H" + str(len(udata)) + "s"
-                packet.extend(struct.pack(pack_format, len(udata), udata))
-            else:
-                raise TypeError
-
-    def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False):
-        if self._sock == None and self._ssl == None:
-            return MOSQ_ERR_NO_CONN
-
-        command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain
-        packet = bytearray()
-        packet.extend(struct.pack("!B", command))
-        if payload == None:
-            remaining_length = 2+len(topic)
-            self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"' (NULL payload)")
-        else:
-            remaining_length = 2+len(topic) + len(payload)
-            self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"', ... ("+str(len(payload))+" bytes)")
-
-        if qos > 0:
-            # For message id
-            remaining_length = remaining_length + 2
-
-        self._pack_remaining_length(packet, remaining_length)
-        self._pack_str16(packet, topic)
-
-        if qos > 0:
-            # For message id
-            packet.extend(struct.pack("!H", mid))
-
-        if payload != None:
-            if isinstance(payload, str):
-                if sys.version_info[0] < 3:
-                    pack_format = str(len(payload)) + "s"
-                    packet.extend(struct.pack(pack_format, payload))
-                else:
-                    upayload = payload.encode('utf-8')
-                    pack_format = str(len(upayload)) + "s"
-                    packet.extend(struct.pack(pack_format, upayload))
-            elif isinstance(payload, bytearray):
-                packet.extend(payload)
-            elif isinstance(payload, unicode):
-                    upayload = payload.encode('utf-8')
-                    pack_format = str(len(upayload)) + "s"
-                    packet.extend(struct.pack(pack_format, upayload))
-            else:
-                raise TypeError('payload must be a string, unicode or a bytearray.')
-
-        return self._packet_queue(PUBLISH, packet, mid, qos)
-
-    def _send_pubrec(self, mid):
-        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")")
-        return self._send_command_with_mid(PUBREC, mid, False)
-
-    def _send_pubrel(self, mid, dup=False):
-        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")")
-        return self._send_command_with_mid(PUBREL|2, mid, dup)
-
-    def _send_command_with_mid(self, command, mid, dup):
-        # For PUBACK, PUBCOMP, PUBREC, and PUBREL
-        if dup:
-            command = command | 8
-
-        remaining_length = 2
-        packet = struct.pack('!BBH', command, remaining_length, mid)
-        return self._packet_queue(command, packet, mid, 1)
-
-    def _send_simple_command(self, command):
-        # For DISCONNECT, PINGREQ and PINGRESP
-        remaining_length = 0
-        packet = struct.pack('!BB', command, remaining_length)
-        return self._packet_queue(command, packet, 0, 0)
-
-    def _send_connect(self, keepalive, clean_session):
-        remaining_length = 12 + 2+len(self._client_id)
-        connect_flags = 0
-        if clean_session:
-            connect_flags = connect_flags | 0x02
-
-        if self._will:
-            remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload)
-            connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5)
-
-        if self._username:
-            remaining_length = remaining_length + 2+len(self._username)
-            connect_flags = connect_flags | 0x80
-            if self._password:
-                connect_flags = connect_flags | 0x40
-                remaining_length = remaining_length + 2+len(self._password)
-
-        command = CONNECT
-        packet = bytearray()
-        packet.extend(struct.pack("!B", command))
-        self._pack_remaining_length(packet, remaining_length)
-        packet.extend(struct.pack("!H6sBBH", len(PROTOCOL_NAME), PROTOCOL_NAME, PROTOCOL_VERSION, connect_flags, keepalive))
-
-        self._pack_str16(packet, self._client_id)
-
-        if self._will:
-            self._pack_str16(packet, self._will_topic)
-            if len(self._will_payload) > 0:
-                self._pack_str16(packet, self._will_payload)
-            else:
-                packet.extend(struct.pack("!H", 0))
-
-        if self._username:
-            self._pack_str16(packet, self._username)
-
-            if self._password:
-                self._pack_str16(packet, self._password)
-
-        self._keepalive = keepalive
-        return self._packet_queue(command, packet, 0, 0)
-
-    def _send_disconnect(self):
-        return self._send_simple_command(DISCONNECT)
-
-    def _send_subscribe(self, dup, topic, topic_qos):
-        remaining_length = 2 + 2+len(topic) + 1
-        command = SUBSCRIBE | (dup<<3) | (1<<1)
-        packet = bytearray()
-        packet.extend(struct.pack("!B", command))
-        self._pack_remaining_length(packet, remaining_length)
-        local_mid = self._mid_generate()
-        packet.extend(struct.pack("!H", local_mid))
-        self._pack_str16(packet, topic)
-        packet.extend(struct.pack("B", topic_qos))
-        return (self._packet_queue(command, packet, local_mid, 1), local_mid)
-
-    def _send_unsubscribe(self, dup, topic):
-        remaining_length = 2 + 2+len(topic)
-        command = UNSUBSCRIBE | (dup<<3) | (1<<1)
-        packet = bytearray()
-        packet.extend(struct.pack("!B", command))
-        self._pack_remaining_length(packet, remaining_length)
-        local_mid = self._mid_generate()
-        packet.extend(struct.pack("!H", local_mid))
-        self._pack_str16(packet, topic)
-        return (self._packet_queue(command, packet, local_mid, 1), local_mid)
-
-    def _message_update(self, mid, direction, state):
-        for m in self._messages:
-            if m.mid == mid and m.direction == direction:
-                m.state = state
-                m.timestamp = time.time()
-                return MOSQ_ERR_SUCCESS
-
-        return MOSQ_ERR_NOT_FOUND
-
-    def _message_retry_check(self):
-        now = time.time()
-        for m in self._messages:
-            if m.timestamp + self._message_retry < now:
-                if m.state == mosq_ms_wait_puback or m.state == mosq_ms_wait_pubrec:
-                    m.timestamp = now
-                    m.dup = True
-                    self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
-                elif m.state == mosq_ms_wait_pubrel:
-                    m.timestamp = now
-                    m.dup = True
-                    self._send_pubrec(m.mid)
-                elif m.state == mosq_ms_wait_pubcomp:
-                    m.timestamp = now
-                    m.dup = True
-                    self._send_pubrel(m.mid, True)
-
-    def _messages_reconnect_reset(self):
-        for m in self._messages:
-            m.timestamp = 0
-            if m.direction == mosq_md_out:
-                if m.qos == 1:
-                    m.state = mosq_ms_wait_puback
-                elif m.qos == 2:
-                    m.state = mosq_ms_wait_pubrec
-            else:
-                self._messages.pop(self._messages.index(m))
-
-    def _packet_queue(self, command, packet, mid, qos):
-        mpkt = MosquittoPacket(command, packet, mid, qos)
-
-        self._out_packet_mutex.acquire()
-        self._out_packet.append(mpkt)
-        if self._current_out_packet_mutex.acquire(False):
-            if self._current_out_packet == None and len(self._out_packet) > 0:
-                self._current_out_packet = self._out_packet.pop(0)
-            self._current_out_packet_mutex.release()
-        self._out_packet_mutex.release()
-
-        if not self._in_callback and self._thread == None:
-            return self.loop_write()
-        else:
-            return MOSQ_ERR_SUCCESS
-
-    def _packet_handle(self):
-        cmd = self._in_packet.command&0xF0
-        if cmd == PINGREQ:
-            return self._handle_pingreq()
-        elif cmd == PINGRESP:
-            return self._handle_pingresp()
-        elif cmd == PUBACK:
-            return self._handle_pubackcomp("PUBACK")
-        elif cmd == PUBCOMP:
-            return self._handle_pubackcomp("PUBCOMP")
-        elif cmd == PUBLISH:
-            return self._handle_publish()
-        elif cmd == PUBREC:
-            return self._handle_pubrec()
-        elif cmd == PUBREL:
-            return self._handle_pubrel()
-        elif cmd == CONNACK:
-            return self._handle_connack()
-        elif cmd == SUBACK:
-            return self._handle_suback()
-        elif cmd == UNSUBACK:
-            return self._handle_unsuback()
-        else:
-            # If we don't recognise the command, return an error straight away.
-            self._easy_log(MOSQ_LOG_ERR, "Error: Unrecognised command "+str(cmd))
-            return MOSQ_ERR_PROTOCOL
-
-    def _handle_pingreq(self):
-        if self._strict_protocol:
-            if self._in_packet.remaining_length != 0:
-                return MOSQ_ERR_PROTOCOL
-        
-        self._easy_log(MOSQ_LOG_DEBUG, "Received PINGREQ")
-        return self._send_pingresp()
-
-    def _handle_pingresp(self):
-        if self._strict_protocol:
-            if self._in_packet.remaining_length != 0:
-                return MOSQ_ERR_PROTOCOL
-        
-        # No longer waiting for a PINGRESP.
-        self._ping_t = 0
-        self._easy_log(MOSQ_LOG_DEBUG, "Received PINGRESP")
-        return MOSQ_ERR_SUCCESS
-
-    def _handle_connack(self):
-        if self._strict_protocol:
-            if self._in_packet.remaining_length != 2:
-                return MOSQ_ERR_PROTOCOL
-
-        if len(self._in_packet.packet) != 2:
-            return MOSQ_ERR_PROTOCOL
-
-        (resvd, result) = struct.unpack("!BB", self._in_packet.packet)
-        self._easy_log(MOSQ_LOG_DEBUG, "Received CONNACK ("+str(resvd)+", "+str(result)+")")
-        self._callback_mutex.acquire()
-        if self.on_connect:
-            self._in_callback = True
-            self.on_connect(self, self._userdata, result)
-            self._in_callback = False
-        self._callback_mutex.release()
-        if result == 0:
-            self._state = mosq_cs_connected
-            return MOSQ_ERR_SUCCESS
-        elif result > 0 and result < 6:
-            return MOSQ_ERR_CONN_REFUSED
-        else:
-            return MOSQ_ERR_PROTOCOL
-
-    def _handle_suback(self):
-        self._easy_log(MOSQ_LOG_DEBUG, "Received SUBACK")
-        pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's'
-        (mid, packet) = struct.unpack(pack_format, self._in_packet.packet)
-        pack_format = "!" + "B"*len(packet)
-        granted_qos = struct.unpack(pack_format, packet)
-
-        self._callback_mutex.acquire()
-        if self.on_subscribe:
-            self._in_callback = True
-            self.on_subscribe(self, self._userdata, mid, granted_qos)
-            self._in_callback = False
-        self._callback_mutex.release()
-
-        return MOSQ_ERR_SUCCESS
-
-    def _handle_publish(self):
-        rc = 0
-
-        header = self._in_packet.command
-        message = MosquittoMessage()
-        message.direction = mosq_md_in
-        message.dup = (header & 0x08)>>3
-        message.qos = (header & 0x06)>>1
-        message.retain = (header & 0x01)
-
-        pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's'
-        (slen, packet) = struct.unpack(pack_format, self._in_packet.packet)
-        pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's'
-        (message.topic, packet) = struct.unpack(pack_format, packet)
-
-        if len(message.topic) == 0:
-            return MOSQ_ERR_PROTOCOL
-
-        if sys.version_info[0] >= 3:
-            message.topic = message.topic.decode('utf-8')
-        message.topic = _fix_sub_topic(message.topic)
-
-        if message.qos > 0:
-            pack_format = "!H" + str(len(packet)-2) + 's'
-            (message.mid, packet) = struct.unpack(pack_format, packet)
-
-        message.payload = packet
-
-        self._easy_log(MOSQ_LOG_DEBUG, "Received PUBLISH (d"+str(message.dup)+
-                ", q"+str(message.qos)+", r"+str(message.retain)+
-                ", m"+str(message.mid)+", '"+message.topic+
-                "', ...  ("+str(len(message.payload))+" bytes)")
-
-        message.timestamp = time.time()
-        if message.qos == 0:
-            self._callback_mutex.acquire()
-            if self.on_message:
-                self._in_callback = True
-                self.on_message(self, self._userdata, message)
-                self._in_callback = False
-
-            self._callback_mutex.release()
-            return MOSQ_ERR_SUCCESS
-        elif message.qos == 1:
-            rc = self._send_puback(message.mid)
-            self._callback_mutex.acquire()
-            if self.on_message:
-                self._in_callback = True
-                self.on_message(self, self._userdata, message)
-                self._in_callback = False
-
-            self._callback_mutex.release()
-            return rc
-        elif message.qos == 2:
-            rc = self._send_pubrec(message.mid)
-            message.state = mosq_ms_wait_pubrel
-            self._messages.append(message)
-            return rc
-        else:
-            return MOSQ_ERR_PROTOCOL
-
-    def _handle_pubrel(self):
-        if self._strict_protocol:
-            if self._in_packet.remaining_length != 2:
-                return MOSQ_ERR_PROTOCOL
-        
-        if len(self._in_packet.packet) != 2:
-            return MOSQ_ERR_PROTOCOL
-
-        mid = struct.unpack("!H", self._in_packet.packet)
-        mid = mid[0]
-        self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")")
-        
-        for i in range(len(self._messages)):
-            if self._messages[i].direction == mosq_md_in and self._messages[i].mid == mid:
-
-                # Only pass the message on if we have removed it from the queue - this
-                # prevents multiple callbacks for the same message.
-                self._callback_mutex.acquire()
-                if self.on_message:
-                    self._in_callback = True
-                    self.on_message(self, self._userdata, self._messages[i])
-                    self._in_callback = False
-                self._callback_mutex.release()
-                self._messages.pop(i)
-
-                return self._send_pubcomp(mid)
-
-        return MOSQ_ERR_SUCCESS
-
-    def _handle_pubrec(self):
-        if self._strict_protocol:
-            if self._in_packet.remaining_length != 2:
-                return MOSQ_ERR_PROTOCOL
-        
-        mid = struct.unpack("!H", self._in_packet.packet)
-        mid = mid[0]
-        self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")")
-        
-        for i in range(len(self._messages)):
-            if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid:
-                self._messages[i].state = mosq_ms_wait_pubcomp
-                self._messages[i].timestamp = time.time()
-                return self._send_pubrel(mid, False)
-        
-        return MOSQ_ERR_SUCCESS
-
-    def _handle_unsuback(self):
-        if self._strict_protocol:
-            if self._in_packet.remaining_length != 2:
-                return MOSQ_ERR_PROTOCOL
-        
-        mid = struct.unpack("!H", self._in_packet.packet)
-        mid = mid[0]
-        self._easy_log(MOSQ_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")")
-        self._callback_mutex.acquire()
-        if self.on_unsubscribe:
-            self._in_callback = True
-            self.on_unsubscribe(self, self._userdata, mid)
-            self._in_callback = False
-        self._callback_mutex.release()
-        return MOSQ_ERR_SUCCESS
-
-    def _handle_pubackcomp(self, cmd):
-        if self._strict_protocol:
-            if self._in_packet.remaining_length != 2:
-                return MOSQ_ERR_PROTOCOL
-        
-        mid = struct.unpack("!H", self._in_packet.packet)
-        mid = mid[0]
-        self._easy_log(MOSQ_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")")
-        
-        for i in range(len(self._messages)):
-            try:
-                if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid:
-                    # Only inform the client the message has been sent once.
-                    self._callback_mutex.acquire()
-                    if self.on_publish:
-                        self._in_callback = True
-                        self.on_publish(self, self._userdata, mid)
-                        self._in_callback = False
-
-                    self._callback_mutex.release()
-                    self._messages.pop(i)
-            except IndexError:
-                # Have removed item so i>count.
-                # Not really an error.
-                pass
-
-        return MOSQ_ERR_SUCCESS
-
-    def _thread_main(self):
-        run = True
-        self._thread_terminate = False
-        self._state_mutex.acquire()
-        if self._state == mosq_cs_connect_async:
-            self._state_mutex.release()
-            self.reconnect()
-        else:
-            self._state_mutex.release()
-
-        while run:
-            rc = MOSQ_ERR_SUCCESS
-            while rc == MOSQ_ERR_SUCCESS:
-                rc = self.loop()
-                if self._thread_terminate:
-                    rc = 1
-                    run = False
-
-            self._state_mutex.acquire()
-            if self._state == mosq_cs_disconnecting:
-                run = False
-                self._state_mutex.release()
-            else:
-                self._state_mutex.release()
-                time.sleep(1)
-                self.reconnect()
-
diff --git a/lib/python/setup.py b/lib/python/setup.py
deleted file mode 100644 (file)
index 3d52d53..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-from sys import version
-
-from distutils.core import setup
-setup(name='mosquitto',
-       version='1.1.1',
-       description='MQTT version 3.1 client class',
-       author='Roger Light',
-       author_email='roger@atchoo.org',
-       url='http://mosquitto.org/',
-       download_url='http://mosquitto.org/files/',
-       license='BSD License',
-       py_modules=['mosquitto'],
-
-    classifiers=[
-        'Development Status :: 4 - Beta',
-        'Intended Audience :: Developers',
-        'License :: OSI Approved :: BSD License',
-        'Operating System :: MacOS :: MacOS X',
-        'Operating System :: Microsoft :: Windows',
-        'Operating System :: POSIX',
-        'Programming Language :: Python',
-        'Programming Language :: Python :: 2.6',
-        'Programming Language :: Python :: 2.7',
-        'Programming Language :: Python :: 3',
-        'Topic :: Communications',
-        'Topic :: Internet',
-        ]
-       )
diff --git a/lib/python/sub.py b/lib/python/sub.py
deleted file mode 100755 (executable)
index 7af946d..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2010,2011 Roger Light <roger@atchoo.org>
-# All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-# 
-# 1. Redistributions of source code must retain the above copyright notice,
-#   this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-#   notice, this list of conditions and the following disclaimer in the
-#   documentation and/or other materials provided with the distribution.
-# 3. Neither the name of mosquitto nor the names of its
-#   contributors may be used to endorse or promote products derived from
-#   this software without specific prior written permission.
-# 
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-import mosquitto
-
-def on_connect(mosq, obj, rc):
-    print("rc: "+str(rc))
-
-def on_message(mosq, obj, msg):
-    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
-
-def on_publish(mosq, obj, mid):
-    print("mid: "+str(mid))
-
-def on_subscribe(mosq, obj, mid, granted_qos):
-    print("Subscribed: "+str(mid)+" "+str(granted_qos))
-
-def on_log(mosq, obj, level, string):
-    print(string)
-
-mqttc = mosquitto.Mosquitto()
-mqttc.on_message = on_message
-mqttc.on_connect = on_connect
-mqttc.on_publish = on_publish
-mqttc.on_subscribe = on_subscribe
-# Uncomment to enable debug messages
-#mqttc.on_log = on_log
-mqttc.connect("test.mosquitto.org", 1883, 60)
-mqttc.subscribe("$SYS/#", 0)
-
-
-rc = 0
-while rc == 0:
-    rc = mqttc.loop()
-
-print("rc: "+str(rc))
diff --git a/setup.py b/setup.py
new file mode 100644 (file)
index 0000000..3d52d53
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,28 @@
+from sys import version
+
+from distutils.core import setup
+setup(name='mosquitto',
+       version='1.1.1',
+       description='MQTT version 3.1 client class',
+       author='Roger Light',
+       author_email='roger@atchoo.org',
+       url='http://mosquitto.org/',
+       download_url='http://mosquitto.org/files/',
+       license='BSD License',
+       py_modules=['mosquitto'],
+
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: BSD License',
+        'Operating System :: MacOS :: MacOS X',
+        'Operating System :: Microsoft :: Windows',
+        'Operating System :: POSIX',
+        'Programming Language :: Python',
+        'Programming Language :: Python :: 2.6',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+        'Topic :: Communications',
+        'Topic :: Internet',
+        ]
+       )
diff --git a/src/paho/mqtt/mosquitto.py b/src/paho/mqtt/mosquitto.py
new file mode 100755 (executable)
index 0000000..86f6ce9
--- /dev/null
@@ -0,0 +1,1781 @@
+# Copyright (c) 2012 Roger Light <roger@atchoo.org>
+# All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+# 
+# 1. Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+# 3. Neither the name of mosquitto nor the names of its
+#    contributors may be used to endorse or promote products derived from
+#    this software without specific prior written permission.
+# 
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+#
+# This product includes software developed by the OpenSSL Project for use in
+# the OpenSSL Toolkit. (http://www.openssl.org/)
+# This product includes cryptographic software written by Eric Young
+# (eay@cryptsoft.com)
+# This product includes software written by Tim Hudson (tjh@cryptsoft.com)
+
+"""
+This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging
+protocol that is easy to implement and suitable for low powered devices.
+"""
+import errno
+import random
+import select
+import socket
+import ssl
+import struct
+import sys
+import threading
+import time
+
+if sys.version_info[0] < 3:
+    PROTOCOL_NAME = "MQIsdp"
+else:
+    PROTOCOL_NAME = b"MQIsdp"
+
+PROTOCOL_VERSION = 3
+
+# Message types 
+CONNECT = 0x10
+CONNACK = 0x20
+PUBLISH = 0x30
+PUBACK = 0x40
+PUBREC = 0x50
+PUBREL = 0x60
+PUBCOMP = 0x70
+SUBSCRIBE = 0x80
+SUBACK = 0x90
+UNSUBSCRIBE = 0xA0
+UNSUBACK = 0xB0
+PINGREQ = 0xC0
+PINGRESP = 0xD0
+DISCONNECT = 0xE0
+
+# Log levels
+MOSQ_LOG_INFO = 0x01
+MOSQ_LOG_NOTICE = 0x02
+MOSQ_LOG_WARNING = 0x04
+MOSQ_LOG_ERR = 0x08
+MOSQ_LOG_DEBUG = 0x10
+
+# CONNACK codes
+CONNACK_ACCEPTED = 0
+CONNACK_REFUSED_PROTOCOL_VERSION = 1
+CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
+CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
+CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
+CONNACK_REFUSED_NOT_AUTHORIZED = 5
+
+# Connection state
+mosq_cs_new = 0
+mosq_cs_connected = 1
+mosq_cs_disconnecting = 2
+mosq_cs_connect_async = 3
+
+# Message direction
+mosq_md_invalid = 0
+mosq_md_in = 1
+mosq_md_out = 2
+
+# Message state
+mosq_ms_invalid = 0,
+mosq_ms_wait_puback = 1
+mosq_ms_wait_pubrec = 2
+mosq_ms_wait_pubrel = 3
+mosq_ms_wait_pubcomp = 4
+
+# Error values
+MOSQ_ERR_AGAIN = -1
+MOSQ_ERR_SUCCESS = 0
+MOSQ_ERR_NOMEM = 1
+MOSQ_ERR_PROTOCOL = 2
+MOSQ_ERR_INVAL = 3
+MOSQ_ERR_NO_CONN = 4
+MOSQ_ERR_CONN_REFUSED = 5
+MOSQ_ERR_NOT_FOUND = 6
+MOSQ_ERR_CONN_LOST = 7
+MOSQ_ERR_TLS = 8
+MOSQ_ERR_PAYLOAD_SIZE = 9
+MOSQ_ERR_NOT_SUPPORTED = 10
+MOSQ_ERR_AUTH = 11
+MOSQ_ERR_ACL_DENIED = 12
+MOSQ_ERR_UNKNOWN = 13
+MOSQ_ERR_ERRNO = 14
+
+def _fix_sub_topic(subtopic):
+    # Convert ////some////over/slashed///topic/etc/etc//
+    # into some/over/slashed/topic/etc/etc
+    if subtopic[0] == '/':
+        return '/'+'/'.join(filter(None, subtopic.split('/')))
+    else:
+        return '/'.join(filter(None, subtopic.split('/')))
+
+def error_string(mosq_errno):
+    """Return the error string associated with a mosquitto error number."""
+    if mosq_errno == MOSQ_ERR_SUCCESS:
+        return "No error."
+    elif mosq_errno == MOSQ_ERR_NOMEM:
+        return "Out of memory."
+    elif mosq_errno == MOSQ_ERR_PROTOCOL:
+        return "A network protocol error occurred when communicating with the broker."
+    elif mosq_errno == MOSQ_ERR_INVAL:
+        return "Invalid function arguments provided."
+    elif mosq_errno == MOSQ_ERR_NO_CONN:
+        return "The client is not currently connected."
+    elif mosq_errno == MOSQ_ERR_CONN_REFUSED:
+        return "The connection was refused."
+    elif mosq_errno == MOSQ_ERR_NOT_FOUND:
+        return "Message not found (internal error)."
+    elif mosq_errno == MOSQ_ERR_CONN_LOST:
+        return "The connection was lost."
+    elif mosq_errno == MOSQ_ERR_TLS:
+        return "A TLS error occurred."
+    elif mosq_errno == MOSQ_ERR_PAYLOAD_SIZE:
+        return "Payload too large."
+    elif mosq_errno == MOSQ_ERR_NOT_SUPPORTED:
+        return "This feature is not supported."
+    elif mosq_errno == MOSQ_ERR_AUTH:
+        return "Authorisation failed."
+    elif mosq_errno == MOSQ_ERR_ACL_DENIED:
+        return "Access denied by ACL."
+    elif mosq_errno == MOSQ_ERR_UNKNOWN:
+        return "Unknown error."
+    elif mosq_errno == MOSQ_ERR_ERRNO:
+        return "Error defined by errno."
+    else:
+        return "Unknown error."
+
+def connack_string(connack_code):
+    """Return the string associated with a CONNACK result."""
+    if connack_code == 0:
+        return "Connection Accepted."
+    elif connack_code == 1:
+        return "Connection Refused: unacceptable protocol version."
+    elif connack_code == 2:
+        return "Connection Refused: identifier rejected."
+    elif connack_code == 3:
+        return "Connection Refused: broker unavailable."
+    elif connack_code == 4:
+        return "Connection Refused: bad user name or password."
+    elif connack_code == 5:
+        return "Connection Refused: not authorised."
+    else:
+        return "Connection Refused: unknown reason."
+
+def topic_matches_sub(sub, topic):
+    """Check whether a topic matches a subscription.
+    
+    For example:
+    
+    foo/bar would match the subscription foo/# or +/bar
+    non/matching would not match the subscription non/+/+
+    """
+    result = True
+    local_sub = _fix_sub_topic(sub)
+    local_topic = _fix_sub_topic(topic)
+    multilevel_wildcard = False
+
+    slen = len(local_sub)
+    tlen = len(local_topic)
+
+    spos = 0;
+    tpos = 0;
+
+    while spos < slen and tpos < tlen:
+        if local_sub[spos] == local_topic[tpos]:
+            spos += 1
+            tpos += 1
+        else:
+            if local_sub[spos] == '+':
+                spos += 1
+                while tpos < tlen and local_topic[tpos] != '/':
+                    tpos += 1
+                if tpos == tlen and spos == slen:
+                    result = True
+                    break
+
+            elif local_sub[spos] == '#':
+                multilevel_wildcard = True
+                if spos+1 != slen:
+                    result = False
+                    break
+                else:
+                    result = True
+                    break
+
+            else:
+                result = False
+                break
+
+        if tpos == tlen-1:
+            # Check for e.g. foo matching foo/#
+            if spos == slen-3 and local_sub[spos+1] == '/' and local_sub[spos+2] == '#':
+                result = True
+                multilevel_wildcard = True
+                break
+
+    if not multilevel_wildcard and (tpos < tlen or spos < slen):
+        result = False
+
+    return result
+
+
+class MosquittoMessage:
+    """ This is a class that describes an incoming message. It is passed to the
+    on_message callback as the message parameter.
+    
+    Members:
+
+    topic : String. topic that the message was published on.
+    payload : String/bytes the message payload.
+    qos : Integer. The message Quality of Service 0, 1 or 2.
+    retain : Boolean. If true, the message is a retained message and not fresh.
+    mid : Integer. The message id.
+    """
+    def __init__(self):
+        self.timestamp = 0
+        self.direction = mosq_md_invalid
+        self.state = mosq_ms_invalid
+        self.dup = False
+        self.mid = 0
+        self.topic = ""
+        self.payload = None
+        self.qos = 0
+        self.retain = False
+
+class MosquittoInPacket:
+    """Internal datatype."""
+    def __init__(self):
+        self.command = 0
+        self.have_remaining = 0
+        self.remaining_count = []
+        self.remaining_mult = 1
+        self.remaining_length = 0
+        self.packet = b""
+        self.to_process = 0
+        self.pos = 0
+
+    def cleanup(self):
+        self.__init__()
+
+class MosquittoPacket:
+    """Internal datatype."""
+    def __init__(self, command, packet, mid, qos):
+        self.command = command
+        self.mid = mid
+        self.qos = qos
+        self.pos = 0
+        self.to_process = len(packet)
+        self.packet = packet
+
+class Mosquitto:
+    """MQTT version 3.1 client class.
+    
+    This is the main class for use communicating with an MQTT broker.
+
+    General usage flow:
+
+    * Use connect()/connect_async() to connect to a broker
+    * Call loop() frequently to maintain network traffic flow with the broker
+    * Or use loop_start() to set a thread running to call loop() for you.
+    * Or use loop_forever() to handle calling loop() for you in a blocking
+    * function.
+    * Use subscribe() to subscribe to a topic and receive messages
+    * Use publish() to send messages
+    * Use disconnect() to disconnect from the broker
+
+    Data returned from the broker is made available with the use of callback
+    functions as described below.
+
+    Callbacks
+    =========
+
+    A number of callback functions are available to receive data back from the
+    broker. To use a callback, define a function and then assign it to the
+    client:
+    
+    def on_connect(mosq, userdata, rc):
+        print("Connection returned " + str(rc))
+
+    client.on_connect = on_connect
+
+    All of the callbacks as described below have a "mosq" and an "userdata"
+    argument. "mosq" is the Mosquitto instance that is calling the callback.
+    "userdata" is user data of any type and can be set when creating a new client
+    instance or with user_data_set(userdata).
+    
+    The callbacks:
+
+    on_connect(mosq, userdata, rc): called when the broker responds to our connection
+      request. The value of rc determines success or not:
+      0: Connection successful
+      1: Connection refused - incorrect protocol version
+      2: Connection refused - invalid client identifier
+      3: Connection refused - server unavailable
+      4: Connection refused - bad username or password
+      5: Connection refused - not authorised
+      6-255: Currently unused.
+
+    on_disconnect(mosq, userdata, rc): called when the client disconnects from the broker.
+      The rc parameter indicates the disconnection state. If MOSQ_ERR_SUCCESS
+      (0), the callback was called in response to a disconnect() call. If any
+      other value the disconnection was unexpected, such as might be caused by
+      a network error.
+
+    on_message(mosq, userdata, message): called when a message has been received on a
+      topic that the client subscribes to. The message variable is a
+      MosquittoMessage that describes all of the message parameters.
+
+    on_publish(mosq, userdata, mid): called when a message that was to be sent using the
+      publish() call has completed transmission to the broker. For messages
+      with QoS levels 1 and 2, this means that the appropriate handshakes have
+      completed. For QoS 0, this simply means that the message has left the
+      client. The mid variable matches the mid variable returned from the
+      corresponding publish() call, to allow outgoing messages to be tracked.
+      This callback is important because even if the publish() call returns
+      success, it does not always mean that the message has been sent.
+
+    on_subscribe(mosq, userdata, mid, granted_qos): called when the broker responds to a
+      subscribe request. The mid variable matches the mid variable returned
+      from the corresponding subscribe() call. The granted_qos variable is a
+      list of integers that give the QoS level the broker has granted for each
+      of the different subscription requests.
+
+    on_unsubscribe(mosq, userdata, mid): called when the broker responds to an unsubscribe
+      request. The mid variable matches the mid variable returned from the
+      corresponding unsubscribe() call.
+
+    on_log(mosq, userdata, level, buf): called when the client has log information. Define
+      to allow debugging. The level variable gives the severity of the message
+      and will be one of MOSQ_LOG_INFO, MOSQ_LOG_NOTICE, MOSQ_LOG_WARNING,
+      MOSQ_LOG_ERR, and MOSQ_LOG_DEBUG. The message itself is in buf.
+
+    """
+    def __init__(self, client_id="", clean_session=True, userdata=None):
+        """client_id is the unique client id string used when connecting to the
+        broker. If client_id is zero length or None, then one will be randomly
+        generated. In this case, clean_session must be True. If this is not the
+        case a ValueError will be raised.
+
+        clean_session is a boolean that determines the client type. If True,
+        the broker will remove all information about this client when it
+        disconnects. If False, the client is a persistent client and
+        subscription information and queued messages will be retained when the
+        client disconnects. 
+        Note that a client will never discard its own outgoing messages on
+        disconnect. Calling connect() or reconnect() will cause the messages to
+        be resent.  Use reinitialise() to reset a client to its original state.
+
+        userdata is user defined data of any type that is passed as the "userdata"
+        parameter to callbacks. It may be updated at a later point with the
+        user_data_set() function.
+        """
+        if not clean_session and (client_id == "" or client_id == None):
+            raise ValueError('A client id must be provided if clean session is False.')
+
+        self._userdata = userdata
+        self._sock = None
+        self._keepalive = 60
+        self._message_retry = 20
+        self._last_retry_check = 0
+        self._clean_session = clean_session
+        if client_id == "":
+            self._client_id = "mosq/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5))
+        else:
+            self._client_id = client_id
+
+        self._username = ""
+        self._password = ""
+        self._in_packet = MosquittoInPacket()
+        self._out_packet = []
+        self._current_out_packet = None
+        self._last_msg_in = time.time()
+        self._last_msg_out = time.time()
+        self._ping_t = 0
+        self._last_mid = 0
+        self._state = mosq_cs_new
+        self._messages = []
+        self._will = False
+        self._will_topic = ""
+        self._will_payload = None
+        self._will_qos = 0
+        self._will_retain = False
+        self.on_disconnect = None
+        self.on_connect = None
+        self.on_publish = None
+        self.on_message = None
+        self.on_subscribe = None
+        self.on_unsubscribe = None
+        self.on_log = None
+        self._host = ""
+        self._port = 1883
+        self._in_callback = False
+        self._strict_protocol = False
+        self._callback_mutex = threading.Lock()
+        self._state_mutex = threading.Lock()
+        self._out_packet_mutex = threading.Lock()
+        self._current_out_packet_mutex = threading.Lock()
+        self._msgtime_mutex = threading.Lock()
+        self._thread = None
+        self._thread_terminate = False
+        self._ssl = None
+        self._tls_certfile = None
+        self._tls_keyfile = None
+        self._tls_ca_certs = None
+        self._tls_cert_reqs = None
+        self._tls_ciphers = None
+
+    def __del__(self):
+        pass
+
+    def reinitialise(self, client_id="", clean_session=True, userdata=None):
+        if self._ssl:
+            self._ssl.close()
+            self._ssl = None
+            self._sock = None
+        elif self._sock:
+            self._sock.close()
+            self._sock = None
+        self.__init__(client_id, clean_session, userdata)
+
+    def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None):
+        """Configure network encryption and authentication options. Enables SSL/TLS support.
+
+        ca_certs : a string path to the Certificate Authority certificate files
+        that are to be treated as trusted by this client. If this is the only
+        option given then the client will operate in a similar manner to a web
+        browser. That is to say it will require the broker to have a
+        certificate signed by the Certificate Authorities in ca_certs and will
+        communicate using TLS v1, but will not attempt any form of
+        authentication. This provides basic network encryption but may not be
+        sufficient depending on how the broker is configured.
+
+        certfile and keyfile are strings pointing to the PEM encoded client
+        certificate and private keys respectively. If these arguments are not
+        None then they will be used as client information for TLS based
+        authentication.  Support for this feature is broker dependent. Note
+        that if either of these files in encrypted and needs a password to
+        decrypt it, Python will ask for the password at the command line. It is
+        not currently possible to define a callback to provide the password.
+        
+        cert_reqs allows the certificate requirements that the client imposes
+        on the broker to be changed. By default this is ssl.CERT_REQUIRED,
+        which means that the broker must provide a certificate. See the ssl
+        pydoc for more information on this parameter.
+        
+        tls_version allows the version of the SSL/TLS protocol used to be
+        specified. By default TLS v1 is used. Previous versions (all versions
+        beginning with SSL) are possible but not recommended due to possible
+        security problems.
+
+        ciphers is a string specifying which encryption ciphers are allowable
+        for this connection, or None to use the defaults. See the ssl pydoc for
+        more information.
+
+        Must be called before connect() or connect_async()."""
+        if sys.version < '2.7':
+            raise ValueError('Python 2.7 is the minimum supported version for TLS.')
+
+        if ca_certs == None:
+            raise ValueError('ca_certs must not be None.')
+
+        try:
+            f = open(ca_certs, "r")
+        except IOError as err:
+            raise IOError(ca_certs+": "+err.strerror)
+        else:
+            f.close()
+        if certfile != None:
+            try:
+                f = open(certfile, "r")
+            except IOError as err:
+                raise IOError(certfile+": "+err.strerror)
+            else:
+                f.close()
+        if keyfile != None:
+            try:
+                f = open(keyfile, "r")
+            except IOError as err:
+                raise IOError(keyfile+": "+err.strerror)
+            else:
+                f.close()
+
+        self._tls_ca_certs = ca_certs
+        self._tls_certfile = certfile
+        self._tls_keyfile = keyfile
+        self._tls_cert_reqs = cert_reqs
+        self._tls_version = tls_version
+        self._tls_ciphers = ciphers
+
+    def connect(self, host, port=1883, keepalive=60):
+        """Connect to a remote broker.
+
+        host is the hostname or IP address of the remote broker.
+        port is the network port of the server host to connect to. Defaults to
+        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
+        are using tls_set() the port may need providing.
+        keepalive: Maximum period in seconds between communications with the
+        broker. If no other messages are being exchanged, this controls the
+        rate at which the client will send ping messages to the broker.
+        """
+        self.connect_async(host, port, keepalive)
+        return self.reconnect()
+
+    def connect_async(self, host, port=1883, keepalive=60):
+        """Connect to a remote broker asynchronously. This is a non-blocking
+        connect call that can be used with loop_start() to provide very quick
+        start.
+
+        host is the hostname or IP address of the remote broker.
+        port is the network port of the server host to connect to. Defaults to
+        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
+        are using tls_set() the port may need providing.
+        keepalive: Maximum period in seconds between communications with the
+        broker. If no other messages are being exchanged, this controls the
+        rate at which the client will send ping messages to the broker.
+        """
+        if host == None or len(host) == 0:
+            raise ValueError('Invalid host.')
+        if port <= 0:
+            raise ValueError('Invalid port number.')
+        if keepalive < 0:
+            raise ValueError('Keepalive must be >=0.')
+
+        self._host = host
+        self._port = port
+        self._keepalive = keepalive
+
+        self._state_mutex.acquire()
+        self._state = mosq_cs_connect_async
+        self._state_mutex.release()
+
+    def reconnect(self):
+        """Reconnect the client after a disconnect. Can only be called after
+        connect()/connect_async()."""
+        if len(self._host) == 0:
+            raise ValueError('Invalid host.')
+        if self._port <= 0:
+            raise ValueError('Invalid port number.')
+
+        self._in_packet.cleanup()
+        self._out_packet_mutex.acquire()
+        self._out_packet = []
+        self._out_packet_mutex.release()
+
+        self._current_out_packet_mutex.acquire()
+        self._current_out_packet = None
+        self._current_out_packet_mutex.release()
+
+        self._msgtime_mutex.acquire()
+        self._last_msg_in = time.time()
+        self._last_msg_out = time.time()
+        self._msgtime_mutex.release()
+
+        self._ping_t = 0
+        self._state_mutex.acquire()
+        self._state = mosq_cs_new
+        self._state_mutex.release()
+        if self._ssl:
+            self._ssl.close()
+            self._ssl = None
+            self._sock = None
+        elif self._sock:
+            self._sock.close()
+            self._sock = None
+
+        # Put messages in progress in a valid state.
+        self._messages_reconnect_reset()
+
+        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        # FIXME use create_connection here
+
+
+        if self._tls_ca_certs != None:
+            self._ssl = ssl.wrap_socket(self._sock,
+                    certfile=self._tls_certfile,
+                    keyfile=self._tls_keyfile,
+                    ca_certs=self._tls_ca_certs,
+                    cert_reqs=self._tls_cert_reqs,
+                    ssl_version=self._tls_version,
+                    ciphers=self._tls_ciphers)
+
+        try:
+            self.socket().connect((self._host, self._port))
+        except socket.error as err:
+            (msg) = err
+            if msg.errno != errno.EINPROGRESS:
+                print(msg)
+                return 1
+
+        self._sock.setblocking(0)
+
+        return self._send_connect(self._keepalive, self._clean_session)
+
+    def loop(self, timeout=1.0, max_packets=1):
+        """Process network events.
+
+        This function must be called regularly to ensure communication with the
+        broker is carried out. It calls select() on the network socket to wait
+        for network events. If incoming data is present it will then be
+        processed. Outgoing commands, from e.g. publish(), are normally sent
+        immediately that their function is called, but this is not always
+        possible. loop() will also attempt to send any remaining outgoing
+        messages, which also includes commands that are part of the flow for
+        messages with QoS>0.
+
+        timeout: The time in seconds to wait for incoming/outgoing network
+          traffic before timing out and returning. 
+        max_packets: Not currently used.
+
+        Returns MOSQ_ERR_SUCCESS on success.
+        Returns >0 on error.
+
+        A ValueError will be raised if timeout < 0"""
+        if timeout < 0.0:
+            raise ValueError('Invalid timeout.')
+
+        self._current_out_packet_mutex.acquire()
+        self._out_packet_mutex.acquire()
+        if self._current_out_packet == None and len(self._out_packet) > 0:
+            self._current_out_packet = self._out_packet.pop(0)
+
+        if self._current_out_packet:
+            wlist = [self.socket()]
+        else:
+            wlist = []
+        self._out_packet_mutex.release()
+        self._current_out_packet_mutex.release()
+
+        rlist = [self.socket()]
+        try:
+            socklist = select.select(rlist, wlist, [], timeout)
+        except TypeError:
+            # Socket isn't correct type, in likelihood connection is lost
+            return MOSQ_ERR_CONN_LOST
+
+        if self.socket() in socklist[0]:
+            rc = self.loop_read(max_packets)
+            if rc or (self._ssl == None and self._sock == None):
+                return rc
+
+        if self.socket() in socklist[1]:
+            rc = self.loop_write(max_packets)
+            if rc or (self._ssl == None and self._sock == None):
+                return rc
+
+        return self.loop_misc()
+
+    def publish(self, topic, payload=None, qos=0, retain=False):
+        """Publish a message on a topic.
+
+        This causes a message to be sent to the broker and subsequently from
+        the broker to any clients subscribing to matching topics.
+
+        topic: The topic that the message should be published on.
+        payload: The actual message to send. If not given, or set to None a
+        zero length message will be used. Passing an int or float will result
+        in the payload being converted to a string representing that number. If
+        you wish to send a true int/float, use struct.pack() to create the
+        payload you require.
+        qos: The quality of service level to use.
+        retain: If set to true, the message will be set as the "last known
+        good"/retained message for the topic.
+
+        Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS to
+        indicate success or MOSQ_ERR_NO_CONN if the client is not currently
+        connected.  mid is the message ID for the publish request. The mid
+        value can be used to track the publish request by checking against the
+        mid argument in the on_publish() callback if it is defined.
+
+        A ValueError will be raised if topic == None, has zero length or is
+        invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if
+        the length of the payload is greater than 268435455 bytes."""
+        if topic == None or len(topic) == 0:
+            raise ValueError('Invalid topic.')
+        if qos<0 or qos>2:
+            raise ValueError('Invalid QoS level.')
+        if isinstance(payload, str) or isinstance(payload, bytearray):
+            local_payload = payload
+        elif isinstance(payload, int) or isinstance(payload, float):
+            local_payload = str(payload)
+        elif payload == None:
+            local_payload = None
+        else:
+            raise TypeError('payload must be a string, bytearray, int, float or None.')
+
+        if local_payload != None and len(local_payload) > 268435455:
+            raise ValueError('Payload too large.')
+
+        if self._topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS:
+            raise ValueError('Publish topic cannot contain wildcards.')
+
+        local_mid = self._mid_generate()
+
+        if qos == 0:
+            rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False)
+            return (rc, local_mid)
+        else:
+            message = MosquittoMessage()
+            message.timestamp = time.time()
+            message.direction = mosq_md_out
+            if qos == 1:
+                message.state = mosq_ms_wait_puback
+            elif qos == 2:
+                message.state = mosq_ms_wait_pubrec
+
+            message.mid = local_mid
+            message.topic = topic
+            if local_payload == None or len(local_payload) == 0:
+                message.payload = None
+            else:
+                message.payload = local_payload
+
+            message.qos = qos
+            message.retain = retain
+            message.dup = False
+
+            self._messages.append(message)
+            rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
+            return (rc, local_mid)
+
+    def username_pw_set(self, username, password=None):
+        """Set a username and optionally a password for broker authentication.
+
+        Must be called before connect() to have any effect.
+        Requires a broker that supports MQTT v3.1.
+
+        username: The username to authenticate with. Need have no relationship to the client id.
+        password: The password to authenticate with. Optional, set to None if not required.
+        """
+        self._username = username
+        self._password = password
+
+    def disconnect(self):
+        """Disconnect a connected client from the broker."""
+        if self._sock == None and self._ssl == None:
+            return MOSQ_ERR_NO_CONN
+
+        self._state_mutex.acquire()
+        self._state = mosq_cs_disconnecting
+        self._state_mutex.release()
+
+        return self._send_disconnect()
+
+    def subscribe(self, topic, qos=0):
+        """Subscribe the client to a topic.
+
+        sub: The subscription topic to subscribe to.
+        qos: The desired quality of service level for the subscription.
+
+        Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS
+        to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected.
+        mid is the message ID for the subscribe request. The mid value can be
+        used to track the subscribe request by checking against the mid
+        argument in the on_subscribe() callback if it is defined.
+        
+        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
+        zero string length.
+        """
+        if qos<0 or qos>2:
+            raise ValueError('Invalid QoS level.')
+        if topic == None or len(topic) == 0:
+            raise ValueError('Invalid topic.')
+        topic = _fix_sub_topic(topic)
+
+        if self._sock == None and self._ssl == None:
+            return MOSQ_ERR_NO_CONN
+
+        return self._send_subscribe(False, topic, qos)
+
+    def unsubscribe(self, topic):
+        """Unsubscribe the client from a topic.
+
+        sub: The subscription topic to unsubscribe from.
+
+        Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS
+        to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected.
+        mid is the message ID for the unsubscribe request. The mid value can be
+        used to track the unsubscribe request by checking against the mid
+        argument in the on_unsubscribe() callback if it is defined.
+
+        Raises a ValueError if topic is None or has zero string length.
+        """
+        if topic == None or len(topic) == 0:
+            raise ValueError('Invalid topic.')
+        topic = _fix_sub_topic(topic)
+        if self._sock == None and self._ssl == None:
+            return MOSQ_ERR_NO_CONN
+
+        return self._send_unsubscribe(False, topic)
+
+    def loop_read(self, max_packets=1):
+        """Process read network events. Use in place of calling loop() if you
+        wish to handle your client reads as part of your own application.
+
+        Use socket() to obtain the client socket to call select() or equivalent
+        on.
+        
+        Do not use if you are using the threaded interface loop_start()."""
+        if self._sock == None and self._ssl == None:
+            return MOSQ_ERR_NO_CONN
+
+        max_packets = len(self._messages)
+        if max_packets < 1:
+            max_packets = 1
+
+        for i in range(0, max_packets):
+            rc = self._packet_read()
+            if rc > 0:
+                return self._loop_rc_handle(rc)
+            elif rc == MOSQ_ERR_AGAIN:
+                return MOSQ_ERR_SUCCESS
+        return MOSQ_ERR_SUCCESS
+
+    def loop_write(self, max_packets=1):
+        """Process read network events. Use in place of calling loop() if you
+        wish to handle your client reads as part of your own application.
+        
+        Use socket() to obtain the client socket to call select() or equivalent
+        on.
+
+        Use want_write() to determine if there is data waiting to be written.
+
+        Do not use if you are using the threaded interface loop_start()."""
+        if self._sock == None and self._ssl == None:
+            return MOSQ_ERR_NO_CONN
+
+        max_packets = len(self._messages)
+        if max_packets < 1:
+            max_packets = 1
+
+        for i in range(0, max_packets):
+            rc = self._packet_write()
+            if rc > 0:
+                return self._loop_rc_handle(rc)
+            elif rc == MOSQ_ERR_AGAIN:
+                return MOSQ_ERR_SUCCESS
+        return MOSQ_ERR_SUCCESS
+
+    def want_write(self):
+        """Call to determine if there is network data waiting to be written.
+        Useful if you are calling select() yourself rather than using loop().
+        """
+        if self._current_out_packet or len(self._out_packet) > 0:
+            return True
+        else:
+            return False
+
+    def loop_misc(self):
+        """Process miscellaneous network events. Use in place of calling loop() if you
+        wish to call select() or equivalent on.
+
+        Do not use if you are using the threaded interface loop_start()."""
+        if self._sock == None and self._ssl == None:
+            return MOSQ_ERR_NO_CONN
+
+        now = time.time()
+        self._check_keepalive()
+        if self._last_retry_check+1 < now:
+            # Only check once a second at most
+            self._message_retry_check()
+            self._last_retry_check = now
+
+        if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
+            # mosq->ping_t != 0 means we are waiting for a pingresp.
+            # This hasn't happened in the keepalive time so we should disconnect.
+            if self._ssl:
+                self._ssl.close()
+                self._ssl = None
+            elif self._sock:
+                self._sock.close()
+                self._sock = None
+
+            self._callback_mutex.acquire()
+            if self._state == mosq_cs_disconnecting:
+                rc = MOSQ_ERR_SUCCESS
+            else:
+                rc = 1
+            if self.on_disconnect:
+                self._in_callback = True
+                self.on_disconnect(self, self._userdata, rc)
+                self._in_callback = False
+            self._callback_mutex.release()
+            return MOSQ_ERR_CONN_LOST
+
+        return MOSQ_ERR_SUCCESS
+
+    def message_retry_set(self, retry):
+        """Set the timeout in seconds before a message with QoS>0 is retried.
+        20 seconds by default."""
+        if retry < 0:
+            raise ValueError('Invalid retry.')
+
+        self._message_retry = retry
+
+    def user_data_set(self, userdata):
+        """Set the user data variable passed to callbacks. May be any data type."""
+        self._userdata = userdata
+
+    def will_set(self, topic, payload=None, qos=0, retain=False):
+        """Set a Will to be sent by the broker in case the client disconnects unexpectedly.
+
+        This must be called before connect() to have any effect.
+
+        topic: The topic that the will message should be published on.
+        payload: The message to send as a will. If not given, or set to None a
+        zero length message will be used as the will. Passing an int or float
+        will result in the payload being converted to a string representing
+        that number. If you wish to send a true int/float, use struct.pack() to
+        create the payload you require.
+        qos: The quality of service level to use for the will.
+        retain: If set to true, the will message will be set as the "last known
+        good"/retained message for the topic.
+
+        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
+        zero string length.
+        """
+        if topic == None or len(topic) == 0:
+            raise ValueError('Invalid topic.')
+        if qos<0 or qos>2:
+            raise ValueError('Invalid QoS level.')
+        if isinstance(payload, str) or isinstance(payload, bytearray):
+            self._will_payload = payload
+        elif isinstance(payload, int) or isinstance(payload, float):
+            self._will_payload = str(payload)
+        elif payload == None:
+            self._will_payload = None
+        else:
+            raise TypeError('payload must be a string, bytearray, int, float or None.')
+
+        self._will = True
+        self._will_topic = topic
+        self._will_qos = qos
+        self._will_retain = retain
+
+    def will_clear(self):
+        """ Removes a will that was previously configured with will_set().
+        
+        Must be called before connect() to have any effect."""
+        self._will = False
+        self._will_topic = ""
+        self._will_payload = None
+        self._will_qos = 0
+        self._will_retain = False
+
+    def socket(self):
+        """Return the socket or ssl object for this client."""
+        if self._ssl:
+            return self._ssl
+        else:
+            return self._sock
+
+    def loop_forever(self, timeout=1.0, max_packets=1):
+        """This function call loop() for you in an infinite blocking loop. It
+        is useful for the case where you only want to run the MQTT client loop
+        in your program.
+
+        loop_forever() will handle reconnecting for you. If you call
+        disconnect() in a callback it will return."""
+
+        run = True
+        if self._state == mosq_cs_connect_async:
+            self.reconnect()
+
+        while run:
+            rc = MOSQ_ERR_SUCCESS
+            while rc == MOSQ_ERR_SUCCESS:
+                rc = self.loop(timeout, max_packets)
+
+            if self._state == mosq_cs_disconnecting:
+                run = False
+            else:
+                time.sleep(1)
+                self.reconnect()
+        return rc
+
+    def loop_start(self):
+        """This is part of the threaded client interface. Call this once to
+        start a new thread to process network traffic. This provides an
+        alternative to repeatedly calling loop() yourself.
+        """
+        if self._thread != None:
+            return MOSQ_ERR_INVAL
+
+        self._thread = threading.Thread(target=self._thread_main)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def loop_stop(self, force=False):
+        """This is part of the threaded client interface. Call this once to
+        stop the network thread previously created with loop_start(). This call
+        will block until the network thread finishes.
+
+        The force parameter is currently ignored.
+        """
+        if self._thread == None:
+            return MOSQ_ERR_INVAL
+
+        self._thread_terminate = True
+        self._thread.join()
+        self._thread = None
+
+    # ============================================================
+    # Private functions
+    # ============================================================
+
+    def _loop_rc_handle(self, rc):
+        if rc:
+            if self._ssl:
+                self._ssl.close()
+                self._ssl = None
+            elif self._sock:
+                self._sock.close()
+                self._sock = None
+
+            self._state_mutex.acquire()
+            if self._state == mosq_cs_disconnecting:
+                rc = MOSQ_ERR_SUCCESS
+            self._state_mutex.release()
+            self._callback_mutex.acquire()
+            if self.on_disconnect:
+                self._in_callback = True
+                self.on_disconnect(self, self._userdata, rc)
+                self._in_callback = False
+
+            self._callback_mutex.release()
+        return rc
+
+    def _packet_read(self):
+        # This gets called if pselect() indicates that there is network data
+        # available - ie. at least one byte.  What we do depends on what data we
+        # already have.
+        # If we've not got a command, attempt to read one and save it. This should
+        # always work because it's only a single byte.
+        # Then try to read the remaining length. This may fail because it is may
+        # be more than one byte - will need to save data pending next read if it
+        # does fail.
+        # Then try to read the remaining payload, where 'payload' here means the
+        # combined variable header and actual payload. This is the most likely to
+        # fail due to longer length, so save current data and current position.
+        # After all data is read, send to _mosquitto_handle_packet() to deal with.
+        # Finally, free the memory and reset everything to starting conditions.
+        if self._in_packet.command == 0:
+            try:
+                if self._ssl:
+                    command = self._ssl.read(1)
+                else:
+                    command = self._sock.recv(1)
+            except socket.error as err:
+                (msg) = err
+                if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
+                    return MOSQ_ERR_AGAIN
+                if msg.errno == errno.EAGAIN:
+                    return MOSQ_ERR_AGAIN
+                print(msg)
+                return 1
+            else:
+                if len(command) == 0:
+                    return 1
+                command = struct.unpack("!B", command)
+                self._in_packet.command = command[0]
+
+        if self._in_packet.have_remaining == 0:
+            # Read remaining
+            # Algorithm for decoding taken from pseudo code at
+            # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
+            while True:
+                try:
+                    if self._ssl:
+                        byte = self._ssl.read(1)
+                    else:
+                        byte = self._sock.recv(1)
+                except socket.error as err:
+                    (msg) = err
+                    if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
+                        return MOSQ_ERR_AGAIN
+                    if msg.errno == errno.EAGAIN:
+                        return MOSQ_ERR_AGAIN
+                    print(msg)
+                    return 1
+                else:
+                    byte = struct.unpack("!B", byte)
+                    byte = byte[0]
+                    self._in_packet.remaining_count.append(byte)
+                    # Max 4 bytes length for remaining length as defined by protocol.
+                     # Anything more likely means a broken/malicious client.
+                    if len(self._in_packet.remaining_count) > 4:
+                        return MOSQ_ERR_PROTOCOL
+
+                    self._in_packet.remaining_length = self._in_packet.remaining_length + (byte & 127)*self._in_packet.remaining_mult
+                    self._in_packet.remaining_mult = self._in_packet.remaining_mult * 128
+
+                if (byte & 128) == 0:
+                    break
+
+            self._in_packet.have_remaining = 1
+            self._in_packet.to_process = self._in_packet.remaining_length
+
+        while self._in_packet.to_process > 0:
+            try:
+                if self._ssl:
+                    data = self._ssl.read(self._in_packet.to_process)
+                else:
+                    data = self._sock.recv(self._in_packet.to_process)
+            except socket.error as err:
+                (msg) = err
+                if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
+                    return MOSQ_ERR_AGAIN
+                if msg.errno == errno.EAGAIN:
+                    return MOSQ_ERR_AGAIN
+                print(msg)
+                return 1
+            else:
+                self._in_packet.to_process = self._in_packet.to_process - len(data)
+                self._in_packet.packet = self._in_packet.packet + data
+
+        # All data for this packet is read.
+        self._in_packet.pos = 0
+        rc = self._packet_handle()
+
+        # Free data and reset values 
+        self._in_packet.cleanup()
+
+        self._msgtime_mutex.acquire()
+        self._last_msg_in = time.time()
+        self._msgtime_mutex.release()
+        return rc
+
+    def _packet_write(self):
+        self._current_out_packet_mutex.acquire()
+
+        while self._current_out_packet:
+            packet = self._current_out_packet
+
+            try:
+                if self._ssl:
+                    write_length = self._ssl.write(packet.packet[packet.pos:])
+                else:
+                    write_length = self._sock.send(packet.packet[packet.pos:])
+            except AttributeError:
+                self._current_out_packet_mutex.release()
+                return MOSQ_ERR_SUCCESS
+            except socket.error as err:
+                self._current_out_packet_mutex.release()
+                (msg) = err
+                if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
+                    return MOSQ_ERR_AGAIN
+                if msg.errno == errno.EAGAIN:
+                    return MOSQ_ERR_AGAIN
+                print(msg)
+                return 1
+
+            if write_length > 0:
+                packet.to_process = packet.to_process - write_length
+                packet.pos = packet.pos + write_length
+
+                if packet.to_process == 0:
+                    if (packet.command & 0xF0) == PUBLISH and packet.qos == 0:
+                        self._callback_mutex.acquire()
+                        if self.on_publish:
+                            self._in_callback = True
+                            self.on_publish(self, self._userdata, packet.mid)
+                            self._in_callback = False
+
+                        self._callback_mutex.release()
+
+                    self._out_packet_mutex.acquire()
+                    if len(self._out_packet) > 0:
+                        self._current_out_packet = self._out_packet.pop(0)
+                    else:
+                        self._current_out_packet = None
+                    self._out_packet_mutex.release()
+            else:
+                pass # FIXME
+        
+        self._current_out_packet_mutex.release()
+
+        self._msgtime_mutex.acquire()
+        self._last_msg_out = time.time()
+        self._msgtime_mutex.release()
+
+        return MOSQ_ERR_SUCCESS
+
+    def _easy_log(self, level, buf):
+        if self.on_log:
+            self.on_log(self, self._userdata, level, buf)
+
+    def _check_keepalive(self):
+        now = time.time()
+        self._msgtime_mutex.acquire()
+        last_msg_out = self._last_msg_out
+        last_msg_in = self._last_msg_in
+        self._msgtime_mutex.release()
+        if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
+            if self._state == mosq_cs_connected and self._ping_t == 0:
+                self._send_pingreq()
+                self._msgtime_mutex.acquire()
+                self._last_msg_out = now
+                self._last_msg_in = now
+                self._msgtime_mutex.release()
+            else:
+                if self._ssl:
+                    self._ssl.close()
+                    self._ssl = None
+                elif self._sock:
+                    self._sock.close()
+                    self._sock = None
+
+                if self._state == mosq_cs_disconnecting:
+                    rc = MOSQ_ERR_SUCCESS
+                else:
+                    rc = 1
+                self._callback_mutex.acquire()
+                if self.on_disconnect:
+                    self._in_callback = True
+                    self.on_disconnect(self, self._userdata, rc)
+                    self._in_callback = False
+                self._callback_mutex.release()
+
+    def _mid_generate(self):
+        self._last_mid = self._last_mid + 1
+        if self._last_mid == 65536:
+            self._last_mid = 1
+        return self._last_mid
+
+    def _topic_wildcard_len_check(self, topic):
+        # Search for + or # in a topic. Return MOSQ_ERR_INVAL if found.
+         # Also returns MOSQ_ERR_INVAL if the topic string is too long.
+         # Returns MOSQ_ERR_SUCCESS if everything is fine.
+        if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535:
+            return MOSQ_ERR_INVAL
+        else:
+            return MOSQ_ERR_SUCCESS
+
+    def _send_pingreq(self):
+        self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGREQ")
+        rc = self._send_simple_command(PINGREQ)
+        if rc == MOSQ_ERR_SUCCESS:
+            self._ping_t = time.time()
+        return rc
+
+    def _send_pingresp(self):
+        self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGRESP")
+        return self._send_simple_command(PINGRESP)
+
+    def _send_puback(self, mid):
+        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")")
+        return self._send_command_with_mid(PUBACK, mid, False)
+
+    def _send_pubcomp(self, mid):
+        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")")
+        return self._send_command_with_mid(PUBCOMP, mid, False)
+
+    def _pack_remaining_length(self, packet, remaining_length):
+        remaining_bytes = []
+        while True:
+            byte = remaining_length % 128
+            remaining_length = remaining_length // 128
+            # If there are more digits to encode, set the top bit of this digit
+            if remaining_length > 0:
+                byte = byte | 0x80
+
+            remaining_bytes.append(byte)
+            packet.extend(struct.pack("!B", byte))
+            if remaining_length == 0:
+                # FIXME - this doesn't deal with incorrectly large payloads
+                return packet
+
+    def _pack_str16(self, packet, data):
+        if sys.version_info[0] < 3:
+            if isinstance(data, bytearray):
+                packet.extend(struct.pack("!H", len(data)))
+                packet.extend(data)
+            elif isinstance(data, str):
+                pack_format = "!H" + str(len(data)) + "s"
+                packet.extend(struct.pack(pack_format, len(data), data))
+            elif isinstance(data, unicode):
+                udata = data.encode('utf-8')
+                pack_format = "!H" + str(len(udata)) + "s"
+                packet.extend(struct.pack(pack_format, len(udata), udata))
+            else:
+                raise TypeError
+        else:
+            if isinstance(data, bytearray):
+                packet.extend(struct.pack("!H", len(data)))
+                packet.extend(data)
+            elif isinstance(data, str):
+                udata = data.encode('utf-8')
+                pack_format = "!H" + str(len(udata)) + "s"
+                packet.extend(struct.pack(pack_format, len(udata), udata))
+            else:
+                raise TypeError
+
+    def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False):
+        if self._sock == None and self._ssl == None:
+            return MOSQ_ERR_NO_CONN
+
+        command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain
+        packet = bytearray()
+        packet.extend(struct.pack("!B", command))
+        if payload == None:
+            remaining_length = 2+len(topic)
+            self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"' (NULL payload)")
+        else:
+            remaining_length = 2+len(topic) + len(payload)
+            self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"', ... ("+str(len(payload))+" bytes)")
+
+        if qos > 0:
+            # For message id
+            remaining_length = remaining_length + 2
+
+        self._pack_remaining_length(packet, remaining_length)
+        self._pack_str16(packet, topic)
+
+        if qos > 0:
+            # For message id
+            packet.extend(struct.pack("!H", mid))
+
+        if payload != None:
+            if isinstance(payload, str):
+                if sys.version_info[0] < 3:
+                    pack_format = str(len(payload)) + "s"
+                    packet.extend(struct.pack(pack_format, payload))
+                else:
+                    upayload = payload.encode('utf-8')
+                    pack_format = str(len(upayload)) + "s"
+                    packet.extend(struct.pack(pack_format, upayload))
+            elif isinstance(payload, bytearray):
+                packet.extend(payload)
+            elif isinstance(payload, unicode):
+                    upayload = payload.encode('utf-8')
+                    pack_format = str(len(upayload)) + "s"
+                    packet.extend(struct.pack(pack_format, upayload))
+            else:
+                raise TypeError('payload must be a string, unicode or a bytearray.')
+
+        return self._packet_queue(PUBLISH, packet, mid, qos)
+
+    def _send_pubrec(self, mid):
+        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")")
+        return self._send_command_with_mid(PUBREC, mid, False)
+
+    def _send_pubrel(self, mid, dup=False):
+        self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")")
+        return self._send_command_with_mid(PUBREL|2, mid, dup)
+
+    def _send_command_with_mid(self, command, mid, dup):
+        # For PUBACK, PUBCOMP, PUBREC, and PUBREL
+        if dup:
+            command = command | 8
+
+        remaining_length = 2
+        packet = struct.pack('!BBH', command, remaining_length, mid)
+        return self._packet_queue(command, packet, mid, 1)
+
+    def _send_simple_command(self, command):
+        # For DISCONNECT, PINGREQ and PINGRESP
+        remaining_length = 0
+        packet = struct.pack('!BB', command, remaining_length)
+        return self._packet_queue(command, packet, 0, 0)
+
+    def _send_connect(self, keepalive, clean_session):
+        remaining_length = 12 + 2+len(self._client_id)
+        connect_flags = 0
+        if clean_session:
+            connect_flags = connect_flags | 0x02
+
+        if self._will:
+            remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload)
+            connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5)
+
+        if self._username:
+            remaining_length = remaining_length + 2+len(self._username)
+            connect_flags = connect_flags | 0x80
+            if self._password:
+                connect_flags = connect_flags | 0x40
+                remaining_length = remaining_length + 2+len(self._password)
+
+        command = CONNECT
+        packet = bytearray()
+        packet.extend(struct.pack("!B", command))
+        self._pack_remaining_length(packet, remaining_length)
+        packet.extend(struct.pack("!H6sBBH", len(PROTOCOL_NAME), PROTOCOL_NAME, PROTOCOL_VERSION, connect_flags, keepalive))
+
+        self._pack_str16(packet, self._client_id)
+
+        if self._will:
+            self._pack_str16(packet, self._will_topic)
+            if len(self._will_payload) > 0:
+                self._pack_str16(packet, self._will_payload)
+            else:
+                packet.extend(struct.pack("!H", 0))
+
+        if self._username:
+            self._pack_str16(packet, self._username)
+
+            if self._password:
+                self._pack_str16(packet, self._password)
+
+        self._keepalive = keepalive
+        return self._packet_queue(command, packet, 0, 0)
+
+    def _send_disconnect(self):
+        return self._send_simple_command(DISCONNECT)
+
+    def _send_subscribe(self, dup, topic, topic_qos):
+        remaining_length = 2 + 2+len(topic) + 1
+        command = SUBSCRIBE | (dup<<3) | (1<<1)
+        packet = bytearray()
+        packet.extend(struct.pack("!B", command))
+        self._pack_remaining_length(packet, remaining_length)
+        local_mid = self._mid_generate()
+        packet.extend(struct.pack("!H", local_mid))
+        self._pack_str16(packet, topic)
+        packet.extend(struct.pack("B", topic_qos))
+        return (self._packet_queue(command, packet, local_mid, 1), local_mid)
+
+    def _send_unsubscribe(self, dup, topic):
+        remaining_length = 2 + 2+len(topic)
+        command = UNSUBSCRIBE | (dup<<3) | (1<<1)
+        packet = bytearray()
+        packet.extend(struct.pack("!B", command))
+        self._pack_remaining_length(packet, remaining_length)
+        local_mid = self._mid_generate()
+        packet.extend(struct.pack("!H", local_mid))
+        self._pack_str16(packet, topic)
+        return (self._packet_queue(command, packet, local_mid, 1), local_mid)
+
+    def _message_update(self, mid, direction, state):
+        for m in self._messages:
+            if m.mid == mid and m.direction == direction:
+                m.state = state
+                m.timestamp = time.time()
+                return MOSQ_ERR_SUCCESS
+
+        return MOSQ_ERR_NOT_FOUND
+
+    def _message_retry_check(self):
+        now = time.time()
+        for m in self._messages:
+            if m.timestamp + self._message_retry < now:
+                if m.state == mosq_ms_wait_puback or m.state == mosq_ms_wait_pubrec:
+                    m.timestamp = now
+                    m.dup = True
+                    self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+                elif m.state == mosq_ms_wait_pubrel:
+                    m.timestamp = now
+                    m.dup = True
+                    self._send_pubrec(m.mid)
+                elif m.state == mosq_ms_wait_pubcomp:
+                    m.timestamp = now
+                    m.dup = True
+                    self._send_pubrel(m.mid, True)
+
+    def _messages_reconnect_reset(self):
+        for m in self._messages:
+            m.timestamp = 0
+            if m.direction == mosq_md_out:
+                if m.qos == 1:
+                    m.state = mosq_ms_wait_puback
+                elif m.qos == 2:
+                    m.state = mosq_ms_wait_pubrec
+            else:
+                self._messages.pop(self._messages.index(m))
+
+    def _packet_queue(self, command, packet, mid, qos):
+        mpkt = MosquittoPacket(command, packet, mid, qos)
+
+        self._out_packet_mutex.acquire()
+        self._out_packet.append(mpkt)
+        if self._current_out_packet_mutex.acquire(False):
+            if self._current_out_packet == None and len(self._out_packet) > 0:
+                self._current_out_packet = self._out_packet.pop(0)
+            self._current_out_packet_mutex.release()
+        self._out_packet_mutex.release()
+
+        if not self._in_callback and self._thread == None:
+            return self.loop_write()
+        else:
+            return MOSQ_ERR_SUCCESS
+
+    def _packet_handle(self):
+        cmd = self._in_packet.command&0xF0
+        if cmd == PINGREQ:
+            return self._handle_pingreq()
+        elif cmd == PINGRESP:
+            return self._handle_pingresp()
+        elif cmd == PUBACK:
+            return self._handle_pubackcomp("PUBACK")
+        elif cmd == PUBCOMP:
+            return self._handle_pubackcomp("PUBCOMP")
+        elif cmd == PUBLISH:
+            return self._handle_publish()
+        elif cmd == PUBREC:
+            return self._handle_pubrec()
+        elif cmd == PUBREL:
+            return self._handle_pubrel()
+        elif cmd == CONNACK:
+            return self._handle_connack()
+        elif cmd == SUBACK:
+            return self._handle_suback()
+        elif cmd == UNSUBACK:
+            return self._handle_unsuback()
+        else:
+            # If we don't recognise the command, return an error straight away.
+            self._easy_log(MOSQ_LOG_ERR, "Error: Unrecognised command "+str(cmd))
+            return MOSQ_ERR_PROTOCOL
+
+    def _handle_pingreq(self):
+        if self._strict_protocol:
+            if self._in_packet.remaining_length != 0:
+                return MOSQ_ERR_PROTOCOL
+        
+        self._easy_log(MOSQ_LOG_DEBUG, "Received PINGREQ")
+        return self._send_pingresp()
+
+    def _handle_pingresp(self):
+        if self._strict_protocol:
+            if self._in_packet.remaining_length != 0:
+                return MOSQ_ERR_PROTOCOL
+        
+        # No longer waiting for a PINGRESP.
+        self._ping_t = 0
+        self._easy_log(MOSQ_LOG_DEBUG, "Received PINGRESP")
+        return MOSQ_ERR_SUCCESS
+
+    def _handle_connack(self):
+        if self._strict_protocol:
+            if self._in_packet.remaining_length != 2:
+                return MOSQ_ERR_PROTOCOL
+
+        if len(self._in_packet.packet) != 2:
+            return MOSQ_ERR_PROTOCOL
+
+        (resvd, result) = struct.unpack("!BB", self._in_packet.packet)
+        self._easy_log(MOSQ_LOG_DEBUG, "Received CONNACK ("+str(resvd)+", "+str(result)+")")
+        self._callback_mutex.acquire()
+        if self.on_connect:
+            self._in_callback = True
+            self.on_connect(self, self._userdata, result)
+            self._in_callback = False
+        self._callback_mutex.release()
+        if result == 0:
+            self._state = mosq_cs_connected
+            return MOSQ_ERR_SUCCESS
+        elif result > 0 and result < 6:
+            return MOSQ_ERR_CONN_REFUSED
+        else:
+            return MOSQ_ERR_PROTOCOL
+
+    def _handle_suback(self):
+        self._easy_log(MOSQ_LOG_DEBUG, "Received SUBACK")
+        pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's'
+        (mid, packet) = struct.unpack(pack_format, self._in_packet.packet)
+        pack_format = "!" + "B"*len(packet)
+        granted_qos = struct.unpack(pack_format, packet)
+
+        self._callback_mutex.acquire()
+        if self.on_subscribe:
+            self._in_callback = True
+            self.on_subscribe(self, self._userdata, mid, granted_qos)
+            self._in_callback = False
+        self._callback_mutex.release()
+
+        return MOSQ_ERR_SUCCESS
+
+    def _handle_publish(self):
+        rc = 0
+
+        header = self._in_packet.command
+        message = MosquittoMessage()
+        message.direction = mosq_md_in
+        message.dup = (header & 0x08)>>3
+        message.qos = (header & 0x06)>>1
+        message.retain = (header & 0x01)
+
+        pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's'
+        (slen, packet) = struct.unpack(pack_format, self._in_packet.packet)
+        pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's'
+        (message.topic, packet) = struct.unpack(pack_format, packet)
+
+        if len(message.topic) == 0:
+            return MOSQ_ERR_PROTOCOL
+
+        if sys.version_info[0] >= 3:
+            message.topic = message.topic.decode('utf-8')
+        message.topic = _fix_sub_topic(message.topic)
+
+        if message.qos > 0:
+            pack_format = "!H" + str(len(packet)-2) + 's'
+            (message.mid, packet) = struct.unpack(pack_format, packet)
+
+        message.payload = packet
+
+        self._easy_log(MOSQ_LOG_DEBUG, "Received PUBLISH (d"+str(message.dup)+
+                ", q"+str(message.qos)+", r"+str(message.retain)+
+                ", m"+str(message.mid)+", '"+message.topic+
+                "', ...  ("+str(len(message.payload))+" bytes)")
+
+        message.timestamp = time.time()
+        if message.qos == 0:
+            self._callback_mutex.acquire()
+            if self.on_message:
+                self._in_callback = True
+                self.on_message(self, self._userdata, message)
+                self._in_callback = False
+
+            self._callback_mutex.release()
+            return MOSQ_ERR_SUCCESS
+        elif message.qos == 1:
+            rc = self._send_puback(message.mid)
+            self._callback_mutex.acquire()
+            if self.on_message:
+                self._in_callback = True
+                self.on_message(self, self._userdata, message)
+                self._in_callback = False
+
+            self._callback_mutex.release()
+            return rc
+        elif message.qos == 2:
+            rc = self._send_pubrec(message.mid)
+            message.state = mosq_ms_wait_pubrel
+            self._messages.append(message)
+            return rc
+        else:
+            return MOSQ_ERR_PROTOCOL
+
+    def _handle_pubrel(self):
+        if self._strict_protocol:
+            if self._in_packet.remaining_length != 2:
+                return MOSQ_ERR_PROTOCOL
+        
+        if len(self._in_packet.packet) != 2:
+            return MOSQ_ERR_PROTOCOL
+
+        mid = struct.unpack("!H", self._in_packet.packet)
+        mid = mid[0]
+        self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")")
+        
+        for i in range(len(self._messages)):
+            if self._messages[i].direction == mosq_md_in and self._messages[i].mid == mid:
+
+                # Only pass the message on if we have removed it from the queue - this
+                # prevents multiple callbacks for the same message.
+                self._callback_mutex.acquire()
+                if self.on_message:
+                    self._in_callback = True
+                    self.on_message(self, self._userdata, self._messages[i])
+                    self._in_callback = False
+                self._callback_mutex.release()
+                self._messages.pop(i)
+
+                return self._send_pubcomp(mid)
+
+        return MOSQ_ERR_SUCCESS
+
+    def _handle_pubrec(self):
+        if self._strict_protocol:
+            if self._in_packet.remaining_length != 2:
+                return MOSQ_ERR_PROTOCOL
+        
+        mid = struct.unpack("!H", self._in_packet.packet)
+        mid = mid[0]
+        self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")")
+        
+        for i in range(len(self._messages)):
+            if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid:
+                self._messages[i].state = mosq_ms_wait_pubcomp
+                self._messages[i].timestamp = time.time()
+                return self._send_pubrel(mid, False)
+        
+        return MOSQ_ERR_SUCCESS
+
+    def _handle_unsuback(self):
+        if self._strict_protocol:
+            if self._in_packet.remaining_length != 2:
+                return MOSQ_ERR_PROTOCOL
+        
+        mid = struct.unpack("!H", self._in_packet.packet)
+        mid = mid[0]
+        self._easy_log(MOSQ_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")")
+        self._callback_mutex.acquire()
+        if self.on_unsubscribe:
+            self._in_callback = True
+            self.on_unsubscribe(self, self._userdata, mid)
+            self._in_callback = False
+        self._callback_mutex.release()
+        return MOSQ_ERR_SUCCESS
+
+    def _handle_pubackcomp(self, cmd):
+        if self._strict_protocol:
+            if self._in_packet.remaining_length != 2:
+                return MOSQ_ERR_PROTOCOL
+        
+        mid = struct.unpack("!H", self._in_packet.packet)
+        mid = mid[0]
+        self._easy_log(MOSQ_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")")
+        
+        for i in range(len(self._messages)):
+            try:
+                if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid:
+                    # Only inform the client the message has been sent once.
+                    self._callback_mutex.acquire()
+                    if self.on_publish:
+                        self._in_callback = True
+                        self.on_publish(self, self._userdata, mid)
+                        self._in_callback = False
+
+                    self._callback_mutex.release()
+                    self._messages.pop(i)
+            except IndexError:
+                # Have removed item so i>count.
+                # Not really an error.
+                pass
+
+        return MOSQ_ERR_SUCCESS
+
+    def _thread_main(self):
+        run = True
+        self._thread_terminate = False
+        self._state_mutex.acquire()
+        if self._state == mosq_cs_connect_async:
+            self._state_mutex.release()
+            self.reconnect()
+        else:
+            self._state_mutex.release()
+
+        while run:
+            rc = MOSQ_ERR_SUCCESS
+            while rc == MOSQ_ERR_SUCCESS:
+                rc = self.loop()
+                if self._thread_terminate:
+                    rc = 1
+                    run = False
+
+            self._state_mutex.acquire()
+            if self._state == mosq_cs_disconnecting:
+                run = False
+                self._state_mutex.release()
+            else:
+                self._state_mutex.release()
+                time.sleep(1)
+                self.reconnect()
+