return self._send_disconnect()
def subscribe(self, topic, qos=0):
- """Subscribe the client to a topic.
+ """Subscribe the client to one or more topics.
- sub: The subscription topic to subscribe to.
+ This function may be called in three different ways:
+
+ Simple string and integer
+ -------------------------
+ e.g. subscribe("my/topic", 2)
+
+ topic: A string specifying the subscription topic to subscribe to.
qos: The desired quality of service level for the subscription.
+ Defaults to 0.
+
+ String and integer tuple
+ ------------------------
+ e.g. subscribe(("my/topic", 1))
+
+ topic: A tuple of (topic, qos). Both topic and qos must be present in
+ the tuple.
+ qos: Not used.
+
+ List of string and integer tuples
+ ------------------------
+ e.g. subscribe([("my/topic", 0), ("another/topic", 2)])
+
+ This allows multiple topic subscriptions in a single SUBSCRIPTION
+ command, which is more efficient than using multiple calls to
+ subscribe().
+
+ topic: A list of tuple of format (topic, qos). Both topic and qos must
+ be present in all of the tuples.
+ qos: Not used.
+
+ The function returns a tuple (result, mid), where result is
+ MQTT_ERR_SUCCESS to indicate success or MQTT_ERR_NO_CONN if the client
+ is not currently connected. mid is the message ID for the subscribe
+ request. The mid value can be used to track the subscribe request by
+ checking against the mid argument in the on_subscribe() callback if it
+ is defined.
- Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
- to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected.
- mid is the message ID for the subscribe request. The mid value can be
- used to track the subscribe request by checking against the mid
- argument in the on_subscribe() callback if it is defined.
-
Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
- zero string length.
+ zero string length, or if topic is not a string, tuple or list.
"""
- if qos<0 or qos>2:
- raise ValueError('Invalid QoS level.')
- if topic == None or len(topic) == 0:
- raise ValueError('Invalid topic.')
- topic = _fix_sub_topic(topic)
-
- if self._sock == None and self._ssl == None:
+ topic_qos_list = None
+ if isinstance(topic, str):
+ if qos<0 or qos>2:
+ raise ValueError('Invalid QoS level.')
+ if topic is None or len(topic) == 0:
+ raise ValueError('Invalid topic.')
+ topic_qos_list = [(topic, qos)]
+ elif isinstance(topic, tuple):
+ if topic[1]<0 or topic[1]>2:
+ raise ValueError('Invalid QoS level.')
+ if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str):
+ raise ValueError('Invalid topic.')
+ topic_qos_list = [topic]
+ elif isinstance(topic, list):
+ for t in topic:
+ if t[1]<0 or t[1]>2:
+ raise ValueError('Invalid QoS level.')
+ if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str):
+ raise ValueError('Invalid topic.')
+ topic_qos_list = topic
+
+ if topic_qos_list is None:
+ raise ValueError("No topic specified, or incorrect topic type.")
+
+ if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
- return self._send_subscribe(False, topic, qos)
+ return self._send_subscribe(False, topic_qos_list)
def unsubscribe(self, topic):
- """Unsubscribe the client from a topic.
+ """Unsubscribe the client from one or more topics.
- sub: The subscription topic to unsubscribe from.
+ topic: A single string, or list of strings that are the subscription
+ topics to unsubscribe from.
Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
- to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected.
+ to indicate success or MQTT_ERR_NO_CONN if the client is not currently
+ connected.
mid is the message ID for the unsubscribe request. The mid value can be
used to track the unsubscribe request by checking against the mid
argument in the on_unsubscribe() callback if it is defined.
- Raises a ValueError if topic is None or has zero string length.
+ Raises a ValueError if topic is None or has zero string length, or is
+ not a string or list.
"""
- if topic == None or len(topic) == 0:
+ topic_list = None
+ if topic is None:
raise ValueError('Invalid topic.')
- topic = _fix_sub_topic(topic)
- if self._sock == None and self._ssl == None:
+ if isinstance(topic, str):
+ if len(topic) == 0:
+ raise ValueError('Invalid topic.')
+ topic_list = [topic]
+ elif isinstance(topic, list):
+ for t in topic:
+ if len(t) == 0 or not isinstance(t, str):
+ raise ValueError('Invalid topic.')
+ topic_list = topic
+
+ if topic_list is None:
+ raise ValueError("No topic specified, or incorrect topic type.")
+
+ if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
- return self._send_unsubscribe(False, topic)
+ return self._send_unsubscribe(False, topic_list)
def loop_read(self, max_packets=1):
"""Process read network events. Use in place of calling loop() if you
time.sleep(1)
self._state_mutex.acquire()
- if self._state == mosq_cs_disconnecting:
+ if self._state == mqtt_cs_disconnecting:
run = False
self._state_mutex.release()
else:
def _send_disconnect(self):
return self._send_simple_command(DISCONNECT)
- def _send_subscribe(self, dup, topic, topic_qos):
- remaining_length = 2 + 2+len(topic) + 1
+ def _send_subscribe(self, dup, topics):
+ remaining_length = 2
+ for t in topics:
+ remaining_length = remaining_length + 2+len(t[0])+1
+
command = SUBSCRIBE | (dup<<3) | (1<<1)
packet = bytearray()
packet.extend(struct.pack("!B", command))
self._pack_remaining_length(packet, remaining_length)
local_mid = self._mid_generate()
packet.extend(struct.pack("!H", local_mid))
- self._pack_str16(packet, topic)
- packet.extend(struct.pack("B", topic_qos))
+ for t in topics:
+ self._pack_str16(packet, t[0])
+ packet.extend(struct.pack("B", t[1]))
return (self._packet_queue(command, packet, local_mid, 1), local_mid)
- def _send_unsubscribe(self, dup, topic):
- remaining_length = 2 + 2+len(topic)
+ def _send_unsubscribe(self, dup, topics):
+ remaining_length = 2
+ for t in topics:
+ remaining_length = remaining_length + 2+len(t)
+
command = UNSUBSCRIBE | (dup<<3) | (1<<1)
packet = bytearray()
packet.extend(struct.pack("!B", command))
self._pack_remaining_length(packet, remaining_length)
local_mid = self._mid_generate()
packet.extend(struct.pack("!H", local_mid))
- self._pack_str16(packet, topic)
+ for t in topics:
+ self._pack_str16(packet, t)
return (self._packet_queue(command, packet, local_mid, 1), local_mid)
def _message_update(self, mid, direction, state):