message.timestamp = time.time()
if message.qos == 0:
- self._callback_mutex.acquire()
- if self.on_message:
- self._in_callback = True
- self.on_message(self, self._userdata, message)
- self._in_callback = False
-
- self._callback_mutex.release()
+ self._handle_on_message(message)
return MQTT_ERR_SUCCESS
elif message.qos == 1:
rc = self._send_puback(message.mid)
- self._callback_mutex.acquire()
- if self.on_message:
- self._in_callback = True
- self.on_message(self, self._userdata, message)
- self._in_callback = False
-
- self._callback_mutex.release()
+ self._handle_on_message(message)
return rc
elif message.qos == 2:
rc = self._send_pubrec(message.mid)
# Only pass the message on if we have removed it from the queue - this
# prevents multiple callbacks for the same message.
- self._callback_mutex.acquire()
- if self.on_message:
- self._in_callback = True
- self.on_message(self, self._userdata, self._in_messages[i])
- self._in_callback = False
- self._callback_mutex.release()
+ self._handle_on_message(message)
self._in_messages.pop(i)
self._inflight_messages = self._inflight_messages - 1
if self._max_inflight_messages > 0:
self._out_message_mutex.release()
return MQTT_ERR_SUCCESS
+ def _handle_on_message(self, message):
+ self._callback_mutex.acquire()
+ if self.on_message:
+ self._in_callback = True
+ self.on_message(self, self._userdata, message)
+ self._in_callback = False
+
+ self._callback_mutex.release()
+
def _thread_main(self):
run = True
self._thread_terminate = False