From 1c9b45fd046b83e46b484e82fca2dbb141d9cff5 Mon Sep 17 00:00:00 2001 From: Roger Light Date: Sat, 7 Dec 2013 22:47:19 +0000 Subject: [PATCH] Add support for un/subscribing to multiple topics. --- src/paho/mqtt/client.py | 136 ++++++++++++++++++++++++++++++---------- 1 file changed, 103 insertions(+), 33 deletions(-) diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 20bb20f..4e564fa 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -853,51 +853,113 @@ class Client: return self._send_disconnect() def subscribe(self, topic, qos=0): - """Subscribe the client to a topic. + """Subscribe the client to one or more topics. - sub: The subscription topic to subscribe to. + This function may be called in three different ways: + + Simple string and integer + ------------------------- + e.g. subscribe("my/topic", 2) + + topic: A string specifying the subscription topic to subscribe to. qos: The desired quality of service level for the subscription. + Defaults to 0. + + String and integer tuple + ------------------------ + e.g. subscribe(("my/topic", 1)) + + topic: A tuple of (topic, qos). Both topic and qos must be present in + the tuple. + qos: Not used. + + List of string and integer tuples + ------------------------ + e.g. subscribe([("my/topic", 0), ("another/topic", 2)]) + + This allows multiple topic subscriptions in a single SUBSCRIPTION + command, which is more efficient than using multiple calls to + subscribe(). + + topic: A list of tuple of format (topic, qos). Both topic and qos must + be present in all of the tuples. + qos: Not used. + + The function returns a tuple (result, mid), where result is + MQTT_ERR_SUCCESS to indicate success or MQTT_ERR_NO_CONN if the client + is not currently connected. mid is the message ID for the subscribe + request. The mid value can be used to track the subscribe request by + checking against the mid argument in the on_subscribe() callback if it + is defined. - Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS - to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected. - mid is the message ID for the subscribe request. The mid value can be - used to track the subscribe request by checking against the mid - argument in the on_subscribe() callback if it is defined. - Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has - zero string length. + zero string length, or if topic is not a string, tuple or list. """ - if qos<0 or qos>2: - raise ValueError('Invalid QoS level.') - if topic == None or len(topic) == 0: - raise ValueError('Invalid topic.') - topic = _fix_sub_topic(topic) - - if self._sock == None and self._ssl == None: + topic_qos_list = None + if isinstance(topic, str): + if qos<0 or qos>2: + raise ValueError('Invalid QoS level.') + if topic is None or len(topic) == 0: + raise ValueError('Invalid topic.') + topic_qos_list = [(topic, qos)] + elif isinstance(topic, tuple): + if topic[1]<0 or topic[1]>2: + raise ValueError('Invalid QoS level.') + if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str): + raise ValueError('Invalid topic.') + topic_qos_list = [topic] + elif isinstance(topic, list): + for t in topic: + if t[1]<0 or t[1]>2: + raise ValueError('Invalid QoS level.') + if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str): + raise ValueError('Invalid topic.') + topic_qos_list = topic + + if topic_qos_list is None: + raise ValueError("No topic specified, or incorrect topic type.") + + if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN - return self._send_subscribe(False, topic, qos) + return self._send_subscribe(False, topic_qos_list) def unsubscribe(self, topic): - """Unsubscribe the client from a topic. + """Unsubscribe the client from one or more topics. - sub: The subscription topic to unsubscribe from. + topic: A single string, or list of strings that are the subscription + topics to unsubscribe from. Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS - to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected. + to indicate success or MQTT_ERR_NO_CONN if the client is not currently + connected. mid is the message ID for the unsubscribe request. The mid value can be used to track the unsubscribe request by checking against the mid argument in the on_unsubscribe() callback if it is defined. - Raises a ValueError if topic is None or has zero string length. + Raises a ValueError if topic is None or has zero string length, or is + not a string or list. """ - if topic == None or len(topic) == 0: + topic_list = None + if topic is None: raise ValueError('Invalid topic.') - topic = _fix_sub_topic(topic) - if self._sock == None and self._ssl == None: + if isinstance(topic, str): + if len(topic) == 0: + raise ValueError('Invalid topic.') + topic_list = [topic] + elif isinstance(topic, list): + for t in topic: + if len(t) == 0 or not isinstance(t, str): + raise ValueError('Invalid topic.') + topic_list = topic + + if topic_list is None: + raise ValueError("No topic specified, or incorrect topic type.") + + if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN - return self._send_unsubscribe(False, topic) + return self._send_unsubscribe(False, topic_list) def loop_read(self, max_packets=1): """Process read network events. Use in place of calling loop() if you @@ -1096,7 +1158,7 @@ class Client: time.sleep(1) self._state_mutex.acquire() - if self._state == mosq_cs_disconnecting: + if self._state == mqtt_cs_disconnecting: run = False self._state_mutex.release() else: @@ -1535,27 +1597,35 @@ class Client: def _send_disconnect(self): return self._send_simple_command(DISCONNECT) - def _send_subscribe(self, dup, topic, topic_qos): - remaining_length = 2 + 2+len(topic) + 1 + def _send_subscribe(self, dup, topics): + remaining_length = 2 + for t in topics: + remaining_length = remaining_length + 2+len(t[0])+1 + command = SUBSCRIBE | (dup<<3) | (1<<1) packet = bytearray() packet.extend(struct.pack("!B", command)) self._pack_remaining_length(packet, remaining_length) local_mid = self._mid_generate() packet.extend(struct.pack("!H", local_mid)) - self._pack_str16(packet, topic) - packet.extend(struct.pack("B", topic_qos)) + for t in topics: + self._pack_str16(packet, t[0]) + packet.extend(struct.pack("B", t[1])) return (self._packet_queue(command, packet, local_mid, 1), local_mid) - def _send_unsubscribe(self, dup, topic): - remaining_length = 2 + 2+len(topic) + def _send_unsubscribe(self, dup, topics): + remaining_length = 2 + for t in topics: + remaining_length = remaining_length + 2+len(t) + command = UNSUBSCRIBE | (dup<<3) | (1<<1) packet = bytearray() packet.extend(struct.pack("!B", command)) self._pack_remaining_length(packet, remaining_length) local_mid = self._mid_generate() packet.extend(struct.pack("!H", local_mid)) - self._pack_str16(packet, topic) + for t in topics: + self._pack_str16(packet, t) return (self._packet_queue(command, packet, local_mid, 1), local_mid) def _message_update(self, mid, direction, state): -- 2.39.5