self._last_mid = 0
self._state = mqtt_cs_new
self._messages = []
+ self._max_inflight_messages = 20
+ self._inflight_messages = 0
self._will = False
self._will_topic = ""
self._will_payload = None
self._out_packet_mutex = threading.Lock()
self._current_out_packet_mutex = threading.Lock()
self._msgtime_mutex = threading.Lock()
+ self._message_mutex = threading.Lock()
self._thread = None
self._thread_terminate = False
self._ssl = None
message = MQTTMessage()
message.timestamp = time.time()
message.direction = mqtt_md_out
- if qos == 1:
- message.state = mqtt_ms_wait_puback
- elif qos == 2:
- message.state = mqtt_ms_wait_pubrec
message.mid = local_mid
message.topic = topic
message.retain = retain
message.dup = False
+ self._message_mutex.acquire()
self._messages.append(message)
- rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
- return (rc, local_mid)
+ if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
+ self._inflight_messages = self._inflight_messages+1
+ if qos == 1:
+ message.state = mqtt_ms_wait_puback
+ elif qos == 2:
+ message.state = mqtt_ms_wait_pubrec
+ self._message_mutex.release()
+
+ rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
+ return (rc, local_mid)
+ self._message_mutex.release()
+ return (MQTT_ERR_SUCCESS, local_mid)
def username_pw_set(self, username, password=None):
"""Set a username and optionally a password for broker authentication.
return MQTT_ERR_SUCCESS
+ def max_inflight_messages_set(self, inflight):
+ """Set the maximum number of messages with QoS>0 that can be part way
+ through their network flow at once. Defaults to 20."""
+ if inflight < 0:
+ raise ValueError('Invalid inflight.')
+ self._max_inflight_messages = inflight
+
def message_retry_set(self, retry):
"""Set the timeout in seconds before a message with QoS>0 is retried.
20 seconds by default."""
return (self._packet_queue(command, packet, local_mid, 1), local_mid)
def _message_update(self, mid, direction, state):
+ self._message_mutex.acquire()
for m in self._messages:
if m.mid == mid and m.direction == direction:
m.state = state
m.timestamp = time.time()
+ self._message_mutex.release()
return MQTT_ERR_SUCCESS
+ self._message_mutex.release()
return MQTT_ERR_NOT_FOUND
def _message_retry_check(self):
+ self._message_mutex.acquire()
now = time.time()
for m in self._messages:
if m.timestamp + self._message_retry < now:
m.timestamp = now
m.dup = True
self._send_pubrel(m.mid, True)
+ self._message_mutex.release()
def _messages_reconnect_reset(self):
+ self._message_mutex.acquire()
for m in self._messages:
m.timestamp = 0
if m.direction == mqtt_md_out:
- if m.qos == 1:
- m.state = mqtt_ms_wait_puback
- elif m.qos == 2:
- m.state = mqtt_ms_wait_pubrec
+ if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
+ if m.qos == 1:
+ m.state = mqtt_ms_wait_puback
+ elif m.qos == 2:
+ m.state = mqtt_ms_wait_pubrec
+ else:
+ m.state = mqtt_ms_invalid
else:
self._messages.pop(self._messages.index(m))
+ self._message_mutex.release()
def _packet_queue(self, command, packet, mid, qos):
mpkt = MQTTPacket(command, packet, mid, qos)
elif message.qos == 2:
rc = self._send_pubrec(message.mid)
message.state = mqtt_ms_wait_pubrel
+ self._message_mutex.acquire()
self._messages.append(message)
+ self._message_mutex.release()
return rc
else:
return MQTT_ERR_PROTOCOL
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")")
+ self._message_mutex.acquire()
for i in range(len(self._messages)):
if self._messages[i].direction == mqtt_md_in and self._messages[i].mid == mid:
self._in_callback = False
self._callback_mutex.release()
self._messages.pop(i)
-
+ self._inflight_messages = self._inflight_messages - 1
+ if self._max_inflight_messages > 0:
+ rc = self._update_inflight()
+ if rc != MQTT_ERR_SUCCESS:
+ self._message_mutex.release()
+ return rc
+
+ self._message_mutex.release()
return self._send_pubcomp(mid)
+ self._message_mutex.release()
+ return MQTT_ERR_SUCCESS
+
+ def _update_inflight(self):
+ # Dont lock message_mutex here
+ for m in self._messages:
+ if self._inflight_messages < self._max_inflight_messages:
+ if m.qos > 0 and m.state == mqtt_ms_invalid and m.direction == mqtt_md_out:
+ self._inflight_messages = self._inflight_messages + 1
+ if m.qos == 1:
+ m.state = mqtt_ms_wait_puback
+ elif m.qos == 2:
+ m.state = mqtt_ms_wait_pubrec
+ rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+ if rc != 0:
+ return rc
+ else:
+ return MQTT_ERR_SUCCESS
return MQTT_ERR_SUCCESS
def _handle_pubrec(self):
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")")
- for i in range(len(self._messages)):
- if self._messages[i].direction == mqtt_md_out and self._messages[i].mid == mid:
- self._messages[i].state = mqtt_ms_wait_pubcomp
- self._messages[i].timestamp = time.time()
+ self._message_mutex.acquire()
+ for m in self._messages:
+ if m.direction == mqtt_md_out and m.mid == mid:
+ m.state = mqtt_ms_wait_pubcomp
+ m.timestamp = time.time()
+ self._message_mutex.release()
return self._send_pubrel(mid, False)
+ self._message_mutex.release()
return MQTT_ERR_SUCCESS
def _handle_unsuback(self):
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")")
+ self._message_mutex.acquire()
for i in range(len(self._messages)):
try:
if self._messages[i].direction == mqtt_md_out and self._messages[i].mid == mid:
self._callback_mutex.release()
self._messages.pop(i)
+ self._inflight_messages = self._inflight_messages - 1
+ if self._max_inflight_messages > 0:
+ rc = self._update_inflight()
+ if rc != MQTT_ERR_SUCCESS:
+ self._message_mutex.release()
+ return rc
+ self._message_mutex.release()
+ return MQTT_ERR_SUCCESS
except IndexError:
# Have removed item so i>count.
# Not really an error.
pass
+ self._message_mutex.release()
return MQTT_ERR_SUCCESS
def _thread_main(self):