MQTT

  1. General Information
    1. What is MQTT?
    2. Communication Architecture
  2. Simple API Setup and Overview
  3. Simple Setup of XDK as MQTT Client
    1. Initialization
    2. Configuring the Session
    3. Subscribing to a Topic
    4. Publishing on a Topic
    5. Handling Messages from Subscribed Topic
  4. Simple Full Code Example
  5. Advanced API Setup and Overview
  6. Advanced Setup of XDK as MQTT Client
    1. Initialization
    2. Event Handler
    3. Configuring the Session
    4. Connecting To MQTT Broker
    5. Subscribing to a Topic
    6. Publishing on a Topic
    7. Recommended Application Flow
  7. Advanced Full Code Example

General Information

What is MQTT?

Initially developed by IBM to create a protocol for minimal battery loss and bandwidth, connecting oil pipelines via satellite connection, it is now an open standard. Gaining great acceptance in the IoT environment MQTT is persevering its initial goals like quality of service data delivery, continuous session awareness, simple implementation with a lightweight protocol stack especially useful on embedded devices with limited capacities.

As the following picture shows, the MQTT protocol is based on TCP/IP, and all clients and the broker need to have a TCP/IP Stack.

Image

Although MQTT is widely associated with the term Message Queuing Telemetry Transport it is not the correct meaning of the abbreviation: The name MQTT comes from an IBM product called MQseries and has nothing to do with traditional message queues. In general MQTT can be seen as a publish/subscribe messaging protocol based on TCP/IP using the client/server model.

Communication Architecture

The communication architecture is based on the client-server model where clients and servers are responsible for specific tasks. In general the roles are defined as the following:

  • Server: awaits incoming connection requests by clients and provides resources or services
  • Client: initiates the communication and requests the server’s content and functionalities

In MQTT there are two kinds of client types:

  • Subscriber client: is interested in a special set of data from other publishers and registers as a potential recipient of data whenever it is published to the server
  • Publisher client: provides data that can be interesting to subscribers by pushing data to the server whenever the data is changed or needs to be updated

The clients, either subscriber, publisher or both, do not communicate directly with and do not know about each other but rather exchange data with the help of a common central server in the communication architecture, called the broker. This principle is described as the clients being decoupled from each other. Decoupling happens on three stages:

  • Space decoupling: publisher and subscriber do not need to know each other (e.g. IP/port)
  • Time decoupling: publisher and subscriber do not need to run at the same time
  • Synchronization decoupling: operations on both sides are not halted during publishing / receiving

A MQTT client can be any device from a micro controller up to a full-blown server with a MQTT library running and is connecting to an MQTT broker over any kind of network. MQTT libraries are available for a huge variety of programming languages (e.g. C, C++, C#, Go, iOS, Java, JavaScript, .NET).

Each client that wants to talk to other clients needs to be connected to the same broker. The broker needs to receive and filter all incoming messages, check which clients apply as interested, connected and available recipients and distribute the updated message according to the subscriptions. It is also an important task of the broker to provide the authentication and authorization of clients since the broker is exposed and accessible by many communication partners (e.g. via the internet). Whereas the MQTT client can be deployed on a great variety of devices with flexible performance and capabilities it is essential that the MQTT server can handle various client connections and data transmission between the publishers and subscribers.

As a premise for the communication architecture both clients, publisher and subscriber, initiate a connect message to the same broker which responds with an acknowledgement including a status code. Once the connection is established, the broker will keep it active as long as the client does not perform a disconnect or the connection is lost.

Simple API Setup and Overview

This guide is divided in two general parts that present a basic implementation using the XDK MQTT APIs. This part is useful for simple MQTT applications to access the main MQTT functions and provide a quick start into introducing MQTT to a XDK project. Additionally, a few other APIs are used in this guide which can be taken over from the Simple Full Code Example. The following code shows how to include the header file used in this section.

// Simple MQTT API
#include "XDK_MQTT.h"

Finally, as MQTT is a communication protocol, a connection between the XDK and the broker has to be established. In this example, broker.hivemq.com is only accessible via the internet. This is why the XDK should connect via Wi-Fi to an access point with an internet connection. For more information on how to establish a network connection, please refer to the Wi-Fi section of the Knowledgebase.

The following table lists some of the important structured data types of the simple MQTT API.

Data Type

Description

MqttSetupInfo

This data type is used to setup the MQTT module with the correct stack type and security configuration.

MqttConnectInfo

This data type is used to setup the MQTT connection. These include client identifier, broker URL and port as well as parameters like cleanSession or keepAliveInterval.

MqttSubscribeInfo

This data type configures the MQTT subscription details and includes the topic address, the QoS level and a callback function for incoming messages on this topic.

MqttPublishInfo

This data type configures the MQTT publish details and includes the topic address, the QoS level and payload information.

All these data types are needed to store your MQTT settings in regards of general setup and more application specific publish and subscribe configuration.

The following table lists some of the important functions of the simple MQTT API. The functions use the data types defined in the data types table above.

Function

Description

MQTT_Setup()

This function is used to setup the MQTT module as specified in the according data type.

MQTT_ConnectToBroker()

This function is used to connect to a MQTT broker as specified in the data type with a timeout to wait for a successful connection.

MQTT_SubsribeToTopic()

This function is used to subscribe to a MQTT topic as specified in the data type with a timeout to wait for a successful connection.

MQTT_PublishToTopic()

This function is used to subscribe to a MQTT topic as specified in the data type with a timeout to wait for a successful connection.

Simple Setup of XDK as MQTT Client

Assuming that the basic setup for using the API is done, and the XDK is connected to a Wi-Fi network, this chapter will focus on implementing a simple MQTT Client on the XDK. Be aware that in contrary to the advanced MQTT part this simple part is not considering any detailed event handling of connection states.

After basic steps for initializing the XDK as a standard-conform MQTT client the communication architecture will be built up between the XDK, and an external MQTT broker. Apart from connecting to an MQTT broker, this chapter will show how to perform basic data exchange in both ways of sending information to and receiving information from external communication partners. Learning how the communication works in the introduction chapter 1 and applying it in this chapter will enable the introduction of simple MQTT features into real-life applications developed with the XDK.

Initialization

Before the simple MQTT API can be used, the MQTT module must first be initialized. The initialization of the MQTT client is accomplished with the data type and function for MQTT setup as shown in chapter Simple API Setup and Overview. For that we simply set the data type and call the function in the XDK setup function of our application template. In the data type we define to use the Serval-Stack as our MQTT type and prefer a non-secure connection for this example.

static MQTT_Setup_T MqttSetupInfo =
{
                        .MqttType = MQTT_TYPE_SERVALSTACK,
                        .IsSecure = false,
};/**< MQTT setup parameters */

static void AppControllerSetup(void * param1, uint32_t param2)
{
    ...
    Retcode_T retcode = MQTT_Setup(&MqttSetupInfo);;
    if (RETCODE_OK != retcode)
    {
        printf("AppControllerSetup : Failed \r\n");
        Retcode_RaiseError(retcode);
    }
}

Configuring the Session

After setting up the XDK client this chapter shows how to connect to a defined MQTT broker. To accomplish this the according MQTT connection data type and function is needed. In the data type we define the client-ID “XDK_FS”, the broker URL “broker.hivemq.com”, the port 1883, clean session and a keep-alive interval of 100ms.

static MQTT_Connect_T MqttConnectInfo =
{
                .ClientId = "XDK_FS",
                .BrokerURL = "broker.hivemq.com",
                .BrokerPort = 1883,
                .CleanSession = true,
                .KeepAliveInterval = 100,
};/**< MQTT connect parameters */

static void AppControllerFire(void* pvParameters)
{
    ...
    Retcode_T retcode = MQTT_ConnectToBroker(&MqttConnectInfo, UINT32_C(60000));
    }

    if (RETCODE_OK != retcode)
    {
        printf("MQTT Connection : Failed \r\n");
        Retcode_RaiseError(retcode);
    }
    ...
}

Subscribing to a Topic

Having the connection to a broker established the XDK can subscribe to a given topic with the according MQTT subscription data type and function. In the data type we define the topic “mqtt/xdk/topic”, the quality of service level 1 and the callback function to handle incoming messages.

static MQTT_Subscribe_T MqttSubscribeInfo =
{
                .Topic = "mqtt/xdk/topic",
                .QoS = 1UL,
                .IncomingPublishNotificationCB = AppMQTTSubscribeCB,
};/**< MQTT subscribe parameters */

static void AppControllerFire(void* pvParameters)
{
    ...
    Retcode_T retcode = MQTT_SubsribeToTopic(&MqttSubscribeInfo, UINT32_C(5000));
    if (RETCODE_OK != retcode)
    {
        printf("MQTT Subscription : Failed \r\n");
        Retcode_RaiseError(retcode);
    }
    ...
}

Publishing on a Topic

Having the connection to a broker established the XDK can publish on a given topic with the according MQTT publish data type and function. For this example we use a simple “Hello World” and publish to the topic we subscribed to in the previous chapter. In the data type we define, matching to the subscription chapter, the topic “mqtt/xdk/topic”, the quality of service level 1 and the variable payload parameters.

static MQTT_Publish_T MqttPublishInfo =
{
                .Topic = "mqtt/xdk/topic",
                .QoS = 1UL,
                .Payload = NULL,
                .PayloadLength = 0UL,
};/**< MQTT publish parameters */

static void AppControllerFire(void* pvParameters)
{
    ...
    const char * publishBuffer = "Hello World";
    MqttPublishInfo.Payload = publishBuffer;
    MqttPublishInfo.PayloadLength = strlen(publishBuffer);

    Retcode_T retcode = MQTT_PublishToTopic(&MqttPublishInfo, UINT32_C(5000));
    vTaskDelay(UINT32_C(5000));0
    if (RETCODE_OK != retcode)
    {
        printf("MQTT Publish : Failed \r\n");
    }
    ...
}

Handling Messages from Subscribed Topic

After subscribing to a topic the incoming message is handled by the callback function defined in the according subscription data type. In the data type of the callback function parameter you can extract the topic and the payload data, each in pairs of payload pointers and payload length. In our callback function example we extract all the topic and payload information and print it to console output.

static void AppMQTTSubscribeCB(MQTT_SubscribeCBParam_T param)
{
   static char AppIncomingMsgTopicBuffer[256];/**< Incoming message topic buffer */

   static char AppIncomingMsgPayloadBuffer[256];/**< Incoming message payload buffer */

    AppIncomingMsgCount++;
    memset(AppIncomingMsgTopicBuffer, 0, sizeof(AppIncomingMsgTopicBuffer));
    memset(AppIncomingMsgPayloadBuffer, 0, sizeof(AppIncomingMsgPayloadBuffer));

    if (param.PayloadLength < (int) sizeof(AppIncomingMsgPayloadBuffer))
    {
        strncpy(AppIncomingMsgPayloadBuffer, (const char *) param.Payload, param.PayloadLength);
    }

    if (param.TopicLength < (int) sizeof(AppIncomingMsgTopicBuffer))
    {
        strncpy(AppIncomingMsgTopicBuffer, param.Topic, param.TopicLength);
    }

    printf("AppMQTTSubscribeCB : #%d, Incoming Message:\r\n"
            "\tTopic: %s\r\n"
            "\tPayload: \r\n\"\"\"\r\n%s\r\n\"\"\"\r\n", (int) AppIncomingMsgCount,
            AppIncomingMsgTopicBuffer, AppIncomingMsgPayloadBuffer);
}

Simple Full Code Example

Note: The full code example is intended for XDK-Workbench versions 3.4.0 and higher.

/*----------------------------------------------------------------------------*/

/* --------------------------------------------------------------------------- |
 * INCLUDES & DEFINES ******************************************************** |
 * -------------------------------------------------------------------------- */

/* own header files */
#include "XdkAppInfo.h"
#undef BCDS_MODULE_ID  /* Module ID define before including Basics package*/
#define BCDS_MODULE_ID XDK_APP_MODULE_ID_APP_CONTROLLER

/* own header files */
#include "AppController.h"

/* system header files */
#include <stdio.h>

/* additional interface header files */
#include "BCDS_BSP_Board.h"
#include "BCDS_CmdProcessor.h"
#include "BCDS_NetworkConfig.h"
#include "BCDS_Assert.h"
#include "XDK_WLAN.h"
#include "XDK_MQTT.h"
#include "XDK_Sensor.h"
#include "XDK_SNTP.h"
#include "XDK_ServalPAL.h"
#include "FreeRTOS.h"
#include "task.h"

/* local variables ********************************************************** */

static WLAN_Setup_T WLANSetupInfo =
        {
                .IsEnterprise = false,
                .IsHostPgmEnabled = false,
                .SSID = "LaCasa",
                .Password = "100RickMorty",
                .IsStatic = false,
        };/**< WLAN setup parameters */

static MQTT_Setup_T MqttSetupInfo =
        {
                .MqttType = MQTT_TYPE_SERVALSTACK,
                .IsSecure = false,
        };/**< MQTT setup parameters */

static MQTT_Connect_T MqttConnectInfo =
        {
                .ClientId = "XDK_FS",
                .BrokerURL = "broker.hivemq.com",
                .BrokerPort = 1883,
                .CleanSession = true,
                .KeepAliveInterval = 100,
        };/**< MQTT connect parameters */

static void AppMQTTSubscribeCB(MQTT_SubscribeCBParam_T param);

static MQTT_Subscribe_T MqttSubscribeInfo =
        {
                .Topic = "mqtt/xdk/topic",
                .QoS = 1UL,
                .IncomingPublishNotificationCB = AppMQTTSubscribeCB,
        };/**< MQTT subscribe parameters */

static MQTT_Publish_T MqttPublishInfo =
        {
                .Topic = "mqtt/xdk/topic",
                .QoS = 1UL,
                .Payload = NULL,
                .PayloadLength = 0UL,
        };/**< MQTT publish parameters */

static uint32_t AppIncomingMsgCount = 0UL;/**< Incoming message count */

static char AppIncomingMsgTopicBuffer[256];/**< Incoming message topic buffer */

static char AppIncomingMsgPayloadBuffer[256];/**< Incoming message payload buffer */

static CmdProcessor_T * AppCmdProcessor;/**< Handle to store the main Command processor handle to be used by run-time event driven threads */

static xTaskHandle AppControllerHandle = NULL;/**< OS thread handle for Application controller to be used by run-time blocking threads */

/* global variables ********************************************************* */

/* inline functions ********************************************************* */

/* local functions ********************************************************** */

static void AppMQTTSubscribeCB(MQTT_SubscribeCBParam_T param)
{
    AppIncomingMsgCount++;
    memset(AppIncomingMsgTopicBuffer, 0, sizeof(AppIncomingMsgTopicBuffer));
    memset(AppIncomingMsgPayloadBuffer, 0, sizeof(AppIncomingMsgPayloadBuffer));

    if (param.PayloadLength < (int) sizeof(AppIncomingMsgPayloadBuffer))
    {
        strncpy(AppIncomingMsgPayloadBuffer, (const char *) param.Payload, param.PayloadLength);
    }

    if (param.TopicLength < (int) sizeof(AppIncomingMsgTopicBuffer))
    {
        strncpy(AppIncomingMsgTopicBuffer, param.Topic, param.TopicLength);
    }

    printf("AppMQTTSubscribeCB : #%d, Incoming Message:\r\n"
            "\tTopic: %s\r\n"
            "\tPayload: \r\n\"\"\"\r\n%s\r\n\"\"\"\r\n", (int) AppIncomingMsgCount,
            AppIncomingMsgTopicBuffer, AppIncomingMsgPayloadBuffer);
}

/**
 * @brief Responsible for controlling the send data over MQTT application control flow.
 *
 * - Synchronize SNTP time stamp for the system if MQTT communication is secure
 * - Connect to MQTT broker
 * - Subscribe to MQTT topic
 * - Read environmental sensor data
 * - Publish data periodically for a MQTT topic
 *
 * @param[in] pvParameters
 * Unused
 */
static void AppControllerFire(void* pvParameters)
{
    BCDS_UNUSED(pvParameters);

    Retcode_T retcode = RETCODE_OK;

    const char * publishBuffer = "Hello World";
    MqttPublishInfo.Payload = publishBuffer;
    MqttPublishInfo.PayloadLength = strlen(publishBuffer);

    if (RETCODE_OK == retcode)
    {
        printf("MQTT Connection : Failed \r\n");
        retcode = MQTT_ConnectToBroker(&MqttConnectInfo, UINT32_C(60000));
    }

    if (RETCODE_OK == retcode)
    {
        printf("MQTT Subscription : Failed \r\n");
        retcode = MQTT_SubsribeToTopic(&MqttSubscribeInfo, UINT32_C(5000));
    }

    if (RETCODE_OK != retcode)
    {
        /* We raise error and still proceed to publish data periodically */
        Retcode_RaiseError(retcode);
    }

    /* A function that implements a task must not exit or attempt to return to
     its caller function as there is nothing to return to. */
    while (1)
    {
        MQTT_PublishToTopic(&MqttPublishInfo, UINT32_C(5000));
        vTaskDelay(UINT32_C(5000));
    }
}

/**
 * @brief To enable the necessary modules for the application
 * - WLAN
 * - ServalPAL
 * - SNTP (if secure communication)
 * - MQTT
 * - Sensor
 *
 * @param[in] param1
 * Unused
 *
 * @param[in] param2
 * Unused
 */
static void AppControllerEnable(void * param1, uint32_t param2)
{
    BCDS_UNUSED(param1);
    BCDS_UNUSED(param2);

    Retcode_T retcode = WLAN_Enable();
    if (RETCODE_OK == retcode)
    {
        retcode = ServalPAL_Enable();
    }

    if (RETCODE_OK == retcode)
    {
        if (pdPASS != xTaskCreate(AppControllerFire, (const char * const ) "AppController", TASK_STACK_SIZE_APP_CONTROLLER, NULL, TASK_PRIO_APP_CONTROLLER, &AppControllerHandle))
        {
            retcode = RETCODE(RETCODE_SEVERITY_ERROR, RETCODE_OUT_OF_RESOURCES);
        }
    }
    if (RETCODE_OK != retcode)
    {
        printf("AppControllerEnable : Failed \r\n");
        Retcode_RaiseError(retcode);
        assert(0); /* To provide LED indication for the user */
    }
}

/**
 * @brief To setup the necessary modules for the application
 * - WLAN
 * - ServalPAL
 * - SNTP (if secure communication)
 * - MQTT
 * - Sensor
 *
 * @param[in] param1
 * Unused
 *
 * @param[in] param2
 * Unused
 */
static void AppControllerSetup(void * param1, uint32_t param2)
{
    BCDS_UNUSED(param1);
    BCDS_UNUSED(param2);

    Retcode_T retcode = WLAN_Setup(&WLANSetupInfo);
    if (RETCODE_OK == retcode)
    {
        retcode = ServalPAL_Setup(AppCmdProcessor);
    }
    if (RETCODE_OK == retcode)
    {
        retcode = MQTT_Setup(&MqttSetupInfo);
    }
    if (RETCODE_OK == retcode)
    {
        retcode = CmdProcessor_Enqueue(AppCmdProcessor, AppControllerEnable, NULL, UINT32_C(0));
    }
    if (RETCODE_OK != retcode)
    {
        printf("AppControllerSetup : Failed \r\n");
        Retcode_RaiseError(retcode);
        assert(0); /* To provide LED indication for the user */
    }
}

/* global functions ********************************************************* */

/** Refer interface header for description */
void AppController_Init(void * cmdProcessorHandle, uint32_t param2)
{
    BCDS_UNUSED(param2);

    Retcode_T retcode = RETCODE_OK;

    if (cmdProcessorHandle == NULL)
    {
        printf("AppController_Init : Command processor handle is NULL \r\n");
        retcode = RETCODE(RETCODE_SEVERITY_ERROR, RETCODE_NULL_POINTER);
    }
    else
    {
        AppCmdProcessor = (CmdProcessor_T *) cmdProcessorHandle;
        retcode = CmdProcessor_Enqueue(AppCmdProcessor, AppControllerSetup, NULL, UINT32_C(0));
    }

    if (RETCODE_OK != retcode)
    {
        Retcode_RaiseError(retcode);
        assert(0); /* To provide LED indication for the user */
    }
}

Advanced API Setup and Overview

This guide presents a basic implementation using the MQTT API of the ServalStack. To make the API available to the application, the header file Serval_Mqtt.h has to be included in the implementation files. Additionally, a few other APIs are used in this guide. The following code shows how to include the header file used in this section.

// MQTT API
#include "Serval_Mqtt.h"

Additionally, in your project browse to

SDK > xdk110 > Libraries > ServalStack > 3rd-party > Serval Stack > api > Serval_Defines.h

And set the define of SERVAL_ENABLE_MQTT to 1, if this is not already done. If this is set to 0, the API will not be available in the application.

Some important constants and variables will be used throughout the guide. They are listed in the code below. The first two macros define the MQTT Host address and port respectively. The global variables for the session and a pointer to the session are relevant throughout the guide. The session must persist through the entire runtime of the MQTT application, as it will contain all the connection and server information, and is used by nearly every function of the MQTT API.

#define MQTT_BROKER_HOST "broker.hivemq.com"
#define MQTT_BROKER_PORT 1883
static MqttSession_T session;
static MqttSession_T *session_ptr = &session;

Finally, as MQTT is a communication protocol, a connection between the XDK and the broker has to be established. In this example, broker.hivemq.com is only accessible via the internet. This is why the XDK should connect via Wi-Fi to an access point with an internet connection. For more information on how to establish a network connection, please refer to the Wi-Fi section of the Knowledgebase

The following table lists some of the important functions of the MQTT API.

Function

Description

Mqtt_initialize()

This function is used to initialize the MQTT module and should be used before any other function of the MQTT API

Mqtt_initializeInternalSession()

This function is used to initialize the internal session. The session will hold all the information needed to connect to an MQTT broker

(*MqttEventCallback_T)()

This is a function that has to be implemented by the user of the API. On every MQTT Event, this function is called and will be used for handling the events. For more details, see chapter Event Handler of this section

Mqtt_connect()

This function attempts to connect to the target host. The target host is a field of the MqttSession_T and should be configured before this function is called.

Mqtt_publish()

This function publishes data. The inputs are: Topic (of type StringDescr_T), Payload ( + Length), Quality of Service, Retain Flag

Mqtt_subscribe()

This function subscribe to a list of topics with their respective QoS. The inputs are: List of Topics (of type StringDescr_T), List of QoS (of type Mqtt_qos_t)

All these functions, except Mqtt_initialize(), receive the pointer to a MqttSession_T variable as well.

Advanced Setup of XDK as MQTT Client

Assuming that the basic setup for using the API is done, and the XDK is connected to a Wi-Fi network, this chapter will focus on implementing an MQTT Client on the XDK.

After basic steps for initializing the XDK as a standard-conform MQTT, client the communication architecture will be built up between the XDK, and an external MQTT broker. Apart from connecting to an MQTT broker, this chapter will show how to perform basic data exchange in both ways of sending information to and receiving information from external communication partners. Learning how the communication works in the introduction chapter 1 and applying it in this chapter will enable the introduction of MQTT into real-life applications developed with the XDK.

Initialization

Before the MQTT API can be used, the MQTT module must first be initialized. The initialization of the MQTT client is accomplished in two steps. For this, a function similar to the one implemented in the code-snippet below can be used. First, the MQTT module itself is initialized by calling Mqtt_initialize(). Then, the variable session of type MqttSession_T is initialized. The declaration of this variable and its pointer is shown in chapter API Setup and Overview. It is crucial that this variable is global, since it will be used in nearly every function of the MQTT API.

retcode_t init(void){
  retcode_t rc_initialize = Mqtt_initialize();
  if (rc_initialize == RC_OK) {
    session_ptr = &session;
    Mqtt_initializeInternalSession(session_ptr);
  }
  return rc_initialize;
}

Event Handler

The current API is event-based. That means that the application reacts to every pre-defined event with a callback. Events can be Connection-events, incoming data, outgoing data, acknowledgements, and more. The code below shows the general structure of this event callback. The handler checks the event-type and acts accordingly.

retcode_t event_handler(MqttSession_T* session, MqttEvent_t event,
              const MqttEventData_t* eventData) {
  BCDS_UNUSED(session);
  switch(event){
    case MQTT_CONNECTION_ESTABLISHED:
      handle_connection(eventData->connect);
      // subscribing and publishing can now be done
      // subscribe();
      // publish();
      break;
    case MQTT_CONNECTION_ERROR:
      handle_connection(eventData->connect);
      break;
    case MQTT_INCOMING_PUBLISH:
      handle_incoming_publish(eventData->publish);
      break;
    case MQTT_SUBSCRIPTION_ACKNOWLEDGED:
      printf("Subscription Successful\n\r");
      break;
    case MQTT_PUBLISHED_DATA:
      printf("Publish Successful\n\r");
      break;
    default:
      printf("Unhandled MQTT Event: %d\n\r", event);
      break;
  }
  return RC_OK;
}

The type of the variable eventData depends on the event. For more information on this, refer to the header-file Serval_Mqtt.h. The events and the corresponding data types are listed there. The event MQTT_CONNECTION_ESTABLISHED holds special significance, because subscribing and publishing to topics can only be done after this event occurred.

The only events this guide handles are connection events and incoming publish events. There are prints for successful outgoing publish and subscription messages, as well. The functions in the code-snippets below are examples for how these events can be handled. For any other event, a new case has to be added, otherwise only the event’s number will be printed in the default case.

The function handle_connection() is used in both MQTT_CONNECTION_ESTABLISHED and MQTT_CONNECTION_ERROR, because these events have the same data-type. It simply prints the connect return code sent by the server, at which the connection attempt was directed.

static void handle_connection(MqttConnectionEstablishedEvent_T connectionData){
  int rc_connect = (int) connectionData.connectReturnCode;
  printf("Connection Event:\n\r"
          "\tServer Return Code: %d (0 for success)\n\r",
          (int) rc_connect);
}

The function, which handles incoming data from the topics to which the XDK subscribed, prints the message’s payload and the topic it has been published on. The payload and the topic do not have a string-terminating character “\0” , which is why they are first written into a buffer using the function snprintf() , where a null-terminating character is automatically added.

static void handle_incoming_publish(MqttPublishData_T publishData){
  int topic_length = publishData.topic.length + 1;
  int data_length = publishData.length + 1;
  char published_topic_buffer[topic_length];
  char published_data_buffer[data_length];
  snprintf(published_topic_buffer, topic_length, publishData.topic.start);
  snprintf(published_data_buffer, data_length,
  (char *) publishData.payload);
  printf("Incoming Published Message:\n\r"
          "\tTopic: %s\n\r"
          "\tPayload: %s\n\r",
          published_topic_buffer,
          published_data_buffer);
}

Configuring the Session

Now that the event handler is set up, the session has to be configured, to make a connection possible. The session holds the connect information, and thus has fields for all the connection parameters as listed in the table below.

MANDATORY:

Parameter

Description

clientID

The client identifier (here: clientID) is a identifier of each MQTT client connecting to a MQTT broker. It needs to be unique for the broker to know the state of the client. For a stateless connection it is also possible to send an empty clientID together with the attribute of clean session to be true.

cleanSession

The clean session flag indicates to the broker whether the client wants to establish a clean session or a persistent session where all subscriptions and messages (QoS 1 & 2) are stored for the client. A Quality of Service level (QoS) determines the guarantee of a message reaching the other client or the broker (e.g. 0 = fire and forget, 1 = at least once and 2 = exactly once).

keepAlive

The keep alive value is the time interval that the client commits to for when sending regular pings to the broker. The broker responds to the pings enabling both sides to determine if the other one is still alive and reachable

OPTIONAL:

Parameter

Description

username/password

MQTT allows to send a username and password for authenticating the client. However the password is sent in plaintext if it isn’t encrypted or hashed by implementation or TLS is used underneath.

lastWill-Topic-QoS/-Message/-Retain

The last will message allows the broker to notify other clients when a client disconnects from the broker unexpectedly. Therefore the MQTT topic, QoS, message and retain flag can be sent to the broker during the client connect request.

Additionally, this variable holds information about the target host. The target host is the broker the XDK will connect to. The codes in this chapter show a minimal configuration necessary for a valid connection. Finally, the session also gets a reference to the event handler.

The first part is setting the target, which is demonstrated in the code below. The target of the session variable is of type SupportedUrl_T, which is why SupportedUrl_fromString() is used to set the target-information. This function receives a full URL, such as “mqtt://255.255.255.255:1883/some/path” and parses the necessary information.

void config_set_target(void){
  static char mqtt_broker[64];
  const char *mqtt_broker_format = "mqtt://%s:%d";
  char server_ip_buffer[13];
  Ip_Address_T ip;

  PAL_getIpaddress((uint8_t *) MQTT_BROKER_HOST, &ip);
  Ip_convertAddrToString(&ip, server_ip_buffer);
  sprintf(mqtt_broker, mqtt_broker_format,
              server_ip_buffer, MQTT_BROKER_PORT);

  SupportedUrl_fromString(mqtt_broker,
              (uint16_t) strlen(mqtt_broker), &session_ptr->target);
}

Since the host-address used in this guide is “broker.hivemq.com”, the address is first resolved using the function PAL_getIpaddress() and the resulting IP-address is converted to a string (this function also works if the address is an IP-address). The IP-address-string and the port will then be inserted into the mqtt_broker buffer.

Finally, the string mqtt_broker can be used in SupportedUrl_fromString(), along with the buffer’s length and a pointer to the session’s target field. The buffer has to be static since the string will not be copied by the function.

Setting the connect data is relatively straight forward. MQTTVersion has to be set to 3 for the protocol’s version 3.1. If the protocol version 3.1.1 should be used instead, set this field to 4. cleanSession and keepAliveInterval can be set to suit the use case. If will.haveWill is set to true instead of false, the other fields of will have to be set as well. The clientID is arbitrary in this case. The following code shows example values for each field.

void config_set_connect_data(void){
  static char *device_name = "XDK110_Guide_Device";
  session_ptr->MQTTVersion = 3;
  session_ptr->keepAliveInterval = 100;
  session_ptr->cleanSession = true;
  session_ptr->will.haveWill = false;

  StringDescr_T device_name_descr;
  StringDescr_wrap(&device_name_descr, device_name);
  session_ptr->clientID = device_name_descr;
}

Finally, the session’s event handler has to be set, as seen in the following code.

void config_set_event_handler(void){
  session_ptr->onMqttEvent = event_handler;
}

Connecting To MQTT Broker

Establishing a connection is straight-forward, since the target host and the connect details are already configured in chapter Configuring the Session. To connect, a function such as seen in the code snippet below can be used. It is important to handle possible errors. One of the errors the function Mqtt_connect() throws, is when the XDK cannot find the target host. This can be either because the host does not exist, or if the XDK cannot access the host at all.

retcode_t connect(void){
  retcode_t rc = RC_INVALID_STATUS;
  rc = Mqtt_connect(session_ptr);
  if (rc != RC_OK) {
    printf("Could not connect, error 0x%04x\n", rc);
  }
  return rc;
}

It is also crucial to understand that the errors of Mqtt_connect() are semantically different from the event MQTT_CONNECTION_ERROR . The function throws client-related errors, the event throws server-related errors, such as a connection refusal.

Subscribing to a Topic

As soon as the event MQTT_CONNECTION_ESTABLISHED occurs, it is possible to subscribe to a topic on the MQTT Server using Mqtt_subscribe(). The following table shows the general subscription parameters of MQTT.

Parameter

Description

packetID

The packet identifier (here: packetID) is a unique identifier set between client and broker to identify a message request/ack in a message flow (only relevant for QoS 1 & 2).

listTopics ( qos1, topic1, qos2, topic2, Е)

The subscribe message can contain a list subscriptions with a pair of topic and QoS level each. The topic in the subscribe message can also contain wildcards, which makes it possible to subscribe to certain topic patterns.

The function Mqtt_subscribe() requires an array of topics and the corresponding Qualities of Service in another array. The first array is of type StringDescr_T, which is why each topic has to be wrapped into a StringDescr_T by using the function StringDescr_wrap. The array of QoS is of type Mqtt_qos_t, but the possible values are essentially numbers. The possible values are listed at the type definition of Mqtt_qos_t in Serval_Mqtt.h.

The code below shows a function that subscribes to a topic. The variables subscribe_topic and subscription_topics should keep their values until a MQTT_SUBSCRIPTION_ACKNOWLEDGED event has occured. This is why the variables are declared static in this context, since the event is not handled by the callback that has been implemented in chapter Event Handler.

static void subscribe(void){
  static char *sub_topic = "your/subscribe/topic";
  static StringDescr_T subscription_topics[1];
  static Mqtt_qos_t qos[1];
  StringDescr_wrap(&(subscription_topics[0]), sub_topic);
  qos[0] = MQTT_QOS_AT_MOST_ONE;
  Mqtt_subscribe(session_ptr, 1, subscription_topics, qos);
}

Of course, the number of topics can be greater than one, but this example only subscribes to one topic. If more topics are added, each individual topic requires a corresponding QoS.

Publishing on a Topic

As soon as the event MQTT_CONNECTION_ESTABLISHED occurs, it is possible to publish on a topic on the MQTT Server to which the XDK is connected. The following table shows the general publishing parameters of MQTT.

Parameter

Description

packetID

The packet identifier (here: packetID) is a unique identifier set between client and broker to identify a message request/ack in a message flow (only relevant for QoS 1 & 2).

topicName

A simple string which is hierarchically structured with forward slashes “/” as delimiters (e.g. “home/livingroom/temperature” or “Germany/Munich/Octoberfest/people”).

qos

A Quality of Service level (QoS) determines the guarantee of a message reaching the other client or the broker (e.g. 0 = fire and forget, 1 = at least once and 2 = exactly once)

retainFlag

The retain flag determines if the message will be saved by the broker for the specified topic as last known good value. New clients that subscribe to that topic will receive the last retained message on that topic instantly after subscribing.

payload

This is the actual content of the message. MQTT is data-agnostic and it totally depends on the use case how the payload is structured. It’s completely up to the sender if it wants to send binary data, textual data or even XML or JSON.

dupFlag

The duplicate flag is used on QoS levels 1 & 2 and indicates whether this message is a duplicate and is resent because the other end didn’t acknowledge the original message.

The function Mqtt_publish() requires all of these parameters, except the packetID and the duplicate Flag.

The topic is wrapped in a variable of type StringDescr_T, the payload is an ordinary array of type char and its length. The qos is of type Mqtt_qos_t. The values for the qos can have are listed in the typedef of Mqtt_qos_t in Serval_Mqtt.h. The retain flag is either true or false.

static void publish(void){
  static char *pub_message = "Hello World";
  static char *pub_topic = "your/publish/topic";
  static StringDescr_T pub_topic_descr;
  StringDescr_wrap(&pub_topic_descr, pub_topic);

  Mqtt_publish(session_ptr, pub_topic_descr, pub_message,
                strlen(pub_message), MQTT_QOS_AT_MOST_ONE, false);
}

As a recommendation, an application based on the MQTT API would have a flow of event-handling that is similar to the graph in the picture below.

The first step is to start up the application, which consists of initializing the module and internal session, configuring the session and then connecting. Alternatively, whenever the connection closes, a reconnect may be attempted.

For Publishing, it is recommended to first load the data into a buffer and then publish it. The buffer should not be changed until the event MQTT_PUBLISHED_DATA is fired, but afterwards, the data in the buffer may be released.

The same holds for Subscribing. The topic data should not be released until the event MQTT_SUBSCRIPTION_ACKNOWLEDGED has occurred. Otherwise, the topic to which the XDK subscribed might be an arbitrary one in the worst case.

Image