# Message state
mqtt_ms_invalid = 0,
-mqtt_ms_wait_puback = 1
-mqtt_ms_wait_pubrec = 2
-mqtt_ms_wait_pubrel = 3
-mqtt_ms_wait_pubcomp = 4
+mqtt_ms_publish_qos0 = 1
+mqtt_ms_publish_qos1 = 2
+mqtt_ms_wait_for_puback = 3
+mqtt_ms_publish_qos2 = 4
+mqtt_ms_wait_for_pubrec = 5
+mqtt_ms_resend_pubrel = 6
+mqtt_ms_wait_for_pubrel = 7
+mqtt_ms_resend_pubcomp = 8
+mqtt_ms_wait_for_pubcomp = 9
+mqtt_ms_send_pubrec = 10
+mqtt_ms_queued = 11
# Error values
MQTT_ERR_AGAIN = -1
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
self._inflight_messages = self._inflight_messages+1
if qos == 1:
- message.state = mqtt_ms_wait_puback
+ message.state = mqtt_ms_wait_for_puback
elif qos == 2:
- message.state = mqtt_ms_wait_pubrec
+ message.state = mqtt_ms_wait_for_pubrec
self._out_message_mutex.release()
rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
return (rc, local_mid)
- self._out_message_mutex.release()
- return (MQTT_ERR_SUCCESS, local_mid)
+ else:
+ message.state = mqtt_ms_queued;
+ self._out_message_mutex.release()
+ return (MQTT_ERR_SUCCESS, local_mid)
def username_pw_set(self, username, password=None):
"""Set a username and optionally a password for broker authentication.
now = time.time()
for m in messages:
if m.timestamp + self._message_retry < now:
- if m.state == mqtt_ms_wait_puback or m.state == mqtt_ms_wait_pubrec:
+ if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec:
m.timestamp = now
m.dup = True
self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
- elif m.state == mqtt_ms_wait_pubrel:
+ elif m.state == mqtt_ms_wait_for_pubrel:
m.timestamp = now
m.dup = True
self._send_pubrec(m.mid)
- elif m.state == mqtt_ms_wait_pubcomp:
+ elif m.state == mqtt_ms_wait_for_pubcomp:
m.timestamp = now
m.dup = True
self._send_pubrel(m.mid, True)
def _messages_reconnect_reset_out(self):
self._out_message_mutex.acquire()
+ self._inflight_messages = 0
for m in self._out_messages:
m.timestamp = 0
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
- if m.qos == 1:
- m.state = mqtt_ms_wait_puback
+ if m.qos == 0:
+ m.state = mqtt_ms_publish_qos0
+ elif m.qos == 1:
+ #self._inflight_messages = self._inflight_messages + 1
+ if m.state == mqtt_ms_wait_for_puback:
+ m.dup = True
+ m.state = mqtt_ms_publish_qos1
elif m.qos == 2:
- # Preserve current state
- pass
+ #self._inflight_messages = self._inflight_messages + 1
+ if m.state == mqtt_ms_wait_for_pubcomp:
+ m.state = mqtt_ms_resend_pubrel
+ m.dup = True
+ else:
+ if m.state == mqtt_ms_wait_for_pubrec:
+ m.dup = True
+ m.state = mqtt_ms_publish_qos2
else:
- m.state = mqtt_ms_invalid
+ m.state = mqtt_ms_queued
self._out_message_mutex.release()
def _messages_reconnect_reset_in(self):
self._in_callback = False
self._callback_mutex.release()
if result == 0:
- return MQTT_ERR_SUCCESS
+ rc = 0
+ self._out_message_mutex.acquire()
+ for m in self._out_messages:
+ m.timestamp = time.time()
+ if m.state == mqtt_ms_queued:
+ self._out_message_mutex.release()
+ return MQTT_ERR_SUCCESS
+
+ if m.qos == 0:
+ rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+ if rc != 0:
+ self._out_message_mutex.release()
+ return rc
+ elif m.qos == 1:
+ if m.state == mqtt_ms_publish_qos1:
+ self._inflight_messages = self._inflight_messages + 1
+ m.state = mqtt_ms_wait_for_puback
+ rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+ if rc != 0:
+ self._out_message_mutex.release()
+ return rc
+ elif m.qos == 2:
+ if m.state == mqtt_ms_publish_qos2:
+ self._inflight_messages = self._inflight_messages + 1
+ m.state = mqtt_ms_wait_for_pubrec
+ rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+ if rc != 0:
+ self._out_message_mutex.release()
+ return rc
+ elif m.state == mqtt_ms_resend_pubrel:
+ self._inflight_messages = self._inflight_messages + 1
+ m.state = mqtt_ms_wait_for_pubcomp
+ rc = self._send_pubrel(m.mid, m.dup)
+ if rc != 0:
+ self._out_message_mutex.release()
+ return rc
+ self._out_message_mutex.release()
+ return rc
elif result > 0 and result < 6:
return MQTT_ERR_CONN_REFUSED
else:
return rc
elif message.qos == 2:
rc = self._send_pubrec(message.mid)
- message.state = mqtt_ms_wait_pubrel
+ message.state = mqtt_ms_wait_for_pubrel
self._in_message_mutex.acquire()
self._in_messages.append(message)
self._in_message_mutex.release()
# Dont lock message_mutex here
for m in self._out_messages:
if self._inflight_messages < self._max_inflight_messages:
- if m.qos > 0 and m.state == mqtt_ms_invalid:
+ if m.qos > 0 and m.state == mqtt_ms_queued:
self._inflight_messages = self._inflight_messages + 1
if m.qos == 1:
m.state = mqtt_ms_wait_puback
self._out_message_mutex.acquire()
for m in self._out_messages:
if m.mid == mid:
- m.state = mqtt_ms_wait_pubcomp
+ m.state = mqtt_ms_wait_for_pubcomp
m.timestamp = time.time()
self._out_message_mutex.release()
return self._send_pubrel(mid, False)