self.on_log = None
self._host = ""
self._port = 1883
+ self._bind_address = ""
self._in_callback = False
self._strict_protocol = False
self._callback_mutex = threading.Lock()
self._tls_version = tls_version
self._tls_ciphers = ciphers
- def connect(self, host, port=1883, keepalive=60):
+ def connect(self, host, port=1883, keepalive=60, bind_address=""):
"""Connect to a remote broker.
host is the hostname or IP address of the remote broker.
broker. If no other messages are being exchanged, this controls the
rate at which the client will send ping messages to the broker.
"""
- self.connect_async(host, port, keepalive)
+ self.connect_async(host, port, keepalive, bind_address)
return self.reconnect()
- def connect_async(self, host, port=1883, keepalive=60):
+ def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
"""Connect to a remote broker asynchronously. This is a non-blocking
connect call that can be used with loop_start() to provide very quick
start.
self._host = host
self._port = port
self._keepalive = keepalive
+ self._bind_address = bind_address
self._state_mutex.acquire()
self._state = mqtt_cs_connect_async
# Put messages in progress in a valid state.
self._messages_reconnect_reset()
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- # FIXME use create_connection here
-
+ try:
+ self._sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0))
+ except socket.error as err:
+ (msg) = err
+ if msg.errno != errno.EINPROGRESS:
+ raise
if self._tls_ca_certs != None:
self._ssl = ssl.wrap_socket(self._sock,