]> git.michaelhowe.org Git - packages/p/paho-mqtt.git/commitdiff
Implement multiple callbacks for incoming messages.
authorRoger Light <roger@atchoo.org>
Wed, 30 Apr 2014 22:35:23 +0000 (23:35 +0100)
committerRoger Light <roger@atchoo.org>
Wed, 30 Apr 2014 22:35:23 +0000 (23:35 +0100)
Change-Id: I340301dab8f9171fbb209b71e360ac0d3b114048

README.rst
examples/sub-multiple-callback.py [new file with mode: 0755]
src/paho/mqtt/client.py

index cc844597fca8b13fe0532c0433184c87a22c12cd..75b57832025e6c7c3c0b4a0426038a99a7c5ff08 100644 (file)
@@ -639,7 +639,10 @@ on_message()
 
     on_message(client, userdata, message)
     
-Called when a message has been received on a topic that the client subscribes to.
+Called when a message has been received on a topic that the client subscribes
+to. This callback will be called for every message received. Use
+``message_callback_add()`` to define multiple callbacks that will be called for
+specific topic filters.
 
 client
     the client instance for this callback
@@ -662,6 +665,35 @@ Example
     mqttc.on_message = on_message
     ...
 
+message_callback_add()
+''''''''''''''''''''''
+
+This function allows you to define callbacks that handle incoming messages for specific subscription filters, including with wildcards. This lets you, for example, subscribe to ``sensors/#`` and have one callback to handle ``sensors/temperature`` and another to handle ``sensors/humidity``.
+
+::
+
+    message_callback_add(sub, callback)
+
+sub
+    the subscription filter to match against for this callback. Only one callback may be defined per literal sub string
+
+callback
+    the callback to be used. Takes the same form as the ``on_message`` callback.
+
+If using ``message_callback_add()`` and ``on_message``, only messages that do not match a subscription specific filter will be passed to the ``on_message`` callback.
+
+message_callback_remove()
+'''''''''''''''''''''''''
+
+Remove a topic/subscription specific callback previously registered using ``message_callback_add()``.
+
+::
+
+    message_callback_remove(sub)
+
+sub
+    the subscription filter to remove
+
 on_publish()
 ''''''''''''
 
diff --git a/examples/sub-multiple-callback.py b/examples/sub-multiple-callback.py
new file mode 100755 (executable)
index 0000000..4377f0c
--- /dev/null
@@ -0,0 +1,60 @@
+#!/usr/bin/python
+
+# Copyright (c) 2014 Roger Light <roger@atchoo.org>
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Distribution License v1.0
+# which accompanies this distribution. 
+#
+# The Eclipse Distribution License is available at 
+#   http://www.eclipse.org/org/documents/edl-v10.php.
+#
+# Contributors:
+#    Roger Light - initial implementation
+# All rights reserved.
+
+# This shows a simple example of an MQTT subscriber using a per-subscription message handler.
+
+import sys
+try:
+    import paho.mqtt.client as mqtt
+except ImportError:
+    # This part is only required to run the example from within the examples
+    # directory when the module itself is not installed.
+    #
+    # If you have the module installed, just use "import paho.mqtt.client"
+    import os
+    import inspect
+    cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"../src")))
+    if cmd_subfolder not in sys.path:
+        sys.path.insert(0, cmd_subfolder)
+    import paho.mqtt.client as mqtt
+
+def on_message_msgs(mosq, obj, msg):
+    # This callback will only be called for messages with topics that match
+    # $SYS/broker/messages/#
+    print("MESSAGES: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
+
+def on_message_bytes(mosq, obj, msg):
+    # This callback will only be called for messages with topics that match
+    # $SYS/broker/bytes/#
+    print("BYTES: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
+
+def on_message(mosq, obj, msg):
+    # This callback will be called for messages that we receive that do not
+    # match any patterns defined in topic specific callbacks, i.e. in this case
+    # those messages that do not have topics $SYS/broker/messages/# nor
+    # $SYS/broker/bytes/#
+    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
+
+mqttc = mqtt.Client()
+
+# Add message callbacks that will only trigger on a specific subscription match.
+mqttc.message_callback_add("$SYS/broker/messages/#", on_message_msgs)
+mqttc.message_callback_add("$SYS/broker/bytes/#", on_message_bytes)
+mqttc.on_message = on_message
+mqttc.connect("iot.eclipse.org", 1883, 60)
+mqttc.subscribe("$SYS/#", 0)
+
+mqttc.loop_forever()
+
index 12c646aea828f58fbe0162def70975b972b11e79..39d9b26cd293bbc4888ba71f582c45e661b91cad 100755 (executable)
@@ -441,6 +441,7 @@ class Client(object):
         self.on_connect = None
         self.on_publish = None
         self.on_message = None
+        self.on_message_filtered = []
         self.on_subscribe = None
         self.on_unsubscribe = None
         self.on_log = None
@@ -1258,6 +1259,45 @@ class Client(object):
         self._thread.join()
         self._thread = None
 
+    def message_callback_add(self, sub, callback):
+        """Register a message callback for a specific topic.
+        Messages that match 'sub' will be passed to 'callback'. Any
+        non-matching messages will be passed to the default on_message
+        callback.
+        
+        Call multiple times with different 'sub' to define multiple topic
+        specific callbacks.
+        
+        Topic specific callbacks may be removed with
+        message_callback_remove()."""
+        if callback is None or sub is None:
+            raise ValueError("sub and callback must both be defined.")
+
+        self._callback_mutex.acquire()
+
+        for i in range(0, len(self.on_message_filtered)):
+            if self.on_message_filtered[i][0] == sub:
+                self.on_message_filtered[i] = (sub, callback)
+                self._callback_mutex.release()
+                return
+
+        self.on_message_filtered.append((sub, callback))
+        self._callback_mutex.release()
+
+    def message_callback_remove(self, sub):
+        """Remove a message callback previously registered with
+        message_callback_add()."""
+        if sub is None:
+            raise ValueError("sub must defined.")
+
+        self._callback_mutex.acquire()
+        for i in range(0, len(self.on_message_filtered)):
+            if self.on_message_filtered[i][0] == sub:
+                self.on_message_filtered.pop(i)
+                self._callback_mutex.release()
+                return
+        self._callback_mutex.release()
+
     # ============================================================
     # Private functions
     # ============================================================
@@ -2075,7 +2115,15 @@ class Client(object):
 
     def _handle_on_message(self, message):
         self._callback_mutex.acquire()
-        if self.on_message:
+        matched = False
+        for t in self.on_message_filtered:
+            if topic_matches_sub(t[0], message.topic):
+                self._in_callback = True
+                t[1](self, self._userdata, message)
+                self._in_callback = False
+                matched = True
+
+        if matched == False and self.on_message:
             self._in_callback = True
             self.on_message(self, self._userdata, message)
             self._in_callback = False