on_message(client, userdata, message)
-Called when a message has been received on a topic that the client subscribes to.
+Called when a message has been received on a topic that the client subscribes
+to. This callback will be called for every message received. Use
+``message_callback_add()`` to define multiple callbacks that will be called for
+specific topic filters.
client
the client instance for this callback
mqttc.on_message = on_message
...
+message_callback_add()
+''''''''''''''''''''''
+
+This function allows you to define callbacks that handle incoming messages for specific subscription filters, including with wildcards. This lets you, for example, subscribe to ``sensors/#`` and have one callback to handle ``sensors/temperature`` and another to handle ``sensors/humidity``.
+
+::
+
+ message_callback_add(sub, callback)
+
+sub
+ the subscription filter to match against for this callback. Only one callback may be defined per literal sub string
+
+callback
+ the callback to be used. Takes the same form as the ``on_message`` callback.
+
+If using ``message_callback_add()`` and ``on_message``, only messages that do not match a subscription specific filter will be passed to the ``on_message`` callback.
+
+message_callback_remove()
+'''''''''''''''''''''''''
+
+Remove a topic/subscription specific callback previously registered using ``message_callback_add()``.
+
+::
+
+ message_callback_remove(sub)
+
+sub
+ the subscription filter to remove
+
on_publish()
''''''''''''
--- /dev/null
+#!/usr/bin/python
+
+# Copyright (c) 2014 Roger Light <roger@atchoo.org>
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Distribution License v1.0
+# which accompanies this distribution.
+#
+# The Eclipse Distribution License is available at
+# http://www.eclipse.org/org/documents/edl-v10.php.
+#
+# Contributors:
+# Roger Light - initial implementation
+# All rights reserved.
+
+# This shows a simple example of an MQTT subscriber using a per-subscription message handler.
+
+import sys
+try:
+ import paho.mqtt.client as mqtt
+except ImportError:
+ # This part is only required to run the example from within the examples
+ # directory when the module itself is not installed.
+ #
+ # If you have the module installed, just use "import paho.mqtt.client"
+ import os
+ import inspect
+ cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"../src")))
+ if cmd_subfolder not in sys.path:
+ sys.path.insert(0, cmd_subfolder)
+ import paho.mqtt.client as mqtt
+
+def on_message_msgs(mosq, obj, msg):
+ # This callback will only be called for messages with topics that match
+ # $SYS/broker/messages/#
+ print("MESSAGES: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
+
+def on_message_bytes(mosq, obj, msg):
+ # This callback will only be called for messages with topics that match
+ # $SYS/broker/bytes/#
+ print("BYTES: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
+
+def on_message(mosq, obj, msg):
+ # This callback will be called for messages that we receive that do not
+ # match any patterns defined in topic specific callbacks, i.e. in this case
+ # those messages that do not have topics $SYS/broker/messages/# nor
+ # $SYS/broker/bytes/#
+ print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
+
+mqttc = mqtt.Client()
+
+# Add message callbacks that will only trigger on a specific subscription match.
+mqttc.message_callback_add("$SYS/broker/messages/#", on_message_msgs)
+mqttc.message_callback_add("$SYS/broker/bytes/#", on_message_bytes)
+mqttc.on_message = on_message
+mqttc.connect("iot.eclipse.org", 1883, 60)
+mqttc.subscribe("$SYS/#", 0)
+
+mqttc.loop_forever()
+
self.on_connect = None
self.on_publish = None
self.on_message = None
+ self.on_message_filtered = []
self.on_subscribe = None
self.on_unsubscribe = None
self.on_log = None
self._thread.join()
self._thread = None
+ def message_callback_add(self, sub, callback):
+ """Register a message callback for a specific topic.
+ Messages that match 'sub' will be passed to 'callback'. Any
+ non-matching messages will be passed to the default on_message
+ callback.
+
+ Call multiple times with different 'sub' to define multiple topic
+ specific callbacks.
+
+ Topic specific callbacks may be removed with
+ message_callback_remove()."""
+ if callback is None or sub is None:
+ raise ValueError("sub and callback must both be defined.")
+
+ self._callback_mutex.acquire()
+
+ for i in range(0, len(self.on_message_filtered)):
+ if self.on_message_filtered[i][0] == sub:
+ self.on_message_filtered[i] = (sub, callback)
+ self._callback_mutex.release()
+ return
+
+ self.on_message_filtered.append((sub, callback))
+ self._callback_mutex.release()
+
+ def message_callback_remove(self, sub):
+ """Remove a message callback previously registered with
+ message_callback_add()."""
+ if sub is None:
+ raise ValueError("sub must defined.")
+
+ self._callback_mutex.acquire()
+ for i in range(0, len(self.on_message_filtered)):
+ if self.on_message_filtered[i][0] == sub:
+ self.on_message_filtered.pop(i)
+ self._callback_mutex.release()
+ return
+ self._callback_mutex.release()
+
# ============================================================
# Private functions
# ============================================================
def _handle_on_message(self, message):
self._callback_mutex.acquire()
- if self.on_message:
+ matched = False
+ for t in self.on_message_filtered:
+ if topic_matches_sub(t[0], message.topic):
+ self._in_callback = True
+ t[1](self, self._userdata, message)
+ self._in_callback = False
+ matched = True
+
+ if matched == False and self.on_message:
self._in_callback = True
self.on_message(self, self._userdata, message)
self._in_callback = False