]> git.michaelhowe.org Git - packages/p/paho-mqtt.git/commitdiff
Add support for un/subscribing to multiple topics.
authorRoger Light <roger@atchoo.org>
Sat, 7 Dec 2013 22:47:19 +0000 (22:47 +0000)
committerRoger Light <roger@atchoo.org>
Mon, 3 Feb 2014 21:20:21 +0000 (21:20 +0000)
src/paho/mqtt/client.py

index 20bb20fc1b92bbdbaebeb9d07c69aaac7ca03ef6..4e564fa10ed7d2fa4af4c0ecc0649dbc53a58da2 100755 (executable)
@@ -853,51 +853,113 @@ class Client:
         return self._send_disconnect()
 
     def subscribe(self, topic, qos=0):
-        """Subscribe the client to a topic.
+        """Subscribe the client to one or more topics.
 
-        sub: The subscription topic to subscribe to.
+        This function may be called in three different ways:
+
+        Simple string and integer
+        -------------------------
+        e.g. subscribe("my/topic", 2)
+
+        topic: A string specifying the subscription topic to subscribe to.
         qos: The desired quality of service level for the subscription.
+             Defaults to 0.
+
+        String and integer tuple
+        ------------------------
+        e.g. subscribe(("my/topic", 1))
+
+        topic: A tuple of (topic, qos). Both topic and qos must be present in
+               the tuple.
+        qos: Not used.
+
+        List of string and integer tuples
+        ------------------------
+        e.g. subscribe([("my/topic", 0), ("another/topic", 2)])
+
+        This allows multiple topic subscriptions in a single SUBSCRIPTION
+        command, which is more efficient than using multiple calls to
+        subscribe().
+
+        topic: A list of tuple of format (topic, qos). Both topic and qos must
+               be present in all of the tuples.
+        qos: Not used.
+
+        The function returns a tuple (result, mid), where result is
+        MQTT_ERR_SUCCESS to indicate success or MQTT_ERR_NO_CONN if the client
+        is not currently connected.  mid is the message ID for the subscribe
+        request. The mid value can be used to track the subscribe request by
+        checking against the mid argument in the on_subscribe() callback if it
+        is defined.
 
-        Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
-        to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected.
-        mid is the message ID for the subscribe request. The mid value can be
-        used to track the subscribe request by checking against the mid
-        argument in the on_subscribe() callback if it is defined.
-        
         Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
-        zero string length.
+        zero string length, or if topic is not a string, tuple or list.
         """
-        if qos<0 or qos>2:
-            raise ValueError('Invalid QoS level.')
-        if topic == None or len(topic) == 0:
-            raise ValueError('Invalid topic.')
-        topic = _fix_sub_topic(topic)
-
-        if self._sock == None and self._ssl == None:
+        topic_qos_list = None
+        if isinstance(topic, str):
+            if qos<0 or qos>2:
+                raise ValueError('Invalid QoS level.')
+            if topic is None or len(topic) == 0:
+                raise ValueError('Invalid topic.')
+            topic_qos_list = [(topic, qos)]
+        elif isinstance(topic, tuple):
+            if topic[1]<0 or topic[1]>2:
+                raise ValueError('Invalid QoS level.')
+            if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str):
+                raise ValueError('Invalid topic.')
+            topic_qos_list = [topic]
+        elif isinstance(topic, list):
+            for t in topic:
+                if t[1]<0 or t[1]>2:
+                    raise ValueError('Invalid QoS level.')
+                if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str):
+                    raise ValueError('Invalid topic.')
+            topic_qos_list = topic
+
+        if topic_qos_list is None:
+            raise ValueError("No topic specified, or incorrect topic type.")
+
+        if self._sock is None and self._ssl is None:
             return MQTT_ERR_NO_CONN
 
-        return self._send_subscribe(False, topic, qos)
+        return self._send_subscribe(False, topic_qos_list)
 
     def unsubscribe(self, topic):
-        """Unsubscribe the client from a topic.
+        """Unsubscribe the client from one or more topics.
 
-        sub: The subscription topic to unsubscribe from.
+        topic: A single string, or list of strings that are the subscription
+               topics to unsubscribe from.
 
         Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
-        to indicate success or MQTT_ERR_NO_CONN if the client is not currently connected.
+        to indicate success or MQTT_ERR_NO_CONN if the client is not currently
+        connected.
         mid is the message ID for the unsubscribe request. The mid value can be
         used to track the unsubscribe request by checking against the mid
         argument in the on_unsubscribe() callback if it is defined.
 
-        Raises a ValueError if topic is None or has zero string length.
+        Raises a ValueError if topic is None or has zero string length, or is
+        not a string or list.
         """
-        if topic == None or len(topic) == 0:
+        topic_list = None
+        if topic is None:
             raise ValueError('Invalid topic.')
-        topic = _fix_sub_topic(topic)
-        if self._sock == None and self._ssl == None:
+        if isinstance(topic, str):
+            if len(topic) == 0:
+                raise ValueError('Invalid topic.')
+            topic_list = [topic]
+        elif isinstance(topic, list):
+            for t in topic:
+                if len(t) == 0 or not isinstance(t, str):
+                    raise ValueError('Invalid topic.')
+            topic_list = topic
+
+        if topic_list is None:
+            raise ValueError("No topic specified, or incorrect topic type.")
+
+        if self._sock is None and self._ssl is None:
             return MQTT_ERR_NO_CONN
 
-        return self._send_unsubscribe(False, topic)
+        return self._send_unsubscribe(False, topic_list)
 
     def loop_read(self, max_packets=1):
         """Process read network events. Use in place of calling loop() if you
@@ -1096,7 +1158,7 @@ class Client:
                 time.sleep(1)
 
                 self._state_mutex.acquire()
-                if self._state == mosq_cs_disconnecting:
+                if self._state == mqtt_cs_disconnecting:
                     run = False
                     self._state_mutex.release()
                 else:
@@ -1535,27 +1597,35 @@ class Client:
     def _send_disconnect(self):
         return self._send_simple_command(DISCONNECT)
 
-    def _send_subscribe(self, dup, topic, topic_qos):
-        remaining_length = 2 + 2+len(topic) + 1
+    def _send_subscribe(self, dup, topics):
+        remaining_length = 2
+        for t in topics:
+            remaining_length = remaining_length + 2+len(t[0])+1
+
         command = SUBSCRIBE | (dup<<3) | (1<<1)
         packet = bytearray()
         packet.extend(struct.pack("!B", command))
         self._pack_remaining_length(packet, remaining_length)
         local_mid = self._mid_generate()
         packet.extend(struct.pack("!H", local_mid))
-        self._pack_str16(packet, topic)
-        packet.extend(struct.pack("B", topic_qos))
+        for t in topics:
+            self._pack_str16(packet, t[0])
+            packet.extend(struct.pack("B", t[1]))
         return (self._packet_queue(command, packet, local_mid, 1), local_mid)
 
-    def _send_unsubscribe(self, dup, topic):
-        remaining_length = 2 + 2+len(topic)
+    def _send_unsubscribe(self, dup, topics):
+        remaining_length = 2
+        for t in topics:
+            remaining_length = remaining_length + 2+len(t)
+
         command = UNSUBSCRIBE | (dup<<3) | (1<<1)
         packet = bytearray()
         packet.extend(struct.pack("!B", command))
         self._pack_remaining_length(packet, remaining_length)
         local_mid = self._mid_generate()
         packet.extend(struct.pack("!H", local_mid))
-        self._pack_str16(packet, topic)
+        for t in topics:
+            self._pack_str16(packet, t)
         return (self._packet_queue(command, packet, local_mid, 1), local_mid)
 
     def _message_update(self, mid, direction, state):