Publishing and subscribing to messages is a useful thing to be able to do, for example you might create a project which would record temperatures locally and then want to unify them centrally.
To achive this each sensor would connect to a central message-bus/message-queue and say "Hey I'm here and the temperature is 30°". An observer, which wished to unify those messages, or process them, could then listen for those updates and take out the appropriate action.
The Mosquitto MQTT Broker makes that simple:
- Clients can connect and publish "messages" on "topics".
- Other hosts can connect and subscribe to all the messages that appear on a given topic.
Messages are free-form strings, and topics are arbitrary labels. In my personal network, for example, I have a few nodes that send updates on the topic "meta". For example:
{"hostname":"D1-TEMP", "ip":"192.168.10.33", "id":"02863F", "mac":"A0:20:A6:02:86:3F"}I could write a script to listen to these events and output something human-readable:
- ..
- Device with MAC A0:20:A6:02:86:3F was booted.
- ..
I'm running Debian's stable release, so installing the server is as simple as:
# apt-get install mosquitto mosquitto-clientsOnce installed create a configuration-file /etc/mosquitto/mosquitto.conf with contents like this:
listener 1883 persistence true persistence_location /var/lib/mosquitto/ persistence_file mosquitto.db log_dest syslog log_dest stdout log_dest topic log_type error log_type warning log_type notice log_type information connection_messages true log_timestamp true allow_anonymous trueThat will allow anybody on your local LAN to connect, publish, and subscribe. You might wish to examine the use of access-control or SSL if you're not in a trusted environment.
Once you've started the server you'll then be able to experiment with publishing and subscribing to messages.
As a simple example this will print all messages that arrive in the topic "news":
$ mosquitto_sub -h 127.0.0.1 -p 1883 -t newsLeave that running, and in a different window run:
$ mosquitto_pub -h 127.0.0.1 -p 1883 -t news -m "Hello I am alive"That's the core of a message-queue, or message-bus. The thing to remember is that single host could be sending messages to the topic
news
or fifty might. It wouldn't matter. Every time a message is posted on a given topic the queue will take care of transmitting it to every client that is subscribed to that topic.The following might be a useful command, it dumps meta-information about the server:
$ mosquitto_sub -v -t \$SYS/#Here "
#
" is used for a wildcard, you can also use "+
" for all messages.
If you want to interface with the server programatically, via Perl, you can use the Net::MQTT::Simple library.
The following brief program shows:
- Connecting to a queue.
- Listening for all messages, and printing them as they arrive.
- Publishing a simple "we've connected" message on the topic "meta", on startup.
#!/usr/bin/perl use strict; use warnings; use Net::MQTT::Simple; my $mqtt = Net::MQTT::Simple->new("localhost"); $mqtt->publish("meta" => "We're connected"); $mqtt->run( "+" => sub { my ($topic, $message) = @_; print "Topic: $topic - $message\n"; }, );
The ESP8266 pubsubclient library allows you to interact with the message-bus, both publishing messages and responding to incoming ones.
You'll need to setup the connection to your server, and once you've done that you can listen to messages on topics (subscribe to "+" to subscribe to everything).
Basic usage is contained in the examples, but in brief:
#include <ESP8266WiFi.h> #include <PubSubClient.h> WiFiClient espClient; PubSubClient client(espClient); // // Called when a message is received to a subscribed topic. // void callback(char* topic, byte* payload, unsigned int length) { Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (int i = 0; i < length; i++) { Serial.print((char)payload[i]); } Serial.println(); } // // Reconnect to the message-bus if the connection died, or we're // not otherwise connected. // void reconnect() { // Loop until we're reconnected while (!client.connected()) { Serial.print("Attempting MQTT connection..."); // Create a random client ID String clientId = "ESP8266Client-"; clientId += String(random(0xffff), HEX); // Attempt to connect if (client.connect(clientId.c_str())) { Serial.println("connected to MQTT server"); // Once connected, publish an announcement... client.publish("meta", "We're connected"); // subscribe to all topics client.subscribe("+"); // or just one. // client.subscribe("news"); } else { Serial.print("failed, rc="); Serial.print(client.state()); Serial.println(" try again in 5 seconds"); // Wait 5 seconds before retrying delay(5000); } } } void setup() { .. client.setServer("192.168.10.64", 1883); client.setCallback(callback); } void loop() { // Connect if we're not already connected. if (!client.connected()) reconnect(); // process any events. client.loop(); ... }