#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
-# and Eclipse Distribution License v1.0 which accompany this distribution.
+# and Eclipse Distribution License v1.0 which accompany this distribution.
#
-# The Eclipse Public License is available at
+# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
-# and the Eclipse Distribution License is available at
+# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
PROTOCOL_VERSION = 3
-# Message types
+# Message types
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14
+
def _fix_sub_topic(subtopic):
# Convert ////some////over/slashed///topic/etc/etc//
# into some/over/slashed/topic/etc/etc
else:
return '/'.join(filter(None, subtopic.split('/')))
+
def error_string(mqtt_errno):
"""Return the error string associated with an mqtt error number."""
if mqtt_errno == MQTT_ERR_SUCCESS:
else:
return "Unknown error."
+
def connack_string(connack_code):
"""Return the string associated with a CONNACK result."""
if connack_code == 0:
else:
return "Connection Refused: unknown reason."
+
def topic_matches_sub(sub, topic):
"""Check whether a topic matches a subscription.
-
+
For example:
-
+
foo/bar would match the subscription foo/# or +/bar
non/matching would not match the subscription non/+/+
"""
slen = len(local_sub)
tlen = len(local_topic)
- spos = 0;
- tpos = 0;
+ spos = 0
+ tpos = 0
while spos < slen and tpos < tlen:
if local_sub[spos] == local_topic[tpos]:
class MQTTMessage:
""" This is a class that describes an incoming message. It is passed to the
on_message callback as the message parameter.
-
+
Members:
topic : String. topic that the message was published on.
self.qos = 0
self.retain = False
+
class MQTTInPacket:
"""Internal datatype."""
def __init__(self):
def cleanup(self):
self.__init__()
+
class MQTTPacket:
"""Internal datatype."""
def __init__(self, command, packet, mid, qos):
self.to_process = len(packet)
self.packet = packet
+
class Client:
"""MQTT version 3.1 client class.
-
+
This is the main class for use communicating with an MQTT broker.
General usage flow:
A number of callback functions are available to receive data back from the
broker. To use a callback, define a function and then assign it to the
client:
-
+
def on_connect(client, userdata, rc):
print("Connection returned " + str(rc))
argument. "client" is the Client instance that is calling the callback.
"userdata" is user data of any type and can be set when creating a new client
instance or with user_data_set(userdata).
-
+
The callbacks:
on_connect(client, userdata, rc): called when the broker responds to our connection
the broker will remove all information about this client when it
disconnects. If False, the client is a persistent client and
subscription information and queued messages will be retained when the
- client disconnects.
+ client disconnects.
Note that a client will never discard its own outgoing messages on
disconnect. Calling connect() or reconnect() will cause the messages to
be resent. Use reinitialise() to reset a client to its original state.
parameter to callbacks. It may be updated at a later point with the
user_data_set() function.
"""
- if not clean_session and (client_id == "" or client_id == None):
+ if not clean_session and (client_id == "" or client_id is None):
raise ValueError('A client id must be provided if clean session is False.')
self._userdata = userdata
self._message_retry = 20
self._last_retry_check = 0
self._clean_session = clean_session
- if client_id == "" or client_id == None:
+ if client_id == "" or client_id is None:
self._client_id = "paho/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5))
else:
self._client_id = client_id
that if either of these files in encrypted and needs a password to
decrypt it, Python will ask for the password at the command line. It is
not currently possible to define a callback to provide the password.
-
+
cert_reqs allows the certificate requirements that the client imposes
on the broker to be changed. By default this is ssl.CERT_REQUIRED,
which means that the broker must provide a certificate. See the ssl
pydoc for more information on this parameter.
-
+
tls_version allows the version of the SSL/TLS protocol used to be
specified. By default TLS v1 is used. Previous versions (all versions
beginning with SSL) are possible but not recommended due to possible
more information.
Must be called before connect() or connect_async()."""
- if HAVE_SSL == False:
+ if HAVE_SSL is False:
raise ValueError('This platform has no SSL/TLS.')
if sys.version < '2.7':
raise ValueError('Python 2.7 is the minimum supported version for TLS.')
- if ca_certs == None:
+ if ca_certs is None:
raise ValueError('ca_certs must not be None.')
try:
raise IOError(ca_certs+": "+err.strerror)
else:
f.close()
- if certfile != None:
+ if certfile is not None:
try:
f = open(certfile, "r")
except IOError as err:
raise IOError(certfile+": "+err.strerror)
else:
f.close()
- if keyfile != None:
+ if keyfile is not None:
try:
f = open(keyfile, "r")
except IOError as err:
Do not use this function in a real system. Setting value to true means
there is no point using encryption.
-
+
Must be called before connect()."""
- if HAVE_SSL == False:
+ if HAVE_SSL is False:
raise ValueError('This platform has no SSL/TLS.')
self._tls_insecure = value
-
def connect(self, host, port=1883, keepalive=60, bind_address=""):
"""Connect to a remote broker.
keepalive and bind_address are as for connect()
"""
- if HAVE_DNS == False:
+ if HAVE_DNS is False:
raise ValueError('No DNS resolver library found.')
if domain is None:
broker. If no other messages are being exchanged, this controls the
rate at which the client will send ping messages to the broker.
"""
- if host == None or len(host) == 0:
+ if host is None or len(host) == 0:
raise ValueError('Invalid host.')
if port <= 0:
raise ValueError('Invalid port number.')
if keepalive < 0:
raise ValueError('Keepalive must be >=0.')
- if bind_address != "" and bind_address != None:
+ if bind_address != "" and bind_address is not None:
if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
raise ValueError('bind_address requires Python 2.7 or 3.2.')
if msg.errno != errno.EINPROGRESS:
raise
- if self._tls_ca_certs != None:
- self._ssl = ssl.wrap_socket(self._sock,
- certfile=self._tls_certfile,
- keyfile=self._tls_keyfile,
- ca_certs=self._tls_ca_certs,
- cert_reqs=self._tls_cert_reqs,
- ssl_version=self._tls_version,
- ciphers=self._tls_ciphers)
-
- if self._tls_insecure == False:
+ if self._tls_ca_certs is not None:
+ self._ssl = ssl.wrap_socket(
+ self._sock,
+ certfile=self._tls_certfile,
+ keyfile=self._tls_keyfile,
+ ca_certs=self._tls_ca_certs,
+ cert_reqs=self._tls_cert_reqs,
+ ssl_version=self._tls_version,
+ ciphers=self._tls_ciphers)
+
+ if self._tls_insecure is False:
if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
self._tls_match_hostname()
else:
ssl.match_hostname(self._ssl.getpeercert(), self._host)
-
+
self._sock.setblocking(0)
return self._send_connect(self._keepalive, self._clean_session)
messages with QoS>0.
timeout: The time in seconds to wait for incoming/outgoing network
- traffic before timing out and returning.
+ traffic before timing out and returning.
max_packets: Not currently used.
Returns MQTT_ERR_SUCCESS on success.
self._current_out_packet_mutex.acquire()
self._out_packet_mutex.acquire()
- if self._current_out_packet == None and len(self._out_packet) > 0:
+ if self._current_out_packet is None and len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.pop(0)
if self._current_out_packet:
if self.socket() in socklist[0]:
rc = self.loop_read(max_packets)
- if rc or (self._ssl == None and self._sock == None):
+ if rc or (self._ssl is None and self._sock is None):
return rc
if self.socket() in socklist[1]:
rc = self.loop_write(max_packets)
- if rc or (self._ssl == None and self._sock == None):
+ if rc or (self._ssl is None and self._sock is None):
return rc
return self.loop_misc()
value can be used to track the publish request by checking against the
mid argument in the on_publish() callback if it is defined.
- A ValueError will be raised if topic == None, has zero length or is
+ A ValueError will be raised if topic is None, has zero length or is
invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if
the length of the payload is greater than 268435455 bytes."""
- if topic == None or len(topic) == 0:
+ if topic is None or len(topic) == 0:
raise ValueError('Invalid topic.')
if qos<0 or qos>2:
raise ValueError('Invalid QoS level.')
local_payload = payload
elif isinstance(payload, int) or isinstance(payload, float):
local_payload = str(payload)
- elif payload == None:
+ elif payload is None:
local_payload = None
else:
raise TypeError('payload must be a string, bytearray, int, float or None.')
- if local_payload != None and len(local_payload) > 268435455:
+ if local_payload is not None and len(local_payload) > 268435455:
raise ValueError('Payload too large.')
if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS:
message.mid = local_mid
message.topic = topic
- if local_payload == None or len(local_payload) == 0:
+ if local_payload is None or len(local_payload) == 0:
message.payload = None
else:
message.payload = local_payload
self._state = mqtt_cs_disconnecting
self._state_mutex.release()
- if self._sock == None and self._ssl == None:
+ if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
return self._send_disconnect()
Use socket() to obtain the client socket to call select() or equivalent
on.
-
+
Do not use if you are using the threaded interface loop_start()."""
- if self._sock == None and self._ssl == None:
+ if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
max_packets = len(self._messages)
def loop_write(self, max_packets=1):
"""Process read network events. Use in place of calling loop() if you
wish to handle your client reads as part of your own application.
-
+
Use socket() to obtain the client socket to call select() or equivalent
on.
Use want_write() to determine if there is data waiting to be written.
Do not use if you are using the threaded interface loop_start()."""
- if self._sock == None and self._ssl == None:
+ if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
max_packets = len(self._messages)
wish to call select() or equivalent on.
Do not use if you are using the threaded interface loop_start()."""
- if self._sock == None and self._ssl == None:
+ if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
now = time.time()
Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
zero string length.
"""
- if topic == None or len(topic) == 0:
+ if topic is None or len(topic) == 0:
raise ValueError('Invalid topic.')
if qos<0 or qos>2:
raise ValueError('Invalid QoS level.')
self._will_payload = payload
elif isinstance(payload, int) or isinstance(payload, float):
self._will_payload = str(payload)
- elif payload == None:
+ elif payload is None:
self._will_payload = None
else:
raise TypeError('payload must be a string, bytearray, int, float or None.')
def will_clear(self):
""" Removes a will that was previously configured with will_set().
-
+
Must be called before connect() to have any effect."""
self._will = False
self._will_topic = ""
rc = MQTT_ERR_SUCCESS
while rc == MQTT_ERR_SUCCESS:
rc = self.loop(timeout, max_packets)
- if self._thread_terminate == True:
+ if self._thread_terminate is True:
rc = 1
run = False
start a new thread to process network traffic. This provides an
alternative to repeatedly calling loop() yourself.
"""
- if self._thread != None:
+ if self._thread is not None:
return MQTT_ERR_INVAL
self._thread = threading.Thread(target=self._thread_main)
The force parameter is currently ignored.
"""
- if self._thread == None:
+ if self._thread is None:
return MQTT_ERR_INVAL
self._thread_terminate = True
self._in_packet.pos = 0
rc = self._packet_handle()
- # Free data and reset values
+ # Free data and reset values
self._in_packet.cleanup()
self._msgtime_mutex.acquire()
self._current_out_packet = None
self._out_packet_mutex.release()
else:
- pass # FIXME
-
+ pass # FIXME
+
self._current_out_packet_mutex.release()
self._msgtime_mutex.acquire()
last_msg_out = self._last_msg_out
last_msg_in = self._last_msg_in
self._msgtime_mutex.release()
- if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
+ if (self._sock is not None or self._ssl is not None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
if self._state == mqtt_cs_connected and self._ping_t == 0:
self._send_pingreq()
self._msgtime_mutex.acquire()
raise TypeError
def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False):
- if self._sock == None and self._ssl == None:
+ if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain
packet = bytearray()
packet.extend(struct.pack("!B", command))
- if payload == None:
+ if payload is None:
remaining_length = 2+len(topic)
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"' (NULL payload)")
else:
# For message id
packet.extend(struct.pack("!H", mid))
- if payload != None:
+ if payload is not None:
if isinstance(payload, str):
if sys.version_info[0] < 3:
pack_format = str(len(payload)) + "s"
if self._will:
self._pack_str16(packet, self._will_topic)
- if self._will_payload == None or len(self._will_payload) == 0:
+ if self._will_payload is None or len(self._will_payload) == 0:
packet.extend(struct.pack("!H", 0))
else:
self._pack_str16(packet, self._will_payload)
self._out_packet_mutex.acquire()
self._out_packet.append(mpkt)
if self._current_out_packet_mutex.acquire(False):
- if self._current_out_packet == None and len(self._out_packet) > 0:
+ if self._current_out_packet is None and len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.pop(0)
self._current_out_packet_mutex.release()
self._out_packet_mutex.release()
- if not self._in_callback and self._thread == None:
+ if not self._in_callback and self._thread is None:
return self.loop_write()
else:
return MQTT_ERR_SUCCESS
if self._strict_protocol:
if self._in_packet.remaining_length != 0:
return MQTT_ERR_PROTOCOL
-
+
self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ")
return self._send_pingresp()
if self._strict_protocol:
if self._in_packet.remaining_length != 0:
return MQTT_ERR_PROTOCOL
-
+
# No longer waiting for a PINGRESP.
self._ping_t = 0
self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP")
message.payload = packet
- self._easy_log(MQTT_LOG_DEBUG, "Received PUBLISH (d"+str(message.dup)+
- ", q"+str(message.qos)+", r"+str(message.retain)+
- ", m"+str(message.mid)+", '"+message.topic+
- "', ... ("+str(len(message.payload))+" bytes)")
+ self._easy_log(
+ MQTT_LOG_DEBUG,
+ "Received PUBLISH (d"+str(message.dup)+
+ ", q"+str(message.qos)+", r"+str(message.retain)+
+ ", m"+str(message.mid)+", '"+message.topic+
+ "', ... ("+str(len(message.payload))+" bytes)")
message.timestamp = time.time()
if message.qos == 0:
if self._strict_protocol:
if self._in_packet.remaining_length != 2:
return MQTT_ERR_PROTOCOL
-
+
if len(self._in_packet.packet) != 2:
return MQTT_ERR_PROTOCOL
mid = struct.unpack("!H", self._in_packet.packet)
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")")
-
+
self._message_mutex.acquire()
for i in range(len(self._messages)):
if self._messages[i].direction == mqtt_md_in and self._messages[i].mid == mid:
if self._strict_protocol:
if self._in_packet.remaining_length != 2:
return MQTT_ERR_PROTOCOL
-
+
mid = struct.unpack("!H", self._in_packet.packet)
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")")
-
+
self._message_mutex.acquire()
for m in self._messages:
if m.direction == mqtt_md_out and m.mid == mid:
m.timestamp = time.time()
self._message_mutex.release()
return self._send_pubrel(mid, False)
-
+
self._message_mutex.release()
return MQTT_ERR_SUCCESS
if self._strict_protocol:
if self._in_packet.remaining_length != 2:
return MQTT_ERR_PROTOCOL
-
+
mid = struct.unpack("!H", self._in_packet.packet)
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")")
if self._strict_protocol:
if self._in_packet.remaining_length != 2:
return MQTT_ERR_PROTOCOL
-
+
mid = struct.unpack("!H", self._in_packet.packet)
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")")
-
+
self._message_mutex.acquire()
for i in range(len(self._messages)):
try:
san = cert.get('subjectAltName')
if san:
have_san_dns = False
- for ((key,value),) in san:
+ for ((key, value),) in san:
if key == 'DNS':
have_san_dns = True
if value.lower() == self._host.lower():
raise ssl.SSLError('Certificate subject does not match remote hostname.')
subject = cert.get('subject')
if subject:
- for ((key,value),) in subject:
+ for ((key, value),) in subject:
if key == 'commonName':
if value.lower() == self._host.lower():
return