Message Publishing & Subscribing

Publishing and subscribing to messages is a useful thing to be able to do, for example you might create a project which would record temperatures locally and then want to unify them centrally.

To achive this each sensor would connect to a central message-bus/message-queue and say "Hey I'm here and the temperature is 30°". An observer, which wished to unify those messages, or process them, could then listen for those updates and take out the appropriate action.

The Mosquitto MQTT Broker makes that simple:

  • Clients can connect and publish "messages" on "topics".
  • Other hosts can connect and subscribe to all the messages that appear on a given topic.

Messages are free-form strings, and topics are arbitrary labels. In my personal network, for example, I have a few nodes that send updates on the topic "meta". For example:

{"hostname":"D1-TEMP",
 "ip":"192.168.10.33",
 "id":"02863F",
 "mac":"A0:20:A6:02:86:3F"}

I could write a script to listen to these events and output something human-readable:

  • ..
  • Device with MAC A0:20:A6:02:86:3F was booted.
  • ..

Mosquitto Setup

I'm running Debian's stable release, so installing the server is as simple as:

# apt-get install mosquitto mosquitto-clients

Once installed create a configuration-file /etc/mosquitto/mosquitto.conf with contents like this:

listener 1883
persistence true
persistence_location /var/lib/mosquitto/
persistence_file mosquitto.db
log_dest syslog
log_dest stdout
log_dest topic
log_type error
log_type warning
log_type notice
log_type information
connection_messages true
log_timestamp true
allow_anonymous true

That will allow anybody on your local LAN to connect, publish, and subscribe. You might wish to examine the use of access-control or SSL if you're not in a trusted environment.

Once you've started the server you'll then be able to experiment with publishing and subscribing to messages.

As a simple example this will print all messages that arrive in the topic "news":

$ mosquitto_sub -h 127.0.0.1 -p 1883 -t news

Leave that running, and in a different window run:

$ mosquitto_pub -h 127.0.0.1 -p 1883 -t news -m "Hello I am alive"

That's the core of a message-queue, or message-bus. The thing to remember is that single host could be sending messages to the topic news or fifty might. It wouldn't matter. Every time a message is posted on a given topic the queue will take care of transmitting it to every client that is subscribed to that topic.

The following might be a useful command, it dumps meta-information about the server:

$ mosquitto_sub -v -t \$SYS/#

Here "#" is used for a wildcard, you can also use "+" for all messages.

Perl Interface

If you want to interface with the server programatically, via Perl, you can use the Net::MQTT::Simple library.

The following brief program shows:

  • Connecting to a queue.
  • Listening for all messages, and printing them as they arrive.
  • Publishing a simple "we've connected" message on the topic "meta", on startup.
#!/usr/bin/perl

use strict;
use warnings;

use Net::MQTT::Simple;

my $mqtt = Net::MQTT::Simple->new("localhost");
$mqtt->publish("meta" => "We're connected");

$mqtt->run(
        "+" => sub {
            my ($topic, $message) = @_;
            print "Topic: $topic - $message\n";
        },
    );

ESP8266 Usage

The ESP8266 pubsubclient library allows you to interact with the message-bus, both publishing messages and responding to incoming ones.

You'll need to setup the connection to your server, and once you've done that you can listen to messages on topics (subscribe to "+" to subscribe to everything).

Basic usage is contained in the examples, but in brief:


#include <ESP8266WiFi.h>
#include <PubSubClient.h>


WiFiClient espClient;
PubSubClient client(espClient);



//
// Called when a message is received to a subscribed topic.
//
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

//
// Reconnect to the message-bus if the connection died, or we're
// not otherwise connected.
//
void reconnect() {

  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    // Create a random client ID
    String clientId = "ESP8266Client-";
    clientId += String(random(0xffff), HEX);

    // Attempt to connect
    if (client.connect(clientId.c_str())) {
      Serial.println("connected to MQTT server");

      // Once connected, publish an announcement...
      client.publish("meta", "We're connected");

      // subscribe to all topics
      client.subscribe("+");

      // or just one.
      // client.subscribe("news");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}


void setup()
{
   ..

  client.setServer("192.168.10.64", 1883);
  client.setCallback(callback);
}

void loop()
{

  // Connect if we're not already connected.
  if (!client.connected())
    reconnect();

  // process any events.
  client.loop();

  ...
}