From dafb61280f0d395ce99fe7b67ace8bdb84beea0e Mon Sep 17 00:00:00 2001 From: Roger Light Date: Sun, 16 Jun 2013 22:55:44 +0100 Subject: [PATCH] Rename src, add required submodule files and fix example. --- examples/sub.py | 21 +- src/paho/__init__.py | 0 src/paho/mqtt/__init__.py | 0 src/paho/mqtt/{mosquitto.py => client.py} | 396 +++++++++++----------- 4 files changed, 216 insertions(+), 201 deletions(-) create mode 100644 src/paho/__init__.py create mode 100644 src/paho/mqtt/__init__.py rename src/paho/mqtt/{mosquitto.py => client.py} (87%) diff --git a/examples/sub.py b/examples/sub.py index 7af946d..f013247 100755 --- a/examples/sub.py +++ b/examples/sub.py @@ -27,7 +27,18 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -import mosquitto +# ========================================================================== +# This part is only required to run the example from within the examples +# directory when the module itself is not installed. +import sys +import os +import inspect +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"../src"))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) +# ========================================================================== + +import paho.mqtt.client as mqtt def on_connect(mosq, obj, rc): print("rc: "+str(rc)) @@ -44,14 +55,18 @@ def on_subscribe(mosq, obj, mid, granted_qos): def on_log(mosq, obj, level, string): print(string) -mqttc = mosquitto.Mosquitto() +# If you want to use a specific client id, use +# mqttc = mosquitto.Mosquitto("client-id") +# but note that the client id must be unique on the broker. Leaving the client +# id parameter empty will generate a random id for you. +mqttc = mqtt.Client() mqttc.on_message = on_message mqttc.on_connect = on_connect mqttc.on_publish = on_publish mqttc.on_subscribe = on_subscribe # Uncomment to enable debug messages #mqttc.on_log = on_log -mqttc.connect("test.mosquitto.org", 1883, 60) +mqttc.connect("m2m.eclipse.org", 1883, 60) mqttc.subscribe("$SYS/#", 0) diff --git a/src/paho/__init__.py b/src/paho/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/paho/mqtt/__init__.py b/src/paho/mqtt/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/paho/mqtt/mosquitto.py b/src/paho/mqtt/client.py similarity index 87% rename from src/paho/mqtt/mosquitto.py rename to src/paho/mqtt/client.py index 86f6ce9..7955bdf 100755 --- a/src/paho/mqtt/mosquitto.py +++ b/src/paho/mqtt/client.py @@ -70,11 +70,11 @@ PINGRESP = 0xD0 DISCONNECT = 0xE0 # Log levels -MOSQ_LOG_INFO = 0x01 -MOSQ_LOG_NOTICE = 0x02 -MOSQ_LOG_WARNING = 0x04 -MOSQ_LOG_ERR = 0x08 -MOSQ_LOG_DEBUG = 0x10 +MQTT_LOG_INFO = 0x01 +MQTT_LOG_NOTICE = 0x02 +MQTT_LOG_WARNING = 0x04 +MQTT_LOG_ERR = 0x08 +MQTT_LOG_DEBUG = 0x10 # CONNACK codes CONNACK_ACCEPTED = 0 @@ -85,40 +85,40 @@ CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 CONNACK_REFUSED_NOT_AUTHORIZED = 5 # Connection state -mosq_cs_new = 0 -mosq_cs_connected = 1 -mosq_cs_disconnecting = 2 -mosq_cs_connect_async = 3 +mqtt_cs_new = 0 +mqtt_cs_connected = 1 +mqtt_cs_disconnecting = 2 +mqtt_cs_connect_async = 3 # Message direction -mosq_md_invalid = 0 -mosq_md_in = 1 -mosq_md_out = 2 +mqtt_md_invalid = 0 +mqtt_md_in = 1 +mqtt_md_out = 2 # Message state -mosq_ms_invalid = 0, -mosq_ms_wait_puback = 1 -mosq_ms_wait_pubrec = 2 -mosq_ms_wait_pubrel = 3 -mosq_ms_wait_pubcomp = 4 +mqtt_ms_invalid = 0, +mqtt_ms_wait_puback = 1 +mqtt_ms_wait_pubrec = 2 +mqtt_ms_wait_pubrel = 3 +mqtt_ms_wait_pubcomp = 4 # Error values -MOSQ_ERR_AGAIN = -1 -MOSQ_ERR_SUCCESS = 0 -MOSQ_ERR_NOMEM = 1 -MOSQ_ERR_PROTOCOL = 2 -MOSQ_ERR_INVAL = 3 -MOSQ_ERR_NO_CONN = 4 -MOSQ_ERR_CONN_REFUSED = 5 -MOSQ_ERR_NOT_FOUND = 6 -MOSQ_ERR_CONN_LOST = 7 -MOSQ_ERR_TLS = 8 -MOSQ_ERR_PAYLOAD_SIZE = 9 -MOSQ_ERR_NOT_SUPPORTED = 10 -MOSQ_ERR_AUTH = 11 -MOSQ_ERR_ACL_DENIED = 12 -MOSQ_ERR_UNKNOWN = 13 -MOSQ_ERR_ERRNO = 14 +MQTT_ERR_AGAIN = -1 +MQTT_ERR_SUCCESS = 0 +MQTT_ERR_NOMEM = 1 +MQTT_ERR_PROTOCOL = 2 +MQTT_ERR_INVAL = 3 +MQTT_ERR_NO_CONN = 4 +MQTT_ERR_CONN_REFUSED = 5 +MQTT_ERR_NOT_FOUND = 6 +MQTT_ERR_CONN_LOST = 7 +MQTT_ERR_TLS = 8 +MQTT_ERR_PAYLOAD_SIZE = 9 +MQTT_ERR_NOT_SUPPORTED = 10 +MQTT_ERR_AUTH = 11 +MQTT_ERR_ACL_DENIED = 12 +MQTT_ERR_UNKNOWN = 13 +MQTT_ERR_ERRNO = 14 def _fix_sub_topic(subtopic): # Convert ////some////over/slashed///topic/etc/etc// @@ -128,37 +128,37 @@ def _fix_sub_topic(subtopic): else: return '/'.join(filter(None, subtopic.split('/'))) -def error_string(mosq_errno): - """Return the error string associated with a mosquitto error number.""" - if mosq_errno == MOSQ_ERR_SUCCESS: +def error_string(mqtt_errno): + """Return the error string associated with an mqtt error number.""" + if mqtt_errno == MQTT_ERR_SUCCESS: return "No error." - elif mosq_errno == MOSQ_ERR_NOMEM: + elif mqtt_errno == MQTT_ERR_NOMEM: return "Out of memory." - elif mosq_errno == MOSQ_ERR_PROTOCOL: + elif mqtt_errno == MQTT_ERR_PROTOCOL: return "A network protocol error occurred when communicating with the broker." - elif mosq_errno == MOSQ_ERR_INVAL: + elif mqtt_errno == MQTT_ERR_INVAL: return "Invalid function arguments provided." - elif mosq_errno == MOSQ_ERR_NO_CONN: + elif mqtt_errno == MQTT_ERR_NO_CONN: return "The client is not currently connected." - elif mosq_errno == MOSQ_ERR_CONN_REFUSED: + elif mqtt_errno == MQTT_ERR_CONN_REFUSED: return "The connection was refused." - elif mosq_errno == MOSQ_ERR_NOT_FOUND: + elif mqtt_errno == MQTT_ERR_NOT_FOUND: return "Message not found (internal error)." - elif mosq_errno == MOSQ_ERR_CONN_LOST: + elif mqtt_errno == MQTT_ERR_CONN_LOST: return "The connection was lost." - elif mosq_errno == MOSQ_ERR_TLS: + elif mqtt_errno == MQTT_ERR_TLS: return "A TLS error occurred." - elif mosq_errno == MOSQ_ERR_PAYLOAD_SIZE: + elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE: return "Payload too large." - elif mosq_errno == MOSQ_ERR_NOT_SUPPORTED: + elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED: return "This feature is not supported." - elif mosq_errno == MOSQ_ERR_AUTH: + elif mqtt_errno == MQTT_ERR_AUTH: return "Authorisation failed." - elif mosq_errno == MOSQ_ERR_ACL_DENIED: + elif mqtt_errno == MQTT_ERR_ACL_DENIED: return "Access denied by ACL." - elif mosq_errno == MOSQ_ERR_UNKNOWN: + elif mqtt_errno == MQTT_ERR_UNKNOWN: return "Unknown error." - elif mosq_errno == MOSQ_ERR_ERRNO: + elif mqtt_errno == MQTT_ERR_ERRNO: return "Error defined by errno." else: return "Unknown error." @@ -238,7 +238,7 @@ def topic_matches_sub(sub, topic): return result -class MosquittoMessage: +class MQTTMessage: """ This is a class that describes an incoming message. It is passed to the on_message callback as the message parameter. @@ -252,8 +252,8 @@ class MosquittoMessage: """ def __init__(self): self.timestamp = 0 - self.direction = mosq_md_invalid - self.state = mosq_ms_invalid + self.direction = mqtt_md_invalid + self.state = mqtt_ms_invalid self.dup = False self.mid = 0 self.topic = "" @@ -261,7 +261,7 @@ class MosquittoMessage: self.qos = 0 self.retain = False -class MosquittoInPacket: +class MQTTInPacket: """Internal datatype.""" def __init__(self): self.command = 0 @@ -276,7 +276,7 @@ class MosquittoInPacket: def cleanup(self): self.__init__() -class MosquittoPacket: +class MQTTPacket: """Internal datatype.""" def __init__(self, command, packet, mid, qos): self.command = command @@ -286,7 +286,7 @@ class MosquittoPacket: self.to_process = len(packet) self.packet = packet -class Mosquitto: +class Client: """MQTT version 3.1 client class. This is the main class for use communicating with an MQTT broker. @@ -312,19 +312,19 @@ class Mosquitto: broker. To use a callback, define a function and then assign it to the client: - def on_connect(mosq, userdata, rc): + def on_connect(client, userdata, rc): print("Connection returned " + str(rc)) client.on_connect = on_connect - All of the callbacks as described below have a "mosq" and an "userdata" - argument. "mosq" is the Mosquitto instance that is calling the callback. + All of the callbacks as described below have a "client" and an "userdata" + argument. "client" is the Client instance that is calling the callback. "userdata" is user data of any type and can be set when creating a new client instance or with user_data_set(userdata). The callbacks: - on_connect(mosq, userdata, rc): called when the broker responds to our connection + on_connect(client, userdata, rc): called when the broker responds to our connection request. The value of rc determines success or not: 0: Connection successful 1: Connection refused - incorrect protocol version @@ -334,17 +334,17 @@ class Mosquitto: 5: Connection refused - not authorised 6-255: Currently unused. - on_disconnect(mosq, userdata, rc): called when the client disconnects from the broker. - The rc parameter indicates the disconnection state. If MOSQ_ERR_SUCCESS + on_disconnect(client, userdata, rc): called when the client disconnects from the broker. + The rc parameter indicates the disconnection state. If MQTT_ERR_SUCCESS (0), the callback was called in response to a disconnect() call. If any other value the disconnection was unexpected, such as might be caused by a network error. - on_message(mosq, userdata, message): called when a message has been received on a + on_message(client, userdata, message): called when a message has been received on a topic that the client subscribes to. The message variable is a - MosquittoMessage that describes all of the message parameters. + MQTTMessage that describes all of the message parameters. - on_publish(mosq, userdata, mid): called when a message that was to be sent using the + on_publish(client, userdata, mid): called when a message that was to be sent using the publish() call has completed transmission to the broker. For messages with QoS levels 1 and 2, this means that the appropriate handshakes have completed. For QoS 0, this simply means that the message has left the @@ -353,20 +353,20 @@ class Mosquitto: This callback is important because even if the publish() call returns success, it does not always mean that the message has been sent. - on_subscribe(mosq, userdata, mid, granted_qos): called when the broker responds to a + on_subscribe(client, userdata, mid, granted_qos): called when the broker responds to a subscribe request. The mid variable matches the mid variable returned from the corresponding subscribe() call. The granted_qos variable is a list of integers that give the QoS level the broker has granted for each of the different subscription requests. - on_unsubscribe(mosq, userdata, mid): called when the broker responds to an unsubscribe + on_unsubscribe(client, userdata, mid): called when the broker responds to an unsubscribe request. The mid variable matches the mid variable returned from the corresponding unsubscribe() call. - on_log(mosq, userdata, level, buf): called when the client has log information. Define + on_log(client, userdata, level, buf): called when the client has log information. Define to allow debugging. The level variable gives the severity of the message - and will be one of MOSQ_LOG_INFO, MOSQ_LOG_NOTICE, MOSQ_LOG_WARNING, - MOSQ_LOG_ERR, and MOSQ_LOG_DEBUG. The message itself is in buf. + and will be one of MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, + MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf. """ def __init__(self, client_id="", clean_session=True, userdata=None): @@ -398,20 +398,20 @@ class Mosquitto: self._last_retry_check = 0 self._clean_session = clean_session if client_id == "": - self._client_id = "mosq/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5)) + self._client_id = "paho/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5)) else: self._client_id = client_id self._username = "" self._password = "" - self._in_packet = MosquittoInPacket() + self._in_packet = MQTTInPacket() self._out_packet = [] self._current_out_packet = None self._last_msg_in = time.time() self._last_msg_out = time.time() self._ping_t = 0 self._last_mid = 0 - self._state = mosq_cs_new + self._state = mqtt_cs_new self._messages = [] self._will = False self._will_topic = "" @@ -564,7 +564,7 @@ class Mosquitto: self._keepalive = keepalive self._state_mutex.acquire() - self._state = mosq_cs_connect_async + self._state = mqtt_cs_connect_async self._state_mutex.release() def reconnect(self): @@ -591,7 +591,7 @@ class Mosquitto: self._ping_t = 0 self._state_mutex.acquire() - self._state = mosq_cs_new + self._state = mqtt_cs_new self._state_mutex.release() if self._ssl: self._ssl.close() @@ -645,7 +645,7 @@ class Mosquitto: traffic before timing out and returning. max_packets: Not currently used. - Returns MOSQ_ERR_SUCCESS on success. + Returns MQTT_ERR_SUCCESS on success. Returns >0 on error. A ValueError will be raised if timeout < 0""" @@ -669,7 +669,7 @@ class Mosquitto: socklist = select.select(rlist, wlist, [], timeout) except TypeError: # Socket isn't correct type, in likelihood connection is lost - return MOSQ_ERR_CONN_LOST + return MQTT_ERR_CONN_LOST if self.socket() in socklist[0]: rc = self.loop_read(max_packets) @@ -699,8 +699,8 @@ class Mosquitto: retain: If set to true, the message will be set as the "last known good"/retained message for the topic. - Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS to - indicate success or MOSQ_ERR_NO_CONN if the client is not currently + Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to + indicate success or MQTT_ERR_NO_CONN if the client is not currently connected. mid is the message ID for the publish request. The mid value can be used to track the publish request by checking against the mid argument in the on_publish() callback if it is defined. @@ -724,7 +724,7 @@ class Mosquitto: if local_payload != None and len(local_payload) > 268435455: raise ValueError('Payload too large.') - if self._topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS: + if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS: raise ValueError('Publish topic cannot contain wildcards.') local_mid = self._mid_generate() @@ -733,13 +733,13 @@ class Mosquitto: rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False) return (rc, local_mid) else: - message = MosquittoMessage() + message = MQTTMessage() message.timestamp = time.time() - message.direction = mosq_md_out + message.direction = mqtt_md_out if qos == 1: - message.state = mosq_ms_wait_puback + message.state = mqtt_ms_wait_puback elif qos == 2: - message.state = mosq_ms_wait_pubrec + message.state = mqtt_ms_wait_pubrec message.mid = local_mid message.topic = topic @@ -771,10 +771,10 @@ class Mosquitto: def disconnect(self): """Disconnect a connected client from the broker.""" if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN + return MQTT_ERR_NO_CONN self._state_mutex.acquire() - self._state = mosq_cs_disconnecting + self._state = mqtt_cs_disconnecting self._state_mutex.release() return self._send_disconnect() @@ -785,8 +785,8 @@ class Mosquitto: sub: The subscription topic to subscribe to. qos: The desired quality of service level for the subscription. - Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS - to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected. + Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS + to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected. mid is the message ID for the subscribe request. The mid value can be used to track the subscribe request by checking against the mid argument in the on_subscribe() callback if it is defined. @@ -801,7 +801,7 @@ class Mosquitto: topic = _fix_sub_topic(topic) if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN + return MQTT_ERR_NO_CONN return self._send_subscribe(False, topic, qos) @@ -810,8 +810,8 @@ class Mosquitto: sub: The subscription topic to unsubscribe from. - Returns a tuple (result, mid), where result is MOSQ_ERR_SUCCESS - to indicate success or MOSQ_ERR_NO_CONN if the client is not currently connected. + Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS + to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected. mid is the message ID for the unsubscribe request. The mid value can be used to track the unsubscribe request by checking against the mid argument in the on_unsubscribe() callback if it is defined. @@ -822,7 +822,7 @@ class Mosquitto: raise ValueError('Invalid topic.') topic = _fix_sub_topic(topic) if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN + return MQTT_ERR_NO_CONN return self._send_unsubscribe(False, topic) @@ -835,7 +835,7 @@ class Mosquitto: Do not use if you are using the threaded interface loop_start().""" if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN + return MQTT_ERR_NO_CONN max_packets = len(self._messages) if max_packets < 1: @@ -845,9 +845,9 @@ class Mosquitto: rc = self._packet_read() if rc > 0: return self._loop_rc_handle(rc) - elif rc == MOSQ_ERR_AGAIN: - return MOSQ_ERR_SUCCESS - return MOSQ_ERR_SUCCESS + elif rc == MQTT_ERR_AGAIN: + return MQTT_ERR_SUCCESS + return MQTT_ERR_SUCCESS def loop_write(self, max_packets=1): """Process read network events. Use in place of calling loop() if you @@ -860,7 +860,7 @@ class Mosquitto: Do not use if you are using the threaded interface loop_start().""" if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN + return MQTT_ERR_NO_CONN max_packets = len(self._messages) if max_packets < 1: @@ -870,9 +870,9 @@ class Mosquitto: rc = self._packet_write() if rc > 0: return self._loop_rc_handle(rc) - elif rc == MOSQ_ERR_AGAIN: - return MOSQ_ERR_SUCCESS - return MOSQ_ERR_SUCCESS + elif rc == MQTT_ERR_AGAIN: + return MQTT_ERR_SUCCESS + return MQTT_ERR_SUCCESS def want_write(self): """Call to determine if there is network data waiting to be written. @@ -889,7 +889,7 @@ class Mosquitto: Do not use if you are using the threaded interface loop_start().""" if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN + return MQTT_ERR_NO_CONN now = time.time() self._check_keepalive() @@ -899,7 +899,7 @@ class Mosquitto: self._last_retry_check = now if self._ping_t > 0 and now - self._ping_t >= self._keepalive: - # mosq->ping_t != 0 means we are waiting for a pingresp. + # client->ping_t != 0 means we are waiting for a pingresp. # This hasn't happened in the keepalive time so we should disconnect. if self._ssl: self._ssl.close() @@ -909,8 +909,8 @@ class Mosquitto: self._sock = None self._callback_mutex.acquire() - if self._state == mosq_cs_disconnecting: - rc = MOSQ_ERR_SUCCESS + if self._state == mqtt_cs_disconnecting: + rc = MQTT_ERR_SUCCESS else: rc = 1 if self.on_disconnect: @@ -918,9 +918,9 @@ class Mosquitto: self.on_disconnect(self, self._userdata, rc) self._in_callback = False self._callback_mutex.release() - return MOSQ_ERR_CONN_LOST + return MQTT_ERR_CONN_LOST - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def message_retry_set(self, retry): """Set the timeout in seconds before a message with QoS>0 is retried. @@ -996,15 +996,15 @@ class Mosquitto: disconnect() in a callback it will return.""" run = True - if self._state == mosq_cs_connect_async: + if self._state == mqtt_cs_connect_async: self.reconnect() while run: - rc = MOSQ_ERR_SUCCESS - while rc == MOSQ_ERR_SUCCESS: + rc = MQTT_ERR_SUCCESS + while rc == MQTT_ERR_SUCCESS: rc = self.loop(timeout, max_packets) - if self._state == mosq_cs_disconnecting: + if self._state == mqtt_cs_disconnecting: run = False else: time.sleep(1) @@ -1017,7 +1017,7 @@ class Mosquitto: alternative to repeatedly calling loop() yourself. """ if self._thread != None: - return MOSQ_ERR_INVAL + return MQTT_ERR_INVAL self._thread = threading.Thread(target=self._thread_main) self._thread.daemon = True @@ -1031,7 +1031,7 @@ class Mosquitto: The force parameter is currently ignored. """ if self._thread == None: - return MOSQ_ERR_INVAL + return MQTT_ERR_INVAL self._thread_terminate = True self._thread.join() @@ -1051,8 +1051,8 @@ class Mosquitto: self._sock = None self._state_mutex.acquire() - if self._state == mosq_cs_disconnecting: - rc = MOSQ_ERR_SUCCESS + if self._state == mqtt_cs_disconnecting: + rc = MQTT_ERR_SUCCESS self._state_mutex.release() self._callback_mutex.acquire() if self.on_disconnect: @@ -1075,7 +1075,7 @@ class Mosquitto: # Then try to read the remaining payload, where 'payload' here means the # combined variable header and actual payload. This is the most likely to # fail due to longer length, so save current data and current position. - # After all data is read, send to _mosquitto_handle_packet() to deal with. + # After all data is read, send to _mqtt_handle_packet() to deal with. # Finally, free the memory and reset everything to starting conditions. if self._in_packet.command == 0: try: @@ -1086,9 +1086,9 @@ class Mosquitto: except socket.error as err: (msg) = err if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN print(msg) return 1 else: @@ -1110,9 +1110,9 @@ class Mosquitto: except socket.error as err: (msg) = err if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN print(msg) return 1 else: @@ -1122,7 +1122,7 @@ class Mosquitto: # Max 4 bytes length for remaining length as defined by protocol. # Anything more likely means a broken/malicious client. if len(self._in_packet.remaining_count) > 4: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL self._in_packet.remaining_length = self._in_packet.remaining_length + (byte & 127)*self._in_packet.remaining_mult self._in_packet.remaining_mult = self._in_packet.remaining_mult * 128 @@ -1142,9 +1142,9 @@ class Mosquitto: except socket.error as err: (msg) = err if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN print(msg) return 1 else: @@ -1176,14 +1176,14 @@ class Mosquitto: write_length = self._sock.send(packet.packet[packet.pos:]) except AttributeError: self._current_out_packet_mutex.release() - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS except socket.error as err: self._current_out_packet_mutex.release() (msg) = err if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE): - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN if msg.errno == errno.EAGAIN: - return MOSQ_ERR_AGAIN + return MQTT_ERR_AGAIN print(msg) return 1 @@ -1216,7 +1216,7 @@ class Mosquitto: self._last_msg_out = time.time() self._msgtime_mutex.release() - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _easy_log(self, level, buf): if self.on_log: @@ -1229,7 +1229,7 @@ class Mosquitto: last_msg_in = self._last_msg_in self._msgtime_mutex.release() if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): - if self._state == mosq_cs_connected and self._ping_t == 0: + if self._state == mqtt_cs_connected and self._ping_t == 0: self._send_pingreq() self._msgtime_mutex.acquire() self._last_msg_out = now @@ -1243,8 +1243,8 @@ class Mosquitto: self._sock.close() self._sock = None - if self._state == mosq_cs_disconnecting: - rc = MOSQ_ERR_SUCCESS + if self._state == mqtt_cs_disconnecting: + rc = MQTT_ERR_SUCCESS else: rc = 1 self._callback_mutex.acquire() @@ -1261,31 +1261,31 @@ class Mosquitto: return self._last_mid def _topic_wildcard_len_check(self, topic): - # Search for + or # in a topic. Return MOSQ_ERR_INVAL if found. - # Also returns MOSQ_ERR_INVAL if the topic string is too long. - # Returns MOSQ_ERR_SUCCESS if everything is fine. + # Search for + or # in a topic. Return MQTT_ERR_INVAL if found. + # Also returns MQTT_ERR_INVAL if the topic string is too long. + # Returns MQTT_ERR_SUCCESS if everything is fine. if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535: - return MOSQ_ERR_INVAL + return MQTT_ERR_INVAL else: - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _send_pingreq(self): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGREQ") + self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ") rc = self._send_simple_command(PINGREQ) - if rc == MOSQ_ERR_SUCCESS: + if rc == MQTT_ERR_SUCCESS: self._ping_t = time.time() return rc def _send_pingresp(self): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PINGRESP") + self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP") return self._send_simple_command(PINGRESP) def _send_puback(self, mid): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")") return self._send_command_with_mid(PUBACK, mid, False) def _send_pubcomp(self, mid): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")") return self._send_command_with_mid(PUBCOMP, mid, False) def _pack_remaining_length(self, packet, remaining_length): @@ -1330,17 +1330,17 @@ class Mosquitto: def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False): if self._sock == None and self._ssl == None: - return MOSQ_ERR_NO_CONN + return MQTT_ERR_NO_CONN command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain packet = bytearray() packet.extend(struct.pack("!B", command)) if payload == None: remaining_length = 2+len(topic) - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"' (NULL payload)") + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"' (NULL payload)") else: remaining_length = 2+len(topic) + len(payload) - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"', ... ("+str(len(payload))+" bytes)") + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(retain)+", m"+str(mid)+", '"+topic+"', ... ("+str(len(payload))+" bytes)") if qos > 0: # For message id @@ -1374,11 +1374,11 @@ class Mosquitto: return self._packet_queue(PUBLISH, packet, mid, qos) def _send_pubrec(self, mid): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") return self._send_command_with_mid(PUBREC, mid, False) def _send_pubrel(self, mid, dup=False): - self._easy_log(MOSQ_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")") return self._send_command_with_mid(PUBREL|2, mid, dup) def _send_command_with_mid(self, command, mid, dup): @@ -1468,23 +1468,23 @@ class Mosquitto: if m.mid == mid and m.direction == direction: m.state = state m.timestamp = time.time() - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS - return MOSQ_ERR_NOT_FOUND + return MQTT_ERR_NOT_FOUND def _message_retry_check(self): now = time.time() for m in self._messages: if m.timestamp + self._message_retry < now: - if m.state == mosq_ms_wait_puback or m.state == mosq_ms_wait_pubrec: + if m.state == mqtt_ms_wait_puback or m.state == mqtt_ms_wait_pubrec: m.timestamp = now m.dup = True self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) - elif m.state == mosq_ms_wait_pubrel: + elif m.state == mqtt_ms_wait_pubrel: m.timestamp = now m.dup = True self._send_pubrec(m.mid) - elif m.state == mosq_ms_wait_pubcomp: + elif m.state == mqtt_ms_wait_pubcomp: m.timestamp = now m.dup = True self._send_pubrel(m.mid, True) @@ -1492,16 +1492,16 @@ class Mosquitto: def _messages_reconnect_reset(self): for m in self._messages: m.timestamp = 0 - if m.direction == mosq_md_out: + if m.direction == mqtt_md_out: if m.qos == 1: - m.state = mosq_ms_wait_puback + m.state = mqtt_ms_wait_puback elif m.qos == 2: - m.state = mosq_ms_wait_pubrec + m.state = mqtt_ms_wait_pubrec else: self._messages.pop(self._messages.index(m)) def _packet_queue(self, command, packet, mid, qos): - mpkt = MosquittoPacket(command, packet, mid, qos) + mpkt = MQTTPacket(command, packet, mid, qos) self._out_packet_mutex.acquire() self._out_packet.append(mpkt) @@ -1514,7 +1514,7 @@ class Mosquitto: if not self._in_callback and self._thread == None: return self.loop_write() else: - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _packet_handle(self): cmd = self._in_packet.command&0xF0 @@ -1540,37 +1540,37 @@ class Mosquitto: return self._handle_unsuback() else: # If we don't recognise the command, return an error straight away. - self._easy_log(MOSQ_LOG_ERR, "Error: Unrecognised command "+str(cmd)) - return MOSQ_ERR_PROTOCOL + self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command "+str(cmd)) + return MQTT_ERR_PROTOCOL def _handle_pingreq(self): if self._strict_protocol: if self._in_packet.remaining_length != 0: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL - self._easy_log(MOSQ_LOG_DEBUG, "Received PINGREQ") + self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ") return self._send_pingresp() def _handle_pingresp(self): if self._strict_protocol: if self._in_packet.remaining_length != 0: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL # No longer waiting for a PINGRESP. self._ping_t = 0 - self._easy_log(MOSQ_LOG_DEBUG, "Received PINGRESP") - return MOSQ_ERR_SUCCESS + self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP") + return MQTT_ERR_SUCCESS def _handle_connack(self): if self._strict_protocol: if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL if len(self._in_packet.packet) != 2: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL (resvd, result) = struct.unpack("!BB", self._in_packet.packet) - self._easy_log(MOSQ_LOG_DEBUG, "Received CONNACK ("+str(resvd)+", "+str(result)+")") + self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(resvd)+", "+str(result)+")") self._callback_mutex.acquire() if self.on_connect: self._in_callback = True @@ -1578,15 +1578,15 @@ class Mosquitto: self._in_callback = False self._callback_mutex.release() if result == 0: - self._state = mosq_cs_connected - return MOSQ_ERR_SUCCESS + self._state = mqtt_cs_connected + return MQTT_ERR_SUCCESS elif result > 0 and result < 6: - return MOSQ_ERR_CONN_REFUSED + return MQTT_ERR_CONN_REFUSED else: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL def _handle_suback(self): - self._easy_log(MOSQ_LOG_DEBUG, "Received SUBACK") + self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK") pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's' (mid, packet) = struct.unpack(pack_format, self._in_packet.packet) pack_format = "!" + "B"*len(packet) @@ -1599,14 +1599,14 @@ class Mosquitto: self._in_callback = False self._callback_mutex.release() - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _handle_publish(self): rc = 0 header = self._in_packet.command - message = MosquittoMessage() - message.direction = mosq_md_in + message = MQTTMessage() + message.direction = mqtt_md_in message.dup = (header & 0x08)>>3 message.qos = (header & 0x06)>>1 message.retain = (header & 0x01) @@ -1617,7 +1617,7 @@ class Mosquitto: (message.topic, packet) = struct.unpack(pack_format, packet) if len(message.topic) == 0: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL if sys.version_info[0] >= 3: message.topic = message.topic.decode('utf-8') @@ -1629,7 +1629,7 @@ class Mosquitto: message.payload = packet - self._easy_log(MOSQ_LOG_DEBUG, "Received PUBLISH (d"+str(message.dup)+ + self._easy_log(MQTT_LOG_DEBUG, "Received PUBLISH (d"+str(message.dup)+ ", q"+str(message.qos)+", r"+str(message.retain)+ ", m"+str(message.mid)+", '"+message.topic+ "', ... ("+str(len(message.payload))+" bytes)") @@ -1643,7 +1643,7 @@ class Mosquitto: self._in_callback = False self._callback_mutex.release() - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS elif message.qos == 1: rc = self._send_puback(message.mid) self._callback_mutex.acquire() @@ -1656,26 +1656,26 @@ class Mosquitto: return rc elif message.qos == 2: rc = self._send_pubrec(message.mid) - message.state = mosq_ms_wait_pubrel + message.state = mqtt_ms_wait_pubrel self._messages.append(message) return rc else: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL def _handle_pubrel(self): if self._strict_protocol: if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL if len(self._in_packet.packet) != 2: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL mid = struct.unpack("!H", self._in_packet.packet) mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")") for i in range(len(self._messages)): - if self._messages[i].direction == mosq_md_in and self._messages[i].mid == mid: + if self._messages[i].direction == mqtt_md_in and self._messages[i].mid == mid: # Only pass the message on if we have removed it from the queue - this # prevents multiple callbacks for the same message. @@ -1689,53 +1689,53 @@ class Mosquitto: return self._send_pubcomp(mid) - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _handle_pubrec(self): if self._strict_protocol: if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL mid = struct.unpack("!H", self._in_packet.packet) mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")") for i in range(len(self._messages)): - if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid: - self._messages[i].state = mosq_ms_wait_pubcomp + if self._messages[i].direction == mqtt_md_out and self._messages[i].mid == mid: + self._messages[i].state = mqtt_ms_wait_pubcomp self._messages[i].timestamp = time.time() return self._send_pubrel(mid, False) - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _handle_unsuback(self): if self._strict_protocol: if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL mid = struct.unpack("!H", self._in_packet.packet) mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")") self._callback_mutex.acquire() if self.on_unsubscribe: self._in_callback = True self.on_unsubscribe(self, self._userdata, mid) self._in_callback = False self._callback_mutex.release() - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _handle_pubackcomp(self, cmd): if self._strict_protocol: if self._in_packet.remaining_length != 2: - return MOSQ_ERR_PROTOCOL + return MQTT_ERR_PROTOCOL mid = struct.unpack("!H", self._in_packet.packet) mid = mid[0] - self._easy_log(MOSQ_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")") + self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")") for i in range(len(self._messages)): try: - if self._messages[i].direction == mosq_md_out and self._messages[i].mid == mid: + if self._messages[i].direction == mqtt_md_out and self._messages[i].mid == mid: # Only inform the client the message has been sent once. self._callback_mutex.acquire() if self.on_publish: @@ -1750,28 +1750,28 @@ class Mosquitto: # Not really an error. pass - return MOSQ_ERR_SUCCESS + return MQTT_ERR_SUCCESS def _thread_main(self): run = True self._thread_terminate = False self._state_mutex.acquire() - if self._state == mosq_cs_connect_async: + if self._state == mqtt_cs_connect_async: self._state_mutex.release() self.reconnect() else: self._state_mutex.release() while run: - rc = MOSQ_ERR_SUCCESS - while rc == MOSQ_ERR_SUCCESS: + rc = MQTT_ERR_SUCCESS + while rc == MQTT_ERR_SUCCESS: rc = self.loop() if self._thread_terminate: rc = 1 run = False self._state_mutex.acquire() - if self._state == mosq_cs_disconnecting: + if self._state == mqtt_cs_disconnecting: run = False self._state_mutex.release() else: -- 2.39.5