self.retain = False
-class MQTTInPacket:
- """Internal datatype."""
- def __init__(self):
- self.command = 0
- self.have_remaining = 0
- self.remaining_count = []
- self.remaining_mult = 1
- self.remaining_length = 0
- self.packet = b""
- self.to_process = 0
- self.pos = 0
-
- def cleanup(self):
- self.__init__()
-
-
class MQTTPacket:
"""Internal datatype."""
def __init__(self, command, packet, mid, qos):
self._username = ""
self._password = ""
- self._in_packet = MQTTInPacket()
+ self._in_packet = {
+ "command": 0,
+ "have_remaining": 0,
+ "remaining_count": [],
+ "remaining_mult": 1,
+ "remaining_length": 0,
+ "packet": b"",
+ "to_process": 0,
+ "pos": 0}
self._out_packet = []
self._current_out_packet = None
self._last_msg_in = time.time()
if self._port <= 0:
raise ValueError('Invalid port number.')
- self._in_packet.cleanup()
+ self._in_packet = {
+ "command": 0,
+ "have_remaining": 0,
+ "remaining_count": [],
+ "remaining_mult": 1,
+ "remaining_length": 0,
+ "packet": b"",
+ "to_process": 0,
+ "pos": 0}
+
self._out_packet_mutex.acquire()
self._out_packet = []
self._out_packet_mutex.release()
# fail due to longer length, so save current data and current position.
# After all data is read, send to _mqtt_handle_packet() to deal with.
# Finally, free the memory and reset everything to starting conditions.
- if self._in_packet.command == 0:
+ if self._in_packet['command'] == 0:
try:
if self._ssl:
command = self._ssl.read(1)
if len(command) == 0:
return 1
command = struct.unpack("!B", command)
- self._in_packet.command = command[0]
+ self._in_packet['command'] = command[0]
- if self._in_packet.have_remaining == 0:
+ if self._in_packet['have_remaining'] == 0:
# Read remaining
# Algorithm for decoding taken from pseudo code at
# http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
else:
byte = struct.unpack("!B", byte)
byte = byte[0]
- self._in_packet.remaining_count.append(byte)
+ self._in_packet['remaining_count'].append(byte)
# Max 4 bytes length for remaining length as defined by protocol.
# Anything more likely means a broken/malicious client.
- if len(self._in_packet.remaining_count) > 4:
+ if len(self._in_packet['remaining_count']) > 4:
return MQTT_ERR_PROTOCOL
- self._in_packet.remaining_length = self._in_packet.remaining_length + (byte & 127)*self._in_packet.remaining_mult
- self._in_packet.remaining_mult = self._in_packet.remaining_mult * 128
+ self._in_packet['remaining_length'] = self._in_packet['remaining_length'] + (byte & 127)*self._in_packet['remaining_mult']
+ self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128
if (byte & 128) == 0:
break
- self._in_packet.have_remaining = 1
- self._in_packet.to_process = self._in_packet.remaining_length
+ self._in_packet['have_remaining'] = 1
+ self._in_packet['to_process'] = self._in_packet['remaining_length']
- while self._in_packet.to_process > 0:
+ while self._in_packet['to_process'] > 0:
try:
if self._ssl:
- data = self._ssl.read(self._in_packet.to_process)
+ data = self._ssl.read(self._in_packet['to_process'])
else:
- data = self._sock.recv(self._in_packet.to_process)
+ data = self._sock.recv(self._in_packet['to_process'])
except socket.error as err:
(msg) = err
if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
print(msg)
return 1
else:
- self._in_packet.to_process = self._in_packet.to_process - len(data)
- self._in_packet.packet = self._in_packet.packet + data
+ self._in_packet['to_process'] = self._in_packet['to_process'] - len(data)
+ self._in_packet['packet'] = self._in_packet['packet'] + data
# All data for this packet is read.
- self._in_packet.pos = 0
+ self._in_packet['pos'] = 0
rc = self._packet_handle()
# Free data and reset values
- self._in_packet.cleanup()
+ self._in_packet = dict(
+ command=0,
+ have_remaining=0,
+ remaining_count=[],
+ remaining_mult=1,
+ remaining_length=0,
+ packet=b"",
+ to_process=0,
+ pos=0)
self._msgtime_mutex.acquire()
self._last_msg_in = time.time()
return MQTT_ERR_SUCCESS
def _packet_handle(self):
- cmd = self._in_packet.command&0xF0
+ cmd = self._in_packet['command']&0xF0
if cmd == PINGREQ:
return self._handle_pingreq()
elif cmd == PINGRESP:
def _handle_pingreq(self):
if self._strict_protocol:
- if self._in_packet.remaining_length != 0:
+ if self._in_packet['remaining_length'] != 0:
return MQTT_ERR_PROTOCOL
self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ")
def _handle_pingresp(self):
if self._strict_protocol:
- if self._in_packet.remaining_length != 0:
+ if self._in_packet['remaining_length'] != 0:
return MQTT_ERR_PROTOCOL
# No longer waiting for a PINGRESP.
def _handle_connack(self):
if self._strict_protocol:
- if self._in_packet.remaining_length != 2:
+ if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
- if len(self._in_packet.packet) != 2:
+ if len(self._in_packet['packet']) != 2:
return MQTT_ERR_PROTOCOL
- (resvd, result) = struct.unpack("!BB", self._in_packet.packet)
+ (resvd, result) = struct.unpack("!BB", self._in_packet['packet'])
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(resvd)+", "+str(result)+")")
self._callback_mutex.acquire()
if self.on_connect:
def _handle_suback(self):
self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK")
- pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's'
- (mid, packet) = struct.unpack(pack_format, self._in_packet.packet)
+ pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's'
+ (mid, packet) = struct.unpack(pack_format, self._in_packet['packet'])
pack_format = "!" + "B"*len(packet)
granted_qos = struct.unpack(pack_format, packet)
def _handle_publish(self):
rc = 0
- header = self._in_packet.command
+ header = self._in_packet['command']
message = MQTTMessage()
message.direction = mqtt_md_in
message.dup = (header & 0x08)>>3
message.qos = (header & 0x06)>>1
message.retain = (header & 0x01)
- pack_format = "!H" + str(len(self._in_packet.packet)-2) + 's'
- (slen, packet) = struct.unpack(pack_format, self._in_packet.packet)
+ pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's'
+ (slen, packet) = struct.unpack(pack_format, self._in_packet['packet'])
pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's'
(message.topic, packet) = struct.unpack(pack_format, packet)
def _handle_pubrel(self):
if self._strict_protocol:
- if self._in_packet.remaining_length != 2:
+ if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
- if len(self._in_packet.packet) != 2:
+ if len(self._in_packet['packet']) != 2:
return MQTT_ERR_PROTOCOL
- mid = struct.unpack("!H", self._in_packet.packet)
+ mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")")
def _handle_pubrec(self):
if self._strict_protocol:
- if self._in_packet.remaining_length != 2:
+ if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
- mid = struct.unpack("!H", self._in_packet.packet)
+ mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")")
def _handle_unsuback(self):
if self._strict_protocol:
- if self._in_packet.remaining_length != 2:
+ if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
- mid = struct.unpack("!H", self._in_packet.packet)
+ mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")")
self._callback_mutex.acquire()
def _handle_pubackcomp(self, cmd):
if self._strict_protocol:
- if self._in_packet.remaining_length != 2:
+ if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
- mid = struct.unpack("!H", self._in_packet.packet)
+ mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")")