From 63d9f843668f1fc5e34b6f2fdb6b87b434b508a5 Mon Sep 17 00:00:00 2001 From: Roger Light Date: Wed, 30 Apr 2014 00:43:06 +0100 Subject: [PATCH] Simplify on_message handling code. Change-Id: Id8833f29c2054baece19ad42be6c9dd4cfcacb4c --- src/paho/mqtt/client.py | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 6b01eba..12c646a 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -1931,23 +1931,11 @@ class Client(object): message.timestamp = time.time() if message.qos == 0: - self._callback_mutex.acquire() - if self.on_message: - self._in_callback = True - self.on_message(self, self._userdata, message) - self._in_callback = False - - self._callback_mutex.release() + self._handle_on_message(message) return MQTT_ERR_SUCCESS elif message.qos == 1: rc = self._send_puback(message.mid) - self._callback_mutex.acquire() - if self.on_message: - self._in_callback = True - self.on_message(self, self._userdata, message) - self._in_callback = False - - self._callback_mutex.release() + self._handle_on_message(message) return rc elif message.qos == 2: rc = self._send_pubrec(message.mid) @@ -1977,12 +1965,7 @@ class Client(object): # Only pass the message on if we have removed it from the queue - this # prevents multiple callbacks for the same message. - self._callback_mutex.acquire() - if self.on_message: - self._in_callback = True - self.on_message(self, self._userdata, self._in_messages[i]) - self._in_callback = False - self._callback_mutex.release() + self._handle_on_message(message) self._in_messages.pop(i) self._inflight_messages = self._inflight_messages - 1 if self._max_inflight_messages > 0: @@ -2090,6 +2073,15 @@ class Client(object): self._out_message_mutex.release() return MQTT_ERR_SUCCESS + def _handle_on_message(self, message): + self._callback_mutex.acquire() + if self.on_message: + self._in_callback = True + self.on_message(self, self._userdata, message) + self._in_callback = False + + self._callback_mutex.release() + def _thread_main(self): run = True self._thread_terminate = False -- 2.39.5