MQTT Broker-Client

By | February 20, 2022

In this article I will, using a test-case, show how to publish and consume MQTT messages using the reactive HiveMQ client library. I will use the Mosquitto MQTT broker for the test. The example application will be implemented using Spring Boot 3 and Java 17. The example will initially allow unauthenticated access that later will be upgraded to password-based authentication.
The completed example project is available on GitHub.

Prerequisites

The test will use Testcontainers to run the Mosquitto broker in a Docker container. Thus Docker is required.

Create the Project

I used the Spring Initializr web page to create the skeleton of the example project and you can download the project using this link. Note that there may be a more recent version of Spring Boot 3, in which case I suggest you use it.

Additional Dependencies

After having opened the project in your favourite IDE, update the pom.xml file to look like this:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.0-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>se.ivankrizsan.java</groupId>
    <artifactId>mqtt-example</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <name>mqtt-example</name>
    <description>MQTT Example</description>

    <properties>
        <java.version>17</java.version>
        <testcontainers.version>1.16.2</testcontainers.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.hivemq</groupId>
            <artifactId>hivemq-mqtt-client-reactor</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>${testcontainers.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </pluginRepository>
    </pluginRepositories>
</project>

If you used another version of Spring Boot 3 when creating the project, replace the version in the above pom.xml file with the version you use.

Logback Configuration

In src/test/resources, create a file named “logback-test.xml” with the following contents:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml" />

    <logger name="se.ivankrizsan" level="DEBUG"/>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

MQTT Tests Base Class

Code that can be reused by multiple tests has been located to an abstract base class, MqttTestsAbstractBase, which looks like this:

package se.ivankrizsan.java.mqtt;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base class containing common code used in MQTT-related tests.
 * Can be configured to start an MQTT broker prior to tests being executed and stop
 * it after all the tests in the test-class has been executed.
 *
 * @author Ivan Krizsan
 */
public abstract class MqttTestsAbstractBase {
    /* Constant(s): */
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttTestsAbstractBase.class);
    public static final String TOPIC = "test_mqtt_topic";
    /** Flag indicating whether to use an external MQTT broker or start one using Testcontainers. */
    public static final boolean RUN_MQTT_BROKER_IN_TESTCONTAINER = true;
    public static final String MOSQUITTO_BROKER_DOCKER_IMAGE = "eclipse-mosquitto:2.0.14";
    /** Mosquitto broker configuration used in test broker. */
    public static final String MOSQUITTO_BROKER_CONFIG = "src/test/resources/mosquitto.conf";
    /** Mosquitto broker configuration path in container. */
    protected static final String MOSQUITTO_BROKER_CONFIG_CONTAINER_PATH =
        "/mosquitto/config/mosquitto.conf";
    /** Mosquitto broker password file used in test broker. */
    protected static final String MOSQUITTO_BROKER_PASSWORDFILE =
        "src/test/resources/mosquitto_passwordfile";
    /** Mosquitto broker password file path in container. */
    protected static final String MOSQUITTO_BROKER_PASSWORDFILE_CONTAINER_PATH =
        "/mosquitto/config/mosquitto_passwordfile";
    /** User present in Mosquitto broker password file. */
    protected static final String MOSQUITTO_USERNAME = "testuser";
    /** Password of user present in Mosquitto broker password file. */
    protected static final String MOSQUITTO_PASSWORD = "secret";
    protected static final long MAX_RECONNECT_DELAY_MILLISEC = 10000;
    /** External MQTT broker host. */
    public static final String EXTERNAL_MQTT_BROKER_HOST = "localhost";
    /** External MQTT broker port. */
    public static final int EXTERNAL_MQTT_BROKER_PORT = 1883;
    public static final int DISCONNECT_MQTT_CLIENT_TIMEOUT_MILLISEC = 1000;

    /* Class variable(s): */
    /** Testcontainers container in which the Mosquitto broker used during the tests is running. */
    protected static GenericContainer sMqttBrokerContainer;

    /* Instance variable(s): */
    protected String mMqttBrokerHost;
    protected Integer mMqttBrokerPort;
    protected Mqtt5ReactorClient mMqttPublisher;
    protected String mPublisherId;
    protected Mqtt5ReactorClient mMqttSubscriber;
    protected String mSubscriberId;

    /**
     * Starts a Mosquitto MQTT broker in a container if the test is configured to
     * use a Testcontainers MQTT broker.
     * The MQTT broker will be started once prior to the execution of all tests.
     */
    @BeforeAll
    public static void startMqttBrokerTestContainer() {
        if (RUN_MQTT_BROKER_IN_TESTCONTAINER) {
            sMqttBrokerContainer =
                new GenericContainer<>(MOSQUITTO_BROKER_DOCKER_IMAGE)
                    .withExposedPorts(1883, 9001)
                    .waitingFor(Wait.forLogMessage(".*mosquitto.*", 1));

            sMqttBrokerContainer.start();
            LOGGER.info("Mosquitto broker started");

            /*
             * Add a log consumer to the Testcontainers container as to have the logs
             * from the Mosquitto container output to the test's logger.
             */
            final Slf4jLogConsumer theMosquittoBrokerLogConsumer =
                new Slf4jLogConsumer(LOGGER).withSeparateOutputStreams();
            sMqttBrokerContainer.followOutput(theMosquittoBrokerLogConsumer);
        }
    }

    /**
     * Stops the Mosquitto MQTT broker in a container if the test after all tests
     * if the broker has been run in a Testcontainers container.
     */
    @AfterAll
    public static void stopMqttBrokerTestContainer() {
        if (RUN_MQTT_BROKER_IN_TESTCONTAINER) {
            sMqttBrokerContainer.stop();
        }
    }

    /**
     * Performs set-up before each test by creating two MQTT clients that will connect
     * to the test MQTT broker.
     */
    @BeforeEach
    public void setUpBeforeTest() {
        if (RUN_MQTT_BROKER_IN_TESTCONTAINER) {
            mMqttBrokerHost = sMqttBrokerContainer.getHost();
            mMqttBrokerPort = sMqttBrokerContainer.getFirstMappedPort();
        } else {
            /* If using an externally started MQTT broker, assume that it is running on localhost:1883. */
            mMqttBrokerHost = EXTERNAL_MQTT_BROKER_HOST;
            mMqttBrokerPort = EXTERNAL_MQTT_BROKER_PORT;
        }
        LOGGER.info("Using MQTT broker running on host '{}' and port {}",
            mMqttBrokerHost,
            mMqttBrokerPort);

        /* Create an MQTT client used for publishing messages. */
        mPublisherId = UUID.randomUUID().toString();
        final Mqtt5Client theRxMqttPublisherClient = MqttClient.builder()
            .useMqttVersion5()
            .identifier(mPublisherId)
            .serverHost(mMqttBrokerHost)
            .serverPort(mMqttBrokerPort)
            .automaticReconnect()
            .maxDelay(MAX_RECONNECT_DELAY_MILLISEC, TimeUnit.MILLISECONDS)
            .applyAutomaticReconnect()
            .buildRx();
        mMqttPublisher = Mqtt5ReactorClient.from(theRxMqttPublisherClient);

        /* Create an MQTT client used for subscribing. */
        mSubscriberId = UUID.randomUUID().toString();
        final Mqtt5Client theRxMqttSubscriberClient = MqttClient.builder()
            .useMqttVersion5()
            .identifier(mSubscriberId)
            .serverHost(mMqttBrokerHost)
            .serverPort(mMqttBrokerPort)
            .automaticReconnect()
            .maxDelay(MAX_RECONNECT_DELAY_MILLISEC, TimeUnit.MILLISECONDS)
            .applyAutomaticReconnect()
            .buildRx();
        mMqttSubscriber = Mqtt5ReactorClient.from(theRxMqttSubscriberClient);
    }

    /**
     * Performs clean-up after each test by disconnecting the MQTT clients.
     */
    @AfterEach
    public void disconnectMqttClients() {
        if (mMqttSubscriber.getState().isConnected()) {
            mMqttSubscriber
                .disconnectWith()
                .reasonCode(Mqtt5DisconnectReasonCode.NORMAL_DISCONNECTION)
                .applyDisconnect()
                .block(Duration.ofMillis(DISCONNECT_MQTT_CLIENT_TIMEOUT_MILLISEC));
        }
        if (mMqttPublisher.getState().isConnected()) {
            mMqttPublisher
                .disconnectWith()
                .reasonCode(Mqtt5DisconnectReasonCode.NORMAL_DISCONNECTION)
                .applyDisconnect()
                .block(Duration.ofMillis(DISCONNECT_MQTT_CLIENT_TIMEOUT_MILLISEC));
        }
    }

    /**
     * Connects the publisher and logs any acknowledge received as result.
     */
    protected void connectPublisher() {
        final Mqtt5ConnAck theConnectionAck = mMqttPublisher
            .connectWith()
            .applyConnect()
            .block(Duration.ofMillis(1000));

        if (theConnectionAck != null) {
            final Mqtt5ConnAckReasonCode theConnectionAckReasonCode =
                theConnectionAck.getReasonCode();
            LOGGER.info("Connecting publisher received ACK code: {}", theConnectionAckReasonCode);
        }
    }

    /**
     * Connects the subscriber and logs any acknowledge received as result.
     */
    protected void connectSubscriber() {
        final Mqtt5ConnAck theConnectionAck = mMqttSubscriber
            .connectWith()
            .applyConnect()
            .block(Duration.ofMillis(1000));

        if (theConnectionAck != null) {
            final Mqtt5ConnAckReasonCode theConnectionAckReasonCode =
                theConnectionAck.getReasonCode();
            LOGGER.info("Connecting subscriber received ACK code: {}", theConnectionAckReasonCode);
        }
    }

    /**
     * Runs the supplied runnable in a new thread.
     *
     * @param inRunnable Runnable to run in a separate thread.
     * @return New thread.
     */
    protected Thread runInNewThread(final Runnable inRunnable) {
        final Thread theThread = new Thread(inRunnable);
        theThread.start();
        return theThread;
    }
}

Note that:

  • There is a constant name RUN_MQTT_BROKER_IN_TESTCONTAINER.
    When set to true, this class will use Testcontainers to start a MQTT broker in a Docker container before any tests are executed (the startMqttBrokerTestContainer method) and shuts it down (the stopMqttBrokerTestContainer method) after all tests have been executed.
    If set to false, an external MQTT broker will be used.
  • There is a constant named MOSQUITTO_BROKER_DOCKER_IMAGE.
    The value of this constant specifies which Docker image to use when launching a MQTT broker for the tests using Testcontainers.
  • There are two constants EXTERNAL_MQTT_BROKER_HOST and EXTERNAL_MQTT_BROKER_PORT.
    These two constants specify host and port to use when connecting to an external MQTT broker.
  • The method setUpBeforeTest creates two clients immediately before each test.
    Both the clients connect to the same broker but the first one is for publishing and the second one is for subscribing.
  • In the setUpBeforeTest method, there is no authentication configuration for neither the publishing nor the subscribing client.
    The absence is of course not easy to discern – we will se what this configuration look like later.
  • The disconnectMqttClients method will be executed after each test.
    If connected, the subscriber and publisher will be disconnected.
  • The connectPublisher method connects the publisher and logs any acknowledge code received in response.
  • Finally, the connectSubscriber connects the subscriber and logs any acknowledge code received in response.

Reactive Client Test

In this example, there is only one subclass to the MQTT test base class; a test class that uses the reactive HiveMQTT client. The class, named ReactiveMqttClientTests, looks like this:

package se.ivankrizsan.java.mqtt;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRec;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
import java.util.Optional;

/**
 * Tests showing interactions with a MQTT broker using the reactive HiveMQTT client.
 *
 * @author Ivan Krizsan
 */
public class ReactiveMqttClientTests extends MqttTestsAbstractBase {
    /* Constant(s): */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMqttClientTests.class);
    protected static final int MESSAGE_COUNT = 10;

    /* Instance variable(s): */

    /**
     * Tests sending and receiving a number of messages to a MQTT topic.
     * Expected result:
     * The publisher should be able to successfully publish a number of messages to the topic.
     * The consumer should be able to successfully consume the same number of messages
     * from the topic.
     */
    @Test
    public void sendAndReceiveMessagesSuccessfullyTest() {
        connectPublisher();
        connectSubscriber();

        /* Subscribe to the topic. */
        final Mono<Mqtt5SubAck> theSubscriptionAckMono = mMqttSubscriber
            .subscribeWith()
            .topicFilter(TOPIC)
            .qos(MqttQos.EXACTLY_ONCE)
            .applySubscribe();

        /* Verify that the subscription should have been accepted with minimum QoS 2. */
        StepVerifier
            .create(theSubscriptionAckMono.log())
            .expectNextMatches(theSubscriptionAck -> {
                final Mqtt5SubAckReasonCode theSubscribeAckReasonCode = theSubscriptionAck.getReasonCodes().get(0);
                LOGGER.info("*** Successfully subscribed to topic " + TOPIC
                    + " with subscription acknowledge reason code " + theSubscribeAckReasonCode);
                return Mqtt5SubAckReasonCode.GRANTED_QOS_2 == theSubscribeAckReasonCode;
            })
            .expectComplete()
            .verify();

        /*
         * Create a message consumer flux that consumes all messages from a topic.
         * Must create a message consumer prior to publishing messages that are to be consumed.
         */
        final Flux<Mqtt5Publish> theMessageConsumer =
            mMqttSubscriber
                .publishes(MqttGlobalPublishFilter.ALL, true)
                .map(theMqtt5Publish -> {
                    logPublish(theMqtt5Publish);
                    return theMqtt5Publish;
                });

        /* Consume messages from the topic. */
        runInNewThread(() -> {
            StepVerifier
                .create(theMessageConsumer)
                .expectNextCount(MESSAGE_COUNT)
                .expectComplete()
                .verify();
            LOGGER.info("*** Consumed all messages from topic " + TOPIC);
        });

        /* Publish messages. */
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            final String thePayloadString = "Time: " + LocalTime.now();
            final Mqtt5Publish thePublishMsg = Mqtt5Publish
                .builder()
                .topic(TOPIC)
                .qos(MqttQos.EXACTLY_ONCE)
                .payload(thePayloadString.getBytes(StandardCharsets.UTF_8))
                .retain(false)
                .build();
            mMqttPublisher
                .publish(theSubscriber -> theSubscriber.onNext(thePublishMsg))
                .map(thePublishResult -> {
                    /* Just log the publishing results. */
                    logMqttQos2Result((Mqtt5PublishResult.Mqtt5Qos2Result) thePublishResult);
                    return thePublishResult;
                })
                .subscribe();
        }
    }

    /**
     * Logs the supplied result of a MQTT 5 publish message with QoS 2.
     *
     * @param inMqtt5PublishQos2Result Publish result to log.
     */
    protected void logMqttQos2Result(final Mqtt5PublishResult.Mqtt5Qos2Result inMqtt5PublishQos2Result) {
        final Mqtt5PubRec thePubRec =
            inMqtt5PublishQos2Result.getPubRec();
        LOGGER.info("*** Publish result received: {}", thePubRec);
    }

    /**
     * Logs the supplied MQTT 5 publish message.
     *
     * @param inMqtt5Publish Publish message to log.
     */
    protected void logPublish(final Mqtt5Publish inMqtt5Publish) {
        final MqttTopic theReceivedMsgTopic = inMqtt5Publish.getTopic();
        final MqttQos theReceivedMsgQos = inMqtt5Publish.getQos();
        final Optional<ByteBuffer> theReceivedMsgPayloadOptional = inMqtt5Publish.getPayload();
        final String thePayloadString = theReceivedMsgPayloadOptional.isEmpty()
                ? "[n/a]"
                : StandardCharsets.UTF_8.decode(theReceivedMsgPayloadOptional.get()).toString();
        LOGGER.info("*** Received a message from the topic {} with QoS {} and payload '{}'",
                theReceivedMsgTopic,
                theReceivedMsgQos.name(),
                thePayloadString);
    }
}

Note that:

  • The class contains one single test method – sendAndReceiveMessagesSuccessfullyTest.
  • The test method calls the connectPublisher and connectSubscriber methods implemented in the superclass to, as indicated by the names, connect the publisher and the subscriber.
  • After having connected, the subscriber subscribes to the test-topic.
    The subscriber requests QoS (quality of service) so that each message is delivered exactly once which in MQTT is QoS2 and also the highest QoS level.
  • The subscription request is completed by calling the applySubscribe method which returns a mono.
  • Having received a subscription acknowledge the mono will produce an element of the type Mqtt5SubAck if the subscription was successful.
    In the Mqtt5SubAck there will be a code which indicates the granted QoS level.
  • The Reactor test utility StepVerifier is used to verify the outcome of the subscriber’s subscription request.
  • A message-consumer flux is created using the subscriber.
    In addition all messages received through the flux will be logged.
    It is important to subscribe prior to messages being published or else the subscriber will fail to receive messages having been published prior to subscribing, that is messages will be removed from the topic despite not having been consumed by a subscriber.
  • Having created the message-consumer flux, a StepVerifier is used to consume the messages from it.
    Note that consuming the messages need to be done in a new thread since the thread executing the test will be used to publish messages to the topic.
  • The final part of the test method publishes messages to the topic and logs the result of publishing each message.
  • The two remaining methods, logMqttQos2Result and logPublish, logs two different types of MQTT messages.

First Run

With the above preparations in place, let’s make a first attempt at running the test in the ReactiveMqttClientTests class. Do remember that Testcontainers is used to start an MQTT broker in a container and thus Docker needs to be started.

When run, the test fails with the following logged to the console:

2022-02-12 20:17:31.130  INFO   --- [           main] o.t.d.DockerClientProviderStrategy       : Loaded org.testcontainers.dockerclient.UnixSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first
2022-02-12 20:17:31.581  INFO   --- [           main] o.t.d.DockerClientProviderStrategy       : Found Docker environment with local Unix socket (unix:///var/run/docker.sock)
2022-02-12 20:17:31.582  INFO   --- [           main] org.testcontainers.DockerClientFactory   : Docker host IP address is localhost
2022-02-12 20:17:31.625  INFO   --- [           main] org.testcontainers.DockerClientFactory   : Connected to docker: 
  Server Version: 19.03.13
  API Version: 1.40
  Operating System: Docker Desktop
  Total Memory: 18005 MB
2022-02-12 20:17:31.629  INFO   --- [           main] o.t.utility.ImageNameSubstitutor         : Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor')
2022-02-12 20:17:31.806  INFO   --- [           main] o.t.utility.RegistryAuthLocator          : Credential helper/store (docker-credential-desktop) does not have credentials for index.docker.io
2022-02-12 20:17:32.407  INFO   --- [           main] org.testcontainers.DockerClientFactory   : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2022-02-12 20:17:32.407  INFO   --- [           main] org.testcontainers.DockerClientFactory   : Checking the system...
2022-02-12 20:17:32.408  INFO   --- [           main] org.testcontainers.DockerClientFactory   : ✔︎ Docker server version should be at least 1.6.0
2022-02-12 20:17:32.545  INFO   --- [           main] org.testcontainers.DockerClientFactory   : ✔︎ Docker environment should have more than 2GB free disk space
2022-02-12 20:17:32.558  INFO   --- [           main] ? [eclipse-mosquitto:2.0.14]            : Creating container for image: eclipse-mosquitto:2.0.14
2022-02-12 20:17:32.616  INFO   --- [           main] ? [eclipse-mosquitto:2.0.14]            : Starting container with ID: 6eb212d9992642d7b6957eac116280b76cff36cc4ad3f785cae878bf20301bf4
2022-02-12 20:17:32.932  INFO   --- [           main] ? [eclipse-mosquitto:2.0.14]            : Container eclipse-mosquitto:2.0.14 is starting: 6eb212d9992642d7b6957eac116280b76cff36cc4ad3f785cae878bf20301bf4
2022-02-12 20:17:32.996  INFO   --- [           main] ? [eclipse-mosquitto:2.0.14]            : Container eclipse-mosquitto:2.0.14 started in PT0.449538S
2022-02-12 20:17:32.997  INFO   --- [           main] s.i.java.mqtt.MqttTestsAbstractBase      : Mosquitto broker started
2022-02-12 20:17:33.002 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: mosquitto version 2.0.14 starting
2022-02-12 20:17:33.003 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: Config loaded from /mosquitto/config/mosquitto.conf.
2022-02-12 20:17:33.003 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: Starting in local only mode. Connections will only be possible from clients running on this machine.
2022-02-12 20:17:33.003 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: Create a configuration file which defines a listener to allow remote access.
2022-02-12 20:17:33.004 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: For more details see https://mosquitto.org/documentation/authentication-methods/
2022-02-12 20:17:33.004 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: Opening ipv4 listen socket on port 1883.
2022-02-12 20:17:33.004 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: Opening ipv6 listen socket on port 1883.
2022-02-12 20:17:33.005 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: Error: Address not available
2022-02-12 20:17:33.005 ERROR   --- [ream--730246518] s.i.java.mqtt.MqttTestsAbstractBase      : 1644693452: mosquitto version 2.0.14 running
2022-02-12 20:17:33.010  INFO   --- [           main] s.i.java.mqtt.MqttTestsAbstractBase      : Using MQTT broker running on host 'localhost' and port 32770
java.lang.IllegalStateException: Timeout on blocking read for 1000000000 NANOSECONDS
at se.ivankrizsan.java.mqtt.MqttTestsAbstractBase.connectPublisher(MqttTestsAbstractBase.java:200)

We can see that the MQTT broker (Mosquitto) seems to start properly in a Docker container but that the publisher client timed out when trying to connect to the broker. Also note that the Mosquitto version, version 2.0.14 in my case, is logged to the console.

Mosquitto Security and Testing – Take One

If you have worked with earlier versions of the Mosquitto MQTT broker then version 2.x will be something of a surprise. The reason is that in version 2.0 the broker was made more secure as per default – please refer to the version 2.0.0 release notes. This has the advantage that it forces us to either explicitly disable security or, preferably use more realistic security configuration in tests.

Disable Authentication

Initial research on Mosquitto broker security revealed a flag named “allow_anonymous” in the broker configuration. The first approach is to set this flag to true in order to disable client authentication altogether.

Mosquitto Configuration

The first step as far as configuring security of the Mosquitto broker is to supply configuration. In src/test/resources create a file named “mosquitto.conf” with the following contents:

# Specifies the port on which to listen for connections.
listener 1883

# Allow anonymous connections to broker, i.e. do not require login.
# Setting this flag to true will allow for anonymous access
# of the broker even with versions after 2.0.
# Version 1.6.15 is the latest version that, as per default, allows for clients to access the broker
# without having to log in. 
allow_anonymous true
# Authentication and access control settings are applied to all listeners.
per_listener_settings false

# Enable the following log levels in the Mosquitto broker.
log_type error
log_type warning
log_type notice
log_type information
log_type debug

# Enable logging when clients connects and disconnects.
connection_messages true

Note that:

  • The port on which the broker is to listen for connections is set to 1883.
  • The “allow_anonymous” flag mentioned earlier is set to true.
    This will allow clients to publish and consume messages without having to authenticate.
  • The “per_listener_settings” flag is set to false.
    Setting this flag to false will cause authentication and access control settings to be applied to all listeners. In the Mosquitto configuration listeners can be created to allow for clients being able to use different protocols, for example MQTT or MQTT+SSL, to connect to the broker. Please refer to the Listeners section in the Mosquitto documentation for more information.

Bind the Mosquitto Configuration

Having created the Mosquitto configuration, it also needs to be made available to the broker instance running in the container. In order to accomplish this the method startMqttBrokerTestContainer in the MqttTestsAbstractBase should be modified as to look like this:

    /**
     * Starts a Mosquitto MQTT broker in a container if the test is configured to
     * use a Testcontainers MQTT broker.
     * The MQTT broker will be started once prior to the execution of all tests.
     */
    @BeforeAll
    public static void startMqttBrokerTestContainer() {
        if (RUN_MQTT_BROKER_IN_TESTCONTAINER) {
            sMqttBrokerContainer =
                new GenericContainer<>(MOSQUITTO_BROKER_DOCKER_IMAGE)
                    .withExposedPorts(1883, 9001)
                    .withFileSystemBind(
                        MOSQUITTO_BROKER_CONFIG,
                        MOSQUITTO_BROKER_CONFIG_CONTAINER_PATH)
                    .waitingFor(Wait.forLogMessage(".*mosquitto.*", 1));

            sMqttBrokerContainer.start();
            LOGGER.info("Mosquitto broker started");

            /*
             * Add a log consumer to the Testcontainers container as to have the logs
             * from the Mosquitto container output to the test's logger.
             */
            final Slf4jLogConsumer theMosquittoBrokerLogConsumer =
                new Slf4jLogConsumer(LOGGER).withSeparateOutputStreams();
            sMqttBrokerContainer.followOutput(theMosquittoBrokerLogConsumer);
        }
    }

Note the highlighted rows which binds the Mosquitto broker configuration file created earlier, location specified by the MOSQUITTO_BROKER_CONFIG constant, at the path in the container specified by the MOSQUITTO_BROKER_CONFIG_CONTAINER_PATH constant.

Second Run

Run the test in the ReactiveMqttClientTests class again. This time the test should pass.
Looking at the console log lines containing “***” we can see:

2022-02-16 21:19:27.559  INFO   --- [ionThreadPool-3] s.i.java.mqtt.ReactiveMqttClientTests    : *** Successfully subscribed to topic test_mqtt_topic with subscription acknowledge reason code GRANTED_QOS_2
2022-02-16 21:19:27.633  INFO   --- [ionThreadPool-5] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=1}
2022-02-16 21:19:27.634  INFO   --- [ionThreadPool-7] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=2}
2022-02-16 21:19:27.635  INFO   --- [ionThreadPool-3] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=4}
2022-02-16 21:19:27.635  INFO   --- [ionThreadPool-5] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=5}
2022-02-16 21:19:27.635  INFO   --- [ionThreadPool-1] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=3}
2022-02-16 21:19:27.635  INFO   --- [ionThreadPool-1] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=7}
2022-02-16 21:19:27.635  INFO   --- [ionThreadPool-7] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=6}
2022-02-16 21:19:27.635  INFO   --- [ionThreadPool-3] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=8}
2022-02-16 21:19:27.635  INFO   --- [ionThreadPool-5] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=9}
2022-02-16 21:19:27.636  INFO   --- [ionThreadPool-7] s.i.java.mqtt.ReactiveMqttClientTests    : *** Publish result received: MqttPubRec{packetIdentifier=10}
2022-02-16 21:19:27.643  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.619768'
2022-02-16 21:19:27.643  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.626279'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.626684'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.626828'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.626929'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.627057'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.627171'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.627295'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.627411'
2022-02-16 21:19:27.644  INFO   --- [ionThreadPool-4] s.i.java.mqtt.ReactiveMqttClientTests    : *** Received a message from the topic test_mqtt_topic with QoS EXACTLY_ONCE and payload 'Time: 21:19:27.627521'

Note that:

  • The subscriber successfully subscribes to the topic.
  • Ten messages are published.
    The result of each publish is MqttPubRec, as seen in the logs, which translates to PUBlish RECieved. This message is received in response to a message having been successfully published with QoS 2 (exactly once).
  • Ten messages are received.
    Each message has the QoS 2 (exactly once), which is the QoS used when publishing the messages. In addition, the payload of each message is logged.
  • Recall that Mosquitto broker security has been disabled.

Mosquitto Security and Testing – Take Two

Better than disabling Mosquitto broker security is of course to use proper broker security configuration in tests. An additional advantage is that the broker security configuration will also be tested.

Password-Based Authentication

In order to configure the Mosquitto broker to require password-based client authentication the Mosquitto configuration need to be modified and a password file has to be provided.

Mosquitto Configuration

In this second take on Mosquitto security in tests we’ll start with setting the allow_anonymous flag to false, thereby disabling anonymous client broker access. In order for registered clients to be able to log in the location of a password file will also need to be specified.

Modify the file “mosquitto.conf” located at src/test/resources to have the following contents:

# Specifies the port on which to listen for connections.
listener 1883

# Do not allow anonymous connections to broker, i.e. require login.
# Version 1.6.15 is the latest version that, as per default, allows for clients to access the broker
# without having to log in. Setting this flag to true will allow for anonymous access
# of the broker even with versions after 2.0.
allow_anonymous false
# Authentication and access control settings are applied to all listeners.
per_listener_settings false
# Location of the password file in the container. Need to be absolute.
password_file /mosquitto/config/mosquitto_passwordfile

# Enable the following log levels in the Mosquitto broker.
log_type error
log_type warning
log_type notice
log_type information
log_type debug

# Enable logging when clients connects and disconnects.
connection_messages true

Mosquitto Password File

In this example a password file that contain a usename and a hashed password will be created. This is accomplished using the mosquitto_passwd utility. For the example project either use the instructions below to create a new password file or use this pre-generated password file:

testuser:$7$101$xToRf+VNB6qUUel6$VXlvGUr0Da3o+2yMBfw6mcMSVBeBmuQIVG09ap08k2ycAmh614POTKFeAOqqxzyHVa+eVC2pR2rdSBiRffy2DA==
  • Start a Mosquitto Docker container:
docker run -it eclipse-mosquitto:latest sh
  • In the container, create a new password file containing a user named “testuser”.
    When prompted for a password, enter “secret” without quotes.
mosquitto_passwd -c mosquitto_passwordfile testuser
  • Display the contents of the newly created password file:
cat mosquitto_passwordfile
  • In the example project, create a file named “mosquitto_passwordfile” in src/test/resources.
  • Copy the contents of the password file created in the container or the pre-generated password file above to the password file in the example project.
  • Exit the container and remove it.

Bind the Mosquitto Password File

As with the Mosquitto configuration, the Mosquitto password file also need to be made available to the Mosquitto broker. Modify the startMqttBrokerTestContainer method in the MqttTestsAbstractBase to look like this:

    /**
     * Starts a Mosquitto MQTT broker in a container if the test is configured to
     * use a Testcontainers MQTT broker.
     * The MQTT broker will be started once prior to the execution of all tests.
     */
    @BeforeAll
    public static void startMqttBrokerTestContainer() {
        if (RUN_MQTT_BROKER_IN_TESTCONTAINER) {
            sMqttBrokerContainer =
                new GenericContainer<>(MOSQUITTO_BROKER_DOCKER_IMAGE)
                    .withExposedPorts(1883, 9001)
                    .withFileSystemBind(
                        MOSQUITTO_BROKER_CONFIG,
                        MOSQUITTO_BROKER_CONFIG_CONTAINER_PATH)
                    .withFileSystemBind(
                        MOSQUITTO_BROKER_PASSWORDFILE,
                        MOSQUITTO_BROKER_PASSWORDFILE_CONTAINER_PATH)
                    .waitingFor(Wait.forLogMessage(".*mosquitto.*", 1));

            sMqttBrokerContainer.start();
            LOGGER.info("Mosquitto broker started");

            /*
             * Add a log consumer to the Testcontainers container as to have the logs
             * from the Mosquitto container output to the test's logger.
             */
            final Slf4jLogConsumer theMosquittoBrokerLogConsumer =
                new Slf4jLogConsumer(LOGGER).withSeparateOutputStreams();
            sMqttBrokerContainer.followOutput(theMosquittoBrokerLogConsumer);
        }
    }

Note the additional call to withFileSystemBind taking the location of the password file as defined by the constant MOSQUITTO_BROKER_PASSWORDFILE and the desired location of the file in containers as specified by the constant MOSQUITTO_BROKER_CONFIG_CONTAINER_PATH.

Add Client Authentication

Clients, both publisher and subscriber clients, are now required to authenticate when connecting to the Mosquitto broker. This is accomplished by modifying the setUpBeforeTest method in the MqttTestsAbstractBase class to look like this:

    /**
     * Performs set-up before each test by creating two MQTT clients that will connect
     * to the test MQTT broker.
     */
    @BeforeEach
    public void setUpBeforeTest() {
        if (RUN_MQTT_BROKER_IN_TESTCONTAINER) {
            mMqttBrokerHost = sMqttBrokerContainer.getHost();
            mMqttBrokerPort = sMqttBrokerContainer.getFirstMappedPort();
        } else {
            /* If using an externally started MQTT broker, assume that it is running on localhost:1883. */
            mMqttBrokerHost = EXTERNAL_MQTT_BROKER_HOST;
            mMqttBrokerPort = EXTERNAL_MQTT_BROKER_PORT;
        }
        LOGGER.info("Using MQTT broker running on host '{}' and port {}",
            mMqttBrokerHost,
            mMqttBrokerPort);

        /*
         * Create authentication object holding username and password used
         * by publisher and subscriber.
         */
        final Mqtt5SimpleAuth theMqttAuth = Mqtt5SimpleAuth.builder()
            .username(MOSQUITTO_USERNAME)
            .password(MOSQUITTO_PASSWORD.getBytes(StandardCharsets.UTF_8))
            .build();

        /* Create an MQTT client used for publishing messages. */
        mPublisherId = UUID.randomUUID().toString();
        final Mqtt5Client theRxMqttPublisherClient = MqttClient.builder()
            .useMqttVersion5()
            .identifier(mPublisherId)
            .serverHost(mMqttBrokerHost)
            .serverPort(mMqttBrokerPort)
            .simpleAuth(theMqttAuth)
            .automaticReconnect()
            .maxDelay(MAX_RECONNECT_DELAY_MILLISEC, TimeUnit.MILLISECONDS)
            .applyAutomaticReconnect()
            .buildRx();
        mMqttPublisher = Mqtt5ReactorClient.from(theRxMqttPublisherClient);

        /* Create an MQTT client used for subscribing. */
        mSubscriberId = UUID.randomUUID().toString();
        final Mqtt5Client theRxMqttSubscriberClient = MqttClient.builder()
            .useMqttVersion5()
            .identifier(mSubscriberId)
            .serverHost(mMqttBrokerHost)
            .serverPort(mMqttBrokerPort)
            .simpleAuth(theMqttAuth)
            .automaticReconnect()
            .maxDelay(MAX_RECONNECT_DELAY_MILLISEC, TimeUnit.MILLISECONDS)
            .applyAutomaticReconnect()
            .buildRx();
        mMqttSubscriber = Mqtt5ReactorClient.from(theRxMqttSubscriberClient);
    }

Note that:

  • An object of the type Mqtt5SimpleAuth is created prior to creating the clients.
    This object holds the login information, a user-name and a password, that will later be used by the clients when connecting to the broker.
  • Both clients invoke the simpleAuth method, as part of constructing the clients, providing the Mqtt5SimpleAuth object created earlier.

Third Run

Run the test in the ReactiveMqttClientTests class yet again and examine the console log. The highlighted rows in the log show that the publishing client and the consuming client both have connected to the Mosquitto broker using the “testuser” user:

2022-02-20 11:23:50.828 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352630: mosquitto version 2.0.14 running
2022-02-20 11:23:50.832  INFO   --- [           main] s.i.java.mqtt.MqttTestsAbstractBase      : Using MQTT broker running on host 'localhost' and port 32770
2022-02-20 11:23:51.407 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352631: New connection from 172.17.0.1:42206 on port 1883.
2022-02-20 11:23:51.427 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352631: New client connected from 172.17.0.1:42206 as d19e186a-15c5-4972-90a8-a1fb631e2319 (p5, c1, k60, u'testuser').
2022-02-20 11:23:51.427 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352631: No will message specified.
2022-02-20 11:23:51.427 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352631: Sending CONNACK to d19e186a-15c5-4972-90a8-a1fb631e2319 (0, 0)
2022-02-20 11:23:51.447  INFO   --- [           main] s.i.java.mqtt.MqttTestsAbstractBase      : Connecting publisher received ACK code: SUCCESS
2022-02-20 11:23:51.451 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352631: New connection from 172.17.0.1:42210 on port 1883.
2022-02-20 11:23:51.460 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352631: New client connected from 172.17.0.1:42210 as 267039c9-7abc-43c9-a385-14abf516727c (p5, c1, k60, u'testuser').
2022-02-20 11:23:51.461 ERROR   --- [eam--1915040269] s.i.java.mqtt.MqttTestsAbstractBase      : 1645352631: No will message specified.
2022-02-20 11:23:51.461  INFO   --- [           main] s.i.java.mqtt.MqttTestsAbstractBase      : Connecting subscriber received ACK code: SUCCESS

Password-based authentication has successfully been added to the example project. This concludes this article.

Happy coding!

Leave a Reply

Your email address will not be published. Required fields are marked *