From 0ce5c5b60a8359de654d833bc11f172e6aa77b7b Mon Sep 17 00:00:00 2001 From: Roger Light Date: Wed, 30 Apr 2014 23:35:23 +0100 Subject: [PATCH] Implement multiple callbacks for incoming messages. Change-Id: I340301dab8f9171fbb209b71e360ac0d3b114048 --- README.rst | 34 +++++++++++++++++- examples/sub-multiple-callback.py | 60 +++++++++++++++++++++++++++++++ src/paho/mqtt/client.py | 50 +++++++++++++++++++++++++- 3 files changed, 142 insertions(+), 2 deletions(-) create mode 100755 examples/sub-multiple-callback.py diff --git a/README.rst b/README.rst index cc84459..75b5783 100644 --- a/README.rst +++ b/README.rst @@ -639,7 +639,10 @@ on_message() on_message(client, userdata, message) -Called when a message has been received on a topic that the client subscribes to. +Called when a message has been received on a topic that the client subscribes +to. This callback will be called for every message received. Use +``message_callback_add()`` to define multiple callbacks that will be called for +specific topic filters. client the client instance for this callback @@ -662,6 +665,35 @@ Example mqttc.on_message = on_message ... +message_callback_add() +'''''''''''''''''''''' + +This function allows you to define callbacks that handle incoming messages for specific subscription filters, including with wildcards. This lets you, for example, subscribe to ``sensors/#`` and have one callback to handle ``sensors/temperature`` and another to handle ``sensors/humidity``. + +:: + + message_callback_add(sub, callback) + +sub + the subscription filter to match against for this callback. Only one callback may be defined per literal sub string + +callback + the callback to be used. Takes the same form as the ``on_message`` callback. + +If using ``message_callback_add()`` and ``on_message``, only messages that do not match a subscription specific filter will be passed to the ``on_message`` callback. + +message_callback_remove() +''''''''''''''''''''''''' + +Remove a topic/subscription specific callback previously registered using ``message_callback_add()``. + +:: + + message_callback_remove(sub) + +sub + the subscription filter to remove + on_publish() '''''''''''' diff --git a/examples/sub-multiple-callback.py b/examples/sub-multiple-callback.py new file mode 100755 index 0000000..4377f0c --- /dev/null +++ b/examples/sub-multiple-callback.py @@ -0,0 +1,60 @@ +#!/usr/bin/python + +# Copyright (c) 2014 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Distribution License v1.0 +# which accompanies this distribution. +# +# The Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial implementation +# All rights reserved. + +# This shows a simple example of an MQTT subscriber using a per-subscription message handler. + +import sys +try: + import paho.mqtt.client as mqtt +except ImportError: + # This part is only required to run the example from within the examples + # directory when the module itself is not installed. + # + # If you have the module installed, just use "import paho.mqtt.client" + import os + import inspect + cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"../src"))) + if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + import paho.mqtt.client as mqtt + +def on_message_msgs(mosq, obj, msg): + # This callback will only be called for messages with topics that match + # $SYS/broker/messages/# + print("MESSAGES: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) + +def on_message_bytes(mosq, obj, msg): + # This callback will only be called for messages with topics that match + # $SYS/broker/bytes/# + print("BYTES: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) + +def on_message(mosq, obj, msg): + # This callback will be called for messages that we receive that do not + # match any patterns defined in topic specific callbacks, i.e. in this case + # those messages that do not have topics $SYS/broker/messages/# nor + # $SYS/broker/bytes/# + print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) + +mqttc = mqtt.Client() + +# Add message callbacks that will only trigger on a specific subscription match. +mqttc.message_callback_add("$SYS/broker/messages/#", on_message_msgs) +mqttc.message_callback_add("$SYS/broker/bytes/#", on_message_bytes) +mqttc.on_message = on_message +mqttc.connect("iot.eclipse.org", 1883, 60) +mqttc.subscribe("$SYS/#", 0) + +mqttc.loop_forever() + diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 12c646a..39d9b26 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -441,6 +441,7 @@ class Client(object): self.on_connect = None self.on_publish = None self.on_message = None + self.on_message_filtered = [] self.on_subscribe = None self.on_unsubscribe = None self.on_log = None @@ -1258,6 +1259,45 @@ class Client(object): self._thread.join() self._thread = None + def message_callback_add(self, sub, callback): + """Register a message callback for a specific topic. + Messages that match 'sub' will be passed to 'callback'. Any + non-matching messages will be passed to the default on_message + callback. + + Call multiple times with different 'sub' to define multiple topic + specific callbacks. + + Topic specific callbacks may be removed with + message_callback_remove().""" + if callback is None or sub is None: + raise ValueError("sub and callback must both be defined.") + + self._callback_mutex.acquire() + + for i in range(0, len(self.on_message_filtered)): + if self.on_message_filtered[i][0] == sub: + self.on_message_filtered[i] = (sub, callback) + self._callback_mutex.release() + return + + self.on_message_filtered.append((sub, callback)) + self._callback_mutex.release() + + def message_callback_remove(self, sub): + """Remove a message callback previously registered with + message_callback_add().""" + if sub is None: + raise ValueError("sub must defined.") + + self._callback_mutex.acquire() + for i in range(0, len(self.on_message_filtered)): + if self.on_message_filtered[i][0] == sub: + self.on_message_filtered.pop(i) + self._callback_mutex.release() + return + self._callback_mutex.release() + # ============================================================ # Private functions # ============================================================ @@ -2075,7 +2115,15 @@ class Client(object): def _handle_on_message(self, message): self._callback_mutex.acquire() - if self.on_message: + matched = False + for t in self.on_message_filtered: + if topic_matches_sub(t[0], message.topic): + self._in_callback = True + t[1](self, self._userdata, message) + self._in_callback = False + matched = True + + if matched == False and self.on_message: self._in_callback = True self.on_message(self, self._userdata, message) self._in_callback = False -- 2.39.5