PubSub Module

The pubsub module provides an interface for the mosquitto client.

It provides classes that create mqtt clients via the paho-mqtt library and connect them to a mosquitto broker. These classes offer methods to start and stop the connection, publish to topics, subscribe to (consume from) and unsubscribe from topics, acknowledge delivery by publishers and receipt of messages by consumers, and add callbacks for various other events.

MosquittoClient

class mosquittoChat.apps.mosquitto.pubsub.MosquittoClient(participants=1, name='user', clientid=None, clean_session=True, userdata=None, host='localhost', port=1883, keepalive=60, bind_address='', username='guest', password='guest')

This is a Mosquitto Client class that will create an interface to connect to mosquitto by creating mqtt clients.

It provides methods for connecting, disconnecting, publishing, subscribing and unsubscribing, as well as callbacks for events such as on_connect, on_message, on_publish, on_subscribe, on_unsubscribe and on_disconnect.

__init__(participants=1, name='user', clientid=None, clean_session=True, userdata=None, host='localhost', port=1883, keepalive=60, bind_address='', username='guest', password='guest')

Create a new instance of the MosquittoClient class, passing in the client information and the host, port and keepalive parameters.

Parameters:
  • participants (int) – number of participants available presently
  • name (string) – name of the client trying to connect to mosquitto
  • clientid (string) – unique client id for a client-broker connection
  • clean_session (bool) – whether to start with a clean session (True) or keep a persistent session across connections (False)
  • userdata (user defined data (can be int, string, or any object)) – user defined data of any type that is passed as the userdata parameter to callbacks. It may be updated at a later point with the user_data_set() function.
  • host (string) – the hostname or IP address of the remote broker
  • port (int) – the network port of the server host to connect to. Defaults to 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you are using tls_set() the port may need providing manually
  • keepalive (int) – maximum period in seconds allowed between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker
  • bind_address (string) – the IP address of a local network interface to bind this client to, assuming multiple interfaces exist
  • username (string) – username for authentication
  • password (string) – password for authentication
_genid()

Method that generates unique client ids by calling base64.urlsafe_b64encode(os.urandom(32)).replace('=', 'e').

Returns:Returns a unique urlsafe id
Return type:string
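
The expression above relies on Python 2, where urlsafe_b64encode() returns a str. A minimal sketch of the same idea for Python 3, where the result is bytes and has to be decoded before the string replace, might look like this. The function name generate_client_id is hypothetical and not part of the module.

```python
import base64
import os


def generate_client_id(num_bytes=32):
    """Return a URL-safe random id with '=' padding replaced by 'e'."""
    raw = os.urandom(num_bytes)
    # urlsafe_b64encode returns bytes on Python 3, so decode before replacing.
    encoded = base64.urlsafe_b64encode(raw).decode("ascii")
    return encoded.replace("=", "e")


client_id = generate_client_id()
```

With 32 random bytes the encoded id is 44 characters long, and the single padding character at the end becomes 'e'.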
start()

Method to start the mosquitto client by initiating a connection to the mosquitto broker using the connect method and starting the network loop.

setup_connection()

Method to set up extra options such as username and password, will_set, tls_set, etc. before starting the connection.

create_client()

Method to create the paho-mqtt Client object which will be used to connect to mosquitto.

Returns:Returns a mosquitto mqtt client object
Return type:paho.mqtt.client.Client
setup_callbacks()

Method to set up all callbacks related to the connection, like on_connect, on_disconnect, on_publish, on_subscribe, on_unsubscribe, etc.

connect()

This method connects to Mosquitto and returns the connection return code.

When the connection is established, the on_connect callback will be invoked by paho-mqtt.

Returns:Returns a mosquitto mqtt connection return code, success, failure, error, etc
Return type:int
on_connect(client, userdata, flags, rc)

This is a Callback method and is called when the broker responds to our connection request.

Parameters:
  • client – the client instance for this callback
  • userdata – the private user data as set in Client() or user_data_set()
  • flags (dict) – response flags sent by the broker
  • rc (int) – the connection result

flags is a dict that contains response flags from the broker:

flags['session present'] - this flag is only useful for clients that connect with clean session set to 0. If a client with clean session=0 reconnects to a broker that it has previously connected to, this flag indicates whether the broker still has the session information for the client. If 1, the session still exists.

The value of rc indicates success or not:

  • 0: Connection successful
  • 1: Connection refused - incorrect protocol version
  • 2: Connection refused - invalid client identifier
  • 3: Connection refused - server unavailable
  • 4: Connection refused - bad username or password
  • 5: Connection refused - not authorised
  • 6-255: Currently unused
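
The return codes and the session flag described above could be handled along these lines. This is a sketch rather than the module's actual on_connect: the names CONNACK_MESSAGES and describe_rc are hypothetical, and the boolean return value is only for illustration, since paho-mqtt ignores what a callback returns.

```python
# Lookup table built from the rc values listed above.
CONNACK_MESSAGES = {
    0: "Connection successful",
    1: "Connection refused - incorrect protocol version",
    2: "Connection refused - invalid client identifier",
    3: "Connection refused - server unavailable",
    4: "Connection refused - bad username or password",
    5: "Connection refused - not authorised",
}


def describe_rc(rc):
    """Translate a connection result code into a readable message."""
    return CONNACK_MESSAGES.get(rc, "Unknown or unused return code: %d" % rc)


def on_connect(client, userdata, flags, rc):
    """paho-mqtt style callback; returns True when the connection succeeded."""
    print(describe_rc(rc))
    if rc != 0:
        return False
    # 'session present' only matters for clients using clean session = 0.
    if flags.get("session present"):
        print("Broker still holds the previous session")
    return True
```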

start_ioloop()

Method to start ioloop for paho-mqtt mosquitto clients so that it can process read/write events for the sockets.

Tornado’s ioloop is used because the loop*() functions provided by the paho-mqtt library would either block the entire Tornado thread or, in the case of loop_start(), create a separate thread for each client.

We don’t want to block the thread or create that many threads unnecessarily, given Python’s GIL.

Each of those separate threads calls the loop() function indefinitely. Since loop() does network I/O, it may release the GIL, but I haven’t checked that yet; if that is the case, we can very well use loop_start().

But for now we will add handlers to tornado’s ioloop().

stop_ioloop()

Method to stop ioloop for paho-mqtt mosquitto clients so that it cannot process any more read/write events for the sockets.

By this point the paho-mqtt socket has already been closed, so this method basically removes the Tornado ioloop handler for that socket.

_events_handler(fd, events)

Handle IO/Event loop events, processing them.

Parameters:
  • fd (int) – The file descriptor for the events
  • events (int) – Events from the IO/Event loop
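
A handler of this shape could dispatch the events to paho-mqtt's loop_read() and loop_write() methods, roughly as sketched below. The masks mirror the values of Tornado's IOLoop.READ, IOLoop.WRITE and IOLoop.ERROR as an assumption so that the example runs without Tornado; real code should use the IOLoop constants. handle_events and RecordingClient are hypothetical names, and the error policy is illustrative only.

```python
# Event masks assumed to match Tornado's IOLoop.READ, WRITE and ERROR.
READ = 0x001
WRITE = 0x004
ERROR = 0x018


def handle_events(client, fd, events):
    """Dispatch IOLoop events for one socket to a paho-mqtt style client."""
    if events & READ:
        client.loop_read()
    if events & WRITE:
        client.loop_write()
    if events & ERROR:
        # Hypothetical policy: report the error and let the caller clean up.
        return False
    return True


class RecordingClient(object):
    """Stand-in for paho.mqtt.client.Client that records which calls were made."""

    def __init__(self):
        self.calls = []

    def loop_read(self):
        self.calls.append("read")

    def loop_write(self):
        self.calls.append("write")


demo = RecordingClient()
ok = handle_events(demo, fd=7, events=READ | WRITE)
```

Because the handler only runs when the socket is ready, no extra thread per client is needed.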
start_schedular()

This method uses Tornado’s PeriodicCallback to schedule a callback every few seconds. The callback calls the paho-mqtt client’s loop_misc() function, which keeps the connection open by checking the keepalive value and exchanging PINGREQ and PINGRESP with the mosquitto broker.
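
The scheduling idea can be sketched as follows. The function takes the periodic callback class as an argument, on the assumption that real code would pass tornado.ioloop.PeriodicCallback, whose constructor takes a callback and an interval in milliseconds. FakePeriodicCallback and FakeClient are hypothetical test doubles, and the 10 second default is an arbitrary choice, not the module's value.

```python
def start_scheduler(client, periodic_callback_cls, interval_ms=10000):
    """Arrange for client.loop_misc() to run every interval_ms milliseconds."""
    scheduler = periodic_callback_cls(client.loop_misc, interval_ms)
    scheduler.start()
    return scheduler


class FakePeriodicCallback(object):
    """Test double with the same constructor shape as PeriodicCallback."""

    def __init__(self, callback, callback_time):
        self.callback = callback
        self.callback_time = callback_time
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False


class FakeClient(object):
    """Stand-in client that counts loop_misc() calls."""

    def __init__(self):
        self.misc_calls = 0

    def loop_misc(self):
        self.misc_calls += 1


fake_client = FakeClient()
scheduler = start_scheduler(fake_client, FakePeriodicCallback, interval_ms=5000)
scheduler.callback()  # simulate one timer tick
```

Stopping the scheduler then amounts to calling stop() on the returned object.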

stop_schedular()

This method stops Tornado’s PeriodicCallback scheduler loop.

disconnect()

Method to disconnect the mqtt connection with the mosquitto broker.

on_disconnect callback is called as a result of this method call.

on_disconnect(client, userdata, rc)

This is a Callback method and is called when the client disconnects from the broker.

subscribe(topic_list=None)

This method sets up the mqtt client to start subscribing to topics by accepting a list of tuples of topic and qos pairs.

The on_subscribe method is called as a callback when the broker returns the SUBACK frame, whether subscribing was successful or not.

Parameters:topic_list (list or tuple) – a tuple of (topic, qos), or a list of tuples of the form (topic, qos)
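
Since topic_list may be either a single tuple or a list of tuples, a caller-side helper could normalise both forms before subscribing. The sketch below is an illustration; normalize_topics is a hypothetical name, and paho-mqtt's own subscribe() already accepts both forms.

```python
def normalize_topics(topic_list):
    """Return a list of (topic, qos) tuples from a tuple or a list of tuples."""
    if topic_list is None:
        return []
    if isinstance(topic_list, tuple):
        # A single (topic, qos) pair becomes a one-element list.
        topic_list = [topic_list]
    normalized = []
    for topic, qos in topic_list:
        if qos not in (0, 1, 2):
            raise ValueError("qos must be 0, 1 or 2, got %r" % (qos,))
        normalized.append((topic, qos))
    return normalized
```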

on_subscribe(client, userdata, mid, granted_qos)

This is a Callback method and is called when the broker responds to a subscribe request.

The mid variable matches the mid variable returned from the corresponding subscribe() call. The granted_qos variable is a list of integers that give the QoS level the broker has granted for each of the different subscription requests.

Parameters:
  • client – the client whose subscribe request triggered this callback
  • userdata – the userdata associated with the client during its creation
  • mid (int) – the message id value returned by the broker
  • granted_qos (list) – list of integers that give the QoS level the broker has granted for each of the different subscription requests
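
Because granted_qos follows the order of the subscription requests, it can be paired with the requested topics to see what the broker actually granted. A minimal sketch, assuming the caller kept the requested list; summarize_suback is a hypothetical helper, and 0x80 is the MQTT 3.1.1 SUBACK code for a rejected subscription.

```python
SUBACK_FAILURE = 0x80  # MQTT 3.1.1 return code for a rejected subscription


def summarize_suback(requested, granted_qos):
    """Pair each requested (topic, qos) with the QoS the broker granted."""
    summary = {}
    for (topic, wanted), granted in zip(requested, granted_qos):
        if granted == SUBACK_FAILURE:
            summary[topic] = "rejected"
        elif granted < wanted:
            summary[topic] = "downgraded to QoS %d" % granted
        else:
            summary[topic] = "granted QoS %d" % granted
    return summary


result = summarize_suback(
    [("public/msgs", 2), ("private/abc/msgs", 1), ("private/abc/status", 1)],
    [2, 0, 128],
)
```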
addNewMqttMosquittoClient()

Method called after a new mqtt connection is established and the client has started subscribing to at least some topics; called by the on_subscribe callback.

sendMsgToWebsocket(msg)

Method to send message to associated websocket.

Parameters:msg (string, unicode or json encoded string or a dict) – the message to be sent to the websocket
unsubscribe(topic_list=None)

This method sets up the mqtt client to unsubscribe from topics by accepting topics as a string or a list.

The on_unsubscribe method is called as a callback whether unsubscribing is successful or not.

Parameters:topic_list (list of strings(topics)) – The topics to be unsubscribed from
on_unsubscribe(client, userdata, mid)

This is a Callback method and is called when the broker responds to an unsubscribe request. The mid variable matches the mid variable returned from the corresponding unsubscribe() call.

Parameters:
  • client – the client which initiated the unsubscribe
  • userdata – the userdata associated with the client
  • mid (int) – the message id value sent by the broker of the unsubscribe call.
publish(topic, msg=None, qos=2, retain=False)

If the client is not stopping, publish a message to the mosquitto broker.

on_publish callback is called after broker confirms the published message.

Parameters:
  • topic (string) – The topic the message is to be published to
  • msg (string) – Message to be published to broker
  • qos (int (0, 1 or 2)) – the qos of publishing message
  • retain (bool) – Should the message be retained or not
on_publish(client, userdata, mid)

This is a Callback method and is called when a message that was to be sent using the publish() call has completed transmission to the broker. For messages with QoS levels 1 and 2, this means that the appropriate handshakes have completed.

For QoS 0, this simply means that the message has left the client. The mid variable matches the mid variable returned from the corresponding publish() call, to allow outgoing messages to be tracked.

This callback is important because even if the publish() call returns success, it does not always mean that the message has been sent.

Parameters:
  • client – the client who initiated the publish method
  • userdata – the userdata associated with the client during its creation
  • mid (int) – the message id sent by the broker
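
Since a successful publish() return does not guarantee delivery, outgoing message ids can be kept until the matching on_publish arrives. The sketch below shows one way to do that. PublishTracker is a hypothetical helper, not part of the module, and it assumes callbacks run on the same thread as publish(), as they do when the client is driven from a single ioloop.

```python
class PublishTracker(object):
    """Remember outgoing message ids until the matching on_publish arrives."""

    def __init__(self):
        self.pending = {}

    def sent(self, mid, topic):
        # Call this with the mid returned by the publish() call.
        self.pending[mid] = topic

    def on_publish(self, client, userdata, mid):
        # Invoked once transmission (and any QoS handshake) has completed.
        # Returns the topic for a known mid, or None for an unknown one.
        return self.pending.pop(mid, None)


tracker = PublishTracker()
tracker.sent(1, "public/msgs")
tracker.sent(2, "private/abc/msgs")
confirmed = tracker.on_publish(None, None, 1)
```

Anything still in tracker.pending has been handed to publish() but not yet confirmed.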
on_private_message(client, userdata, msg)

This is a Callback method and is called when a message has been received on a topic [private/clientid/msgs] that the client subscribes to.

Parameters:
  • client – the client instance that received the message
  • userdata – the userdata associated with the client during its creation
  • msg – the message sent by the broker
on_private_status(client, userdata, msg)

This is a Callback method and is called when a message has been received on a topic [private/clientid/status] that the client subscribes to.

Parameters:
  • client – the client instance that received the message
  • userdata – the userdata associated with the client during its creation
  • msg – the message sent by the broker
on_public_message(client, userdata, msg)

This is a Callback method and is called when a message has been received on a topic [public/msgs] that the client subscribes to.

Parameters:
  • client – the client instance that received the message
  • userdata – the userdata associated with the client during its creation
  • msg – the message sent by the broker
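
The three message callbacks above correspond to three topic patterns, which can be pictured as a small routing table. This sketch is illustrative: topics_for and route are hypothetical names, and matching only on the client's own id is an assumption. In paho-mqtt itself, per-topic callbacks are registered with message_callback_add(sub, callback).

```python
def topics_for(clientid):
    """Build the topic names described above for one client id."""
    return {
        "private_msgs": "private/%s/msgs" % clientid,
        "private_status": "private/%s/status" % clientid,
        "public_msgs": "public/msgs",
    }


def route(topic, clientid):
    """Return the name of the callback that would handle a message on topic."""
    names = topics_for(clientid)
    handlers = {
        names["private_msgs"]: "on_private_message",
        names["private_status"]: "on_private_status",
        names["public_msgs"]: "on_public_message",
    }
    # Topics outside the three known patterns have no dedicated callback.
    return handlers.get(topic)
```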
send_offline_status()

This method is called when the mqtt client’s corresponding websocket is closed. It sends an offline status message to the clients subscribed to this client’s private status topic.

delMqttMosquittoClient()

Method called after an mqtt client unsubscribes from at least some topics, called by on_subscribe callback.

Returns:Returns the updated count of active mqtt clients
Return type:dict with updated count
stop()

Cleanly shutdown the connection to Mosquitto by disconnecting the mqtt client.

When mosquitto confirms disconnection, the on_disconnect callback will be called.
