if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
- max_packets = len(self._out_messages) + len(self._in_messages)
+ max_packets = len(self._out_packet) + 1
if max_packets < 1:
max_packets = 1
for m in self._out_messages:
m.timestamp = time.time()
if m.state == mqtt_ms_queued:
+ self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release()
return MQTT_ERR_SUCCESS
if m.qos == 0:
+ self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+ self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_puback
+ self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+ self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubrec
+ self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
+ self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.state == mqtt_ms_resend_pubrel:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubcomp
+ self._in_callback = True # Don't call loop_write after _send_pubrel()
rc = self._send_pubrel(m.mid, m.dup)
+ self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
+ self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release()
return rc
elif result > 0 and result < 6: