qinfengge


Java Real-time Message Push (Part Two)

In Part One of this real-time push series I used the MQTT plugin for RabbitMQ. That covered my needs at the time, but a new requirement has come up: remote wake-up. A device can only be woken remotely if it is currently online, so I need a way to tell whether a device is connected. My first thought was to look for a RabbitMQ API that lists the devices connected over MQTT; RabbitMQ was already serving as the message queue, so this would have avoided adding more middleware. I then found that MQTT brokers conventionally expose a set of system topics that report the broker's state. As the word suggests, though, these topics are a convention rather than a mandatory part of the protocol, and MQTT support in RabbitMQ is only a plugin. I raised this in the discussions of the official repository.

With no way around it, I had to introduce a dedicated MQTT broker. I chose EMQX, mainly because its documentation is thorough.

Installation#

I installed it with Docker on WSL2; the command is simple.

$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0

After successful installation, you can directly access the control panel at http://localhost:18083. The default username is admin, and the password is public.

image

Configuration#

The application needs to be configured as well. Since the client now speaks MQTT v5, both the dependencies and the configuration have to change.

Dependencies#

        <!-- MQTT dependencies -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>

Configuration File#

# MQTT protocol configuration
spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    username: admin
    password: admin
    clientId: serverClientId
    # Published topic -- MQTT default message push topic, can be specified during API call
    pubTopic: testTopic
    # Subscribed topics
    subTopic: testTopic,remote-wake
    completionTimeout: 30000

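Since EMQX allows anonymous connections by default, the username and password can be omitted. Note that the client code below calls getPassword().getBytes(), so if you leave the password out you also need to skip that call to avoid a NullPointerException.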
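The configuration classes below read these values through an MqttProperties bean that the post does not show. Here is a minimal sketch of what it might look like. The field names mirror the YAML keys, while subTopicList and passwordBytes are hypothetical helpers added for illustration. In the Spring Boot application the class would presumably also carry @Component and @ConfigurationProperties(prefix = "spring.mqtt"); those annotations are left as a comment so the sketch compiles without Spring.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the properties holder used by the config classes.
// Assumption: in the real application this class is annotated with
// @Component and @ConfigurationProperties(prefix = "spring.mqtt").
class MqttProperties {
    private String url;
    private String username;
    private String password;
    private String clientId;
    private String pubTopic;
    private String subTopic;
    private int completionTimeout;

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public String getClientId() { return clientId; }
    public void setClientId(String clientId) { this.clientId = clientId; }
    public String getPubTopic() { return pubTopic; }
    public void setPubTopic(String pubTopic) { this.pubTopic = pubTopic; }
    public String getSubTopic() { return subTopic; }
    public void setSubTopic(String subTopic) { this.subTopic = subTopic; }
    public int getCompletionTimeout() { return completionTimeout; }
    public void setCompletionTimeout(int completionTimeout) { this.completionTimeout = completionTimeout; }

    /** Splits the comma-separated subTopic value, trimming blanks and dropping empty entries. */
    public List<String> subTopicList() {
        if (subTopic == null || subTopic.isBlank()) {
            return List.of();
        }
        return Arrays.stream(subTopic.split(","))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .collect(Collectors.toList());
    }

    /** Returns the password as UTF-8 bytes, or null when no password is configured. */
    public byte[] passwordBytes() {
        return password == null ? null : password.getBytes(StandardCharsets.UTF_8);
    }
}
```

With a helper like passwordBytes, the connection code can skip setting the password when the helper returns null, which matters when relying on anonymous connections.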

Producer#

Next, create a configuration class for the producer.

/**
 * @author lza
 * @date 2023/11/24-10:24
 **/

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {

    private final MqttProperties mqttProperties;

    /**
     * Client object
     */
    private MqttClient providerClient;

    /**
     * Connect to the server after bean initialization
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * Client connects to the server
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        // Create MQTT client object
        providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
        // Connection settings
        MqttConnectionOptions options = new MqttConnectionOptions();
        // Whether to clear session, set to false means the server will retain the client's connection record (subscribed topics, QoS), allowing the client to retrieve messages pushed by the server during disconnection after reconnecting
        // Set to true means each connection to the server is treated as a new identity
        options.setCleanStart(true);
        // Set connection username
        options.setUserName(mqttProperties.getUsername());
        // Set connection password
        options.setPassword(mqttProperties.getPassword().getBytes());
        // Set timeout, in seconds
        options.setConnectionTimeout(100);
        // Set the keep-alive interval in seconds: the client sends a PINGREQ when it has been idle this long, and the broker treats the client as offline after 1.5 * 20 seconds without any packet
        options.setKeepAliveInterval(20);
        // Set automatic reconnection
        options.setAutomaticReconnect(true);

        // Set callback
        providerClient.setCallback(new MqttProviderCallBack());
        providerClient.connect(options);
    }

    /**
     * Publish message
     *
     * @param qos      Quality of Service level
     *                 0 at most once: sent a single time, with no acknowledgement or retry
     *                 1 at least once: resent until acknowledged, so the receiver may get duplicates
     *                 2 exactly once: a handshake guarantees the receiver gets the message only once
     * @param retained Retain flag
     *                 If set to true, the server must store this application message and its QoS level, and when a subscriber subscribes to this topic, the message will be pushed to the subscriber
     *                 But the server will only retain one retained message for the same topic (the last one received)
     * @param topic    Topic
     * @param message  Message
     * @author xct
     * @date 2021/7/30 16:27
     */
    @SneakyThrows
    public void publish(int qos, boolean retained, String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        // Destination topic for publishing/subscribing messages
        MqttTopic mqttTopic = providerClient.getTopic(topic);
        // publish() hands the message to the client and returns a token without waiting for delivery;
        // delivery then continues in the background while the connection is available.
        // The token tracks the delivery progress of the message.
        MqttToken token = mqttTopic.publish(mqttMessage);
        // Block until delivery completes, which makes this method effectively synchronous
        token.waitForCompletion();
    }
}

This class creates the MQTT client from the connection parameters in the configuration file and exposes a publish method for pushing messages. Because MqttProviderCallBack is registered as the callback when the producer client is created, that class has to be created too.

Producer Callback#

The producer's callback methods are implemented as needed.

/**
 * @author lza
 * @date 2023/11/24-10:34
 **/

public class MqttProviderCallBack implements MqttCallback {

    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("Producer: Disconnected from the server");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {
        MqttClientInterface client = iMqttToken.getClient();
        System.out.println(client.getClientId() + " message published successfully!");
    }

    @Override
    public void connectComplete(boolean b, String s) {
    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }
}

Consumer#

After creating the producer, a consumer is also needed.

/**
 * @author lza
 * @date 2023/11/24-10:43
 **/

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {

    private final MqttProperties mqttProperties;

    /**
     * Client object
     */
    public MqttClient consumerClient;

    /**
     * Connect to the server after bean initialization
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * Client connects to the server
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        // Create MQTT client object
        consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
        // Connection settings
        MqttConnectionOptions options = new MqttConnectionOptions();
        // Whether to clear session, set to false means the server will retain the client's connection record (subscribed topics, QoS), allowing the client to retrieve messages pushed by the server during disconnection after reconnecting
        // Set to true means each connection to the server is treated as a new identity
        options.setCleanStart(true);
        // Set connection username
        options.setUserName(mqttProperties.getUsername());
        // Set connection password
        options.setPassword(mqttProperties.getPassword().getBytes());
        // Set timeout, in seconds
        options.setConnectionTimeout(100);
        // Set the keep-alive interval in seconds: the client sends a PINGREQ when it has been idle this long, and the broker treats the client as offline after 1.5 * 20 seconds without any packet
        options.setKeepAliveInterval(20);
        // Set automatic reconnection
        options.setAutomaticReconnect(true);

        // Set callback
        consumerClient.setCallback(new MqttConsumerCallBack(this));
        consumerClient.connect(options);

        // Topics to subscribe to, taken from the comma-separated subTopic setting
        String[] topics = mqttProperties.getSubTopic().split(",");
        // One QoS level per topic; both arrays must have the same length,
        // so size the QoS array from the topics instead of hard-coding {1,1}
        int[] qos = new int[topics.length];
        java.util.Arrays.fill(qos, 1);
        // Subscribe to topics
        consumerClient.subscribe(topics, qos);
    }

    /**
     * Disconnect
     *
     * @author xct
     * @date 2021/8/2 09:30
     */
    @SneakyThrows
    public void disConnect() {
        consumerClient.disconnect();
    }

    /**
     * Subscribe to a topic
     *
     * @param topic Topic
     * @param qos   Message level
     * @author xct
     * @date 2021/7/30 17:12
     */
    @SneakyThrows
    public void subscribe(String topic, int qos) {
        consumerClient.subscribe(topic, qos);
    }
}

Like the producer, the consumer creates a client instance from the connection information and registers its callback. The difference is that the consumer also has a subscribe method and a disConnect method for disconnecting.

Consumer Callback#

/**
 * @author lza
 * @date 2023/11/24-10:55
 **/
public class MqttConsumerCallBack implements MqttCallback {

    private final MqttConsumerConfig consumerConfig;

    public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("Consumer: Disconnected from the server");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.printf("Received message topic: %s%n",s);
        System.out.printf("Received message QoS: %d%n",mqttMessage.getQos());
        System.out.printf("Received message content: %s%n",new String(mqttMessage.getPayload()));
        System.out.printf("Received message retained: %b%n",mqttMessage.isRetained());

        // MQTT v5 request-response: reply only when the request carries a response topic
        // and a user property action=remoteWake
        MqttProperties requestProps = mqttMessage.getProperties();
        if (requestProps != null && StringUtils.hasText(requestProps.getResponseTopic())) {
            // Check every user property rather than get(0), which throws when the list is empty
            boolean remoteWake = requestProps.getUserProperties().stream()
                .anyMatch(p -> "action".equals(p.getKey()) && "remoteWake".equals(p.getValue()));
            if (remoteWake) {
                // Echo the correlation data so the requester can match the reply to its request
                MqttProperties properties = new MqttProperties();
                properties.setCorrelationData(requestProps.getCorrelationData());
                MqttMessage responseMessage = new MqttMessage();
                responseMessage.setProperties(properties);
                responseMessage.setPayload("Device not connected".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(requestProps.getResponseTopic(), responseMessage);
            }
        }
    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {

    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }
}

The most important method in the consumer's callback is messageArrived, which is triggered when the consumer receives a message on the subscribed topic.

If everything is correct, you can see both clients, the producer and the consumer, online in the EMQX backend management page.

image

Request-Response#

You may have noticed the request-response comment in the consumer's messageArrived callback. It is the reason the MQTT dependency was switched from version 3.1.1 to version 5.

Request-response is one of the major features of MQTT v5. See this document for an introduction and this document for some interesting examples.

Put simply, HTTP is a clear request/response model: the frontend calls a backend interface, the backend processes the data and returns a result, and the frontend always gets a return value whether the call succeeds or fails. MQTT's publish-subscribe model is different, because producers and consumers are decoupled and know nothing about each other (MQTT is mostly used in IoT scenarios, which usually tolerate this). Think of following a public account: you don't care when the account pushes messages to you; you only need to be sure you don't miss any of them.

For example, suppose you have an internet-connected temperature sensor in a room, and you want to read its current temperature from a mobile app while you are out.

image

This looks like an HTTP request, except that the backend data is now the data collected by the sensor. You can do it without request-response by creating two topics, A and B: the app publishes its request to A, which the device subscribes to, and after collecting the data the device publishes it to B, which the app subscribes to.

So how does request-response do it? When publishing the message to A, the app specifies the response topic directly. The device receives the message, sees the note saying "if you have something for me, find me at B," and returns the data to topic B. It works much like writing a return address on a letter at the post office.

In MQTTX, you can select version 5 when connecting to MQTT to conduct request-response testing.

image

A user property is added to the request so that the consumer's callback can decide whether to reply. It corresponds to this part of the callback:

        // MQTT v5 request-response: reply only when the request carries a response topic
        // and a user property action=remoteWake
        MqttProperties requestProps = mqttMessage.getProperties();
        if (requestProps != null && StringUtils.hasText(requestProps.getResponseTopic())) {
            // Check every user property rather than get(0), which throws when the list is empty
            boolean remoteWake = requestProps.getUserProperties().stream()
                .anyMatch(p -> "action".equals(p.getKey()) && "remoteWake".equals(p.getValue()));
            if (remoteWake) {
                // Echo the correlation data so the requester can match the reply to its request
                MqttProperties properties = new MqttProperties();
                properties.setCorrelationData(requestProps.getCorrelationData());
                MqttMessage responseMessage = new MqttMessage();
                responseMessage.setProperties(properties);
                responseMessage.setPayload("Device not connected".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(requestProps.getResponseTopic(), responseMessage);
            }
        }

If the user property action has the value remoteWake, the callback switches to response mode: it reads the response topic from the message and replies automatically.
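The callback above is the responder side. On the requester side, the correlation data it echoes back is what lets the sender pair a reply with the request that caused it. A minimal sketch of that bookkeeping, using only the JDK, might look like this. PendingRequests and its methods are hypothetical names, and no MQTT client is involved; the comments mark where publishing and messageArrived would hook in.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical requester-side helper: matches responses to requests by correlation data.
class PendingRequests {
    // Requests that have been sent but not answered yet, keyed by correlation ID
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Generates a fresh correlation ID; send its UTF-8 bytes as the request's correlation data. */
    static String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    /** Registers a request before it is published and returns the future for its reply. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /**
     * Intended to be called from messageArrived for messages on the response topic.
     * Returns false when the correlation data is missing or matches no pending request.
     */
    boolean complete(byte[] correlationData, byte[] payload) {
        if (correlationData == null) {
            return false;
        }
        String id = new String(correlationData, StandardCharsets.UTF_8);
        CompletableFuture<String> future = pending.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(new String(payload, StandardCharsets.UTF_8));
        return true;
    }
}
```

The requester would set the ID as correlation data, together with the response topic, before publishing, and then wait on the returned future (ideally with a timeout, since a device that is offline never replies).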

Online and Offline Notifications#

Request-response was a detour; the original requirement was to check whether a device is online. Now that the MQTT broker has changed, can we use system topics? Yes, but there is no need to, because EMQX offers more elegant ways.
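For completeness, here is roughly what the system-topic route would involve. The sketch assumes the EMQX 4.x topic layout $SYS/brokers/{node}/clients/{clientid}/connected and .../disconnected, which is worth verifying against the EMQX documentation for your version. SysTopicEvent is a hypothetical helper that only parses the topic string; subscribing to the topics is not shown.

```java
import java.util.Optional;

// Hypothetical parser for EMQX client presence system topics.
// Assumed layout: $SYS/brokers/{node}/clients/{clientid}/connected|disconnected
class SysTopicEvent {
    final String node;
    final String clientId;
    final boolean connected;

    private SysTopicEvent(String node, String clientId, boolean connected) {
        this.node = node;
        this.clientId = clientId;
        this.connected = connected;
    }

    /** Returns the parsed event, or empty when the topic is not a presence topic. */
    static Optional<SysTopicEvent> parse(String topic) {
        if (topic == null) {
            return Optional.empty();
        }
        // Client IDs containing '/' are not handled by this simple split
        String[] parts = topic.split("/");
        if (parts.length != 6
                || !"$SYS".equals(parts[0])
                || !"brokers".equals(parts[1])
                || !"clients".equals(parts[3])) {
            return Optional.empty();
        }
        if ("connected".equals(parts[5])) {
            return Optional.of(new SysTopicEvent(parts[2], parts[4], true));
        }
        if ("disconnected".equals(parts[5])) {
            return Optional.of(new SysTopicEvent(parts[2], parts[4], false));
        }
        return Optional.empty();
    }
}
```

A consumer subscribed to these topics could call parse on the topic passed to messageArrived and update its online list from the result.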

The simplest method is to directly use the enterprise version of EMQX.

Data storage is mainly used to record client online and offline status, topic subscriptions, message content, and delivery receipts in databases such as Redis, MySQL, PostgreSQL, MongoDB, and Cassandra. Users can achieve something similar by subscribing to the relevant topics themselves, but the enterprise version has built-in support for this persistence, which runs more efficiently and greatly reduces the developer's workload.

The second method, and the one I recommend, is to use a webhook to track clients going online and offline.

First, create an endpoint to receive the webhook notifications.

    private final static String CLIENT_IDS = "clientIds";
    private final static String CONNECTED = "client_connected";
    private final static String DISCONNECTED = "client_disconnected";

    /**
     * EMQX webhook endpoint that listens for clients going online and offline
     * @param vo Online or offline VO
     */
    @SaIgnore
    @PostMapping("/onlineOrOffline")
    public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
        System.err.println("Client: " + vo.getClientid() +
            ", Action: " + vo.getAction() +
            ", Reason: " + vo.getReason());
        List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
        // Constant-first equals avoids a NullPointerException when the payload has no action
        if (CONNECTED.equals(vo.getAction())) {
            list.add(vo.getClientid());
            // First delete the original value
            RedisUtils.deleteKeys(CLIENT_IDS);
            // Remove duplicates
            ArrayList<Object> distinct = CollUtil.distinct(list);
            RedisUtils.setCacheList(CLIENT_IDS, distinct);
        } else if (DISCONNECTED.equals(vo.getAction())) {
            list.remove(vo.getClientid());
            // First delete the original value
            RedisUtils.deleteKeys(CLIENT_IDS);
            RedisUtils.setCacheList(CLIENT_IDS, list);
        }
    }

Duplicates are removed because of the following case: client A comes online and is stored in Redis, the service then goes down and restarts, and A comes online again. Without deduplication, Redis would then hold two A's.
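An alternative that sidesteps deduplication altogether is to keep the IDs in a set, where adding the same client twice has no effect. The sketch below shows the idea with an in-memory set; OnlineClients is a hypothetical class, not part of the original project. With Redis, the same effect would come from storing the IDs in a Set (SADD on connect, SREM on disconnect) instead of a List, which also avoids the read, delete, and rewrite cycle.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the online-client registry.
// A set makes repeated "connected" events for the same client harmless.
class OnlineClients {
    private final Set<String> online = ConcurrentHashMap.newKeySet();

    /** Applies one webhook event; unknown actions and null client IDs are ignored. */
    void onEvent(String action, String clientId) {
        if (clientId == null) {
            return;
        }
        if ("client_connected".equals(action)) {
            online.add(clientId);
        } else if ("client_disconnected".equals(action)) {
            online.remove(clientId);
        }
    }

    boolean isOnline(String clientId) {
        return clientId != null && online.contains(clientId);
    }

    int size() {
        return online.size();
    }
}
```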

Then, enable the webhook plugin in the EMQX plugins.

image

Next, enter the Docker container to change the plugin configuration.

docker exec -it emqx bash
cd /opt/emqx/etc/plugins

image

Modify the plugin.

vi emqx_web_hook.conf

The main change is near the top of the file: set the webhook URL to the address of the endpoint created above.

image

Next, modify the notification rules below.

image

Simply uncomment the following two rules for online and offline notifications.

web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}

Once completed, you will be able to see notifications for client online and offline.

image

However, this is not perfect. If the MQTT broker itself goes offline, no webhook fires, and the list of online devices stored in Redis becomes stale.

Don't worry, EMQX provides an HTTP API to check whether a specific device is online.

The API also has an interface that returns information about all clients in the cluster, but it is paginated and becomes cumbersome with a large number of clients. It is therefore better to maintain the online and offline state through the webhook and then use the API to check whether a specific client is online, which covers the vast majority of needs.

    /**
     * Use EMQX API to check if the client is online
     * @param clientId Client ID
     * @return Whether online
     */
    @SaIgnore
    @GetMapping("/checkClientStatus/{clientId}")
    public R<Boolean> checkClientStatus(@PathVariable String clientId) {
        // Send GET request
        String url = "http://localhost:18083/api/v4/clients/" + clientId;
        HttpRequest request = HttpUtil.createGet(url);
        request.basicAuth("admin", "public");

        HttpResponse response = request.execute();

        ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
        return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
    }
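The method above concatenates the client ID into the URL as-is, so an ID containing a space or a slash would produce a wrong path. As a hedged variant, the same request can be built with the JDK's java.net.http API, with the ID percent-encoded and the credentials passed in rather than hard-coded. EmqxClientStatusRequest is a hypothetical helper; the /api/v4/clients/ path and Basic authentication are taken from the code above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;

// Hypothetical helper that builds (but does not send) the client status request.
class EmqxClientStatusRequest {

    static HttpRequest build(String baseUrl, String user, String password, String clientId) {
        // URLEncoder targets form encoding, so turn '+' back into %20 for use in a path segment
        String encodedId = URLEncoder.encode(clientId, StandardCharsets.UTF_8).replace("+", "%20");
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v4/clients/" + encodedId))
                // Fail fast instead of hanging when the broker is unreachable
                .timeout(Duration.ofSeconds(5))
                .header("Authorization", "Basic " + credentials)
                .GET()
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), and the body parsed as in the method above.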

EMQX Documentation
EMQX Installation and Usage and Some Pitfalls
How to Listen for Client Online and Offline in MQTT EMQX and Use it Normally in Business
