qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github
email
telegram

javaリアルタイムメッセージプッシュ(一)

前言#

最近、ユーザーがアップロードしたデータが異常か正常かを判断する必要があるビジネス要件があり、異常な場合にはリアルタイムでユーザーに警報メッセージをプッシュする必要があります。
ほとんどの場合、通常はクライアント(ブラウザ)がサーバー(サーバー)にリクエストを送信し、必要なデータを知らせます。
しかし、一部の状況では、サーバーがクライアントにメッセージをプッシュする必要があります。たとえば、上記の状況や、ウェブページで一般的なQR コードログインなどです。

技術選択#

サーバーが積極的にプッシュする方法はいくつかあり、ビジネスに応じて選択する必要があります。
シンプルで一般的な方法は、長いポーリングと短いポーリングです。名前からしてシンプルで粗暴です。
さらに進んだ方法として、SSE や WebSocket があります。
さらに深いレベルでは、さまざまなメッセージキュー(MQTT など)があります。
他にも SMS プッシュやアプリ通知などがあります。たとえば、極光 Push などです。

実現方式実現原理モード
短輪詢クライアント特定の時間間隔(例:1 秒ごと)で、クライアントがサーバーにリクエストを送信し、サーバーが最新のデータをクライアントのブラウザに返します。/
長輪詢サーバークライアントがサーバーにリクエストを送信し、サーバーがリクエストを受け取った後接続を保持し、新しいメッセージがあるまで応答情報を返さず、クライアントが応答情報を処理した後に新しいリクエストをサーバーに送信します。/
SSE(Server-sent Events)サーバークライアントが最初にサーバーに長接続を登録し、サーバーがイベントを取得した後、登録されたクライアントにメッセージを送信できます。サーバーからのみプッシュ
WebSocketクライアント / サーバーWebSocket を使用するには新しい依存関係を導入する必要があり、一度クライアントとサーバーのハンドシェイクが成功すると、同じチャンネルにいてリアルタイムでメッセージを送受信できます。フルデュプレックス、サーバーとクライアントの両方がプッシュと受信が可能
MQTTクライアント / サーバーMQTT は IoT ビジネスで一般的に使用される、パブリッシュ / サブスクライブモデルに基づく軽量通信プロトコルで、TCP/IP プロトコルの上に構築されています。MQTT の最大の利点は、非常に少ないコードと限られた帯域幅で、リモートデバイスにリアルタイムで信頼性のあるメッセージサービスを提供できることです。パブリッシュ / サブスクライブモデル、サーバーとクライアントの両方がプッシュと受信が可能

長 / 短ポーリングについては繰り返しませんが、今回はSSEMQTTの方法について主に説明します。

SSE#

Spring Boot では SSE がネイティブでサポートされており、他の依存関係をインポートする必要がないのが利点ですが、欠点も明らかで、サーバーからの一方向プッシュのみをサポートし、高度なブラウザ(Chrome、Firefox など)のみをサポートします。ブラウザの制限により、各ウェブページは最大 6 つの長接続を保持することができます。接続が多すぎると、より多くのメモリと計算リソースを消費する可能性があります。
まず、SSE ユーティリティクラスを作成します。

@Component
@Slf4j
public class SseEmitterUtils {
    /**
     * 現在の接続数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * SseEmitter情報を保存
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * ユーザー接続を作成し、SseEmitterを返します
     * @param key userId
     * @return SseEmitter
     */
    public static SseEmitter connect(String key) {
        if (sseEmitterMap.containsKey(key)) {
            return sseEmitterMap.get(key);
        }

        try {
            // タイムアウト時間を設定、0は期限なしを意味します。デフォルトは30秒
            SseEmitter sseEmitter = new SseEmitter(0L);
            // コールバックを登録
            sseEmitter.onCompletion(completionCallBack(key));
            sseEmitter.onError(errorCallBack(key));
            sseEmitter.onTimeout(timeoutCallBack(key));
            sseEmitterMap.put(key, sseEmitter);
            // 数量+1
            count.getAndIncrement();
            return sseEmitter;
        } catch (Exception e) {
            log.info("新しいSSE接続の作成に失敗しました。現在の接続キーは:{}", key);
        }
        return null;
    }

    /**
     * 指定されたユーザーにメッセージを送信
     * @param key userId
     * @param message メッセージ内容
     */
    public static void sendMessage(String key, String message) {
        if (sseEmitterMap.containsKey(key)) {
            try {
                sseEmitterMap.get(key).send(message);
            } catch (IOException e) {
                log.error("ユーザー[{}]のプッシュに失敗しました:{}", key, e.getMessage());
                remove(key);
            }
        }
    }

    /**
     * 同じグループの人にメッセージを発信します。要件:key + groupId
     * @param groupId グループID
     * @param message メッセージ内容
     */
    public static void groupSendMessage(String groupId, String message) {
        if (!CollectionUtils.isEmpty(sseEmitterMap)) {
            sseEmitterMap.forEach((k, v) -> {
                try {
                    if (k.startsWith(groupId)) {
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                } catch (IOException e) {
                    log.error("ユーザー[{}]のプッシュに失敗しました:{}", k, e.getMessage());
                    remove(k);
                }
            });
        }
    }

    /**
     * ブロードキャストメッセージを送信
     * @param message メッセージ内容
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("ユーザー[{}]のプッシュに失敗しました:{}", k, e.getMessage());
                remove(k);
            }
        });
    }

    /**
     * メッセージを一斉送信
     * @param message メッセージ内容
     * @param ids ユーザーIDの集合
     */
    public static void batchSendMessage(String message, Set<String> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * 接続を削除
     * @param key userId
     */
    public static void remove(String key) {
        sseEmitterMap.remove(key);
        // 数量-1
        count.getAndDecrement();
        log.info("接続を削除:{}", key);
    }

    /**
     * 現在の接続情報を取得
     * @return Map
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 現在の接続数を取得
     * @return int
     */
    public static int getCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String key) {
        return () -> {
            log.info("接続終了:{}", key);
            remove(key);
        };
    }

    private static Runnable timeoutCallBack(String key) {
        return () -> {
            log.info("接続タイムアウト:{}", key);
            remove(key);
        };
    }

    private static Consumer<Throwable> errorCallBack(String key) {
        return throwable -> {
            log.info("接続異常:{}", key);
            remove(key);
        };
    }
}

次に、ユーティリティクラスを使用していくつかのインターフェースを実装し、クライアントが購読し、サーバーが積極的にメッセージをプッシュできるようにします。

@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {

    /**
     * 接続を作成
     * @param id ユーザーID
     * @return SseEmitter
     */
    @GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@PathVariable String id) {
        return SseEmitterUtils.connect(id);
    }


    /**
     * 指定されたユーザーにメッセージをプッシュ
     * @param id ユーザーID
     * @param content プッシュ内容
     */
    @PostMapping(path = "/push")
    public void push(String id, String content) {
        SseEmitterUtils.sendMessage(id, content);
    }


    /**
     * 指定されたグループにメッセージをプッシュ
     * @param groupId グループID
     * @param content プッシュ内容
     */
    @PostMapping(path = "/groupPush")
    public void groupPush(String groupId, String content) {
        SseEmitterUtils.groupSendMessage(groupId, content);
    }


    /**
     * ブロードキャストメッセージ
     * @param content プッシュ内容
     */
    @PostMapping(path = "/pushAll")
    public void pushAll(String content) {
        SseEmitterUtils.batchSendMessage(content);
    }

    /**
     * 接続を閉じる
     * @param id ユーザーID
     * @param request リクエスト
     */
    @DeleteMapping(path = "/close/{id}")
    public void close(@PathVariable String id, HttpServletRequest request) {
        request.startAsync();
        SseEmitterUtils.remove(id);
    }

}

最後に、以下の HTML ページを使用してテストできます。

<!DOCTYPE html>
<html lang="en">

<head>
    <title>SSE</title>
    <meta charset="UTF-8">
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
    <script>
        if (window.EventSource) {

            let sources = [];
            // 接続を作成  
            for (let i = 1; i < 10; i++) {
                let id = "id_" + i;
                sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
            }

            /** 
             * 接続が確立されると、openイベントがトリガーされます 
             * 別の書き方:source.onopen = function (event) {} 
             */
            // 接続オープンイベント
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('open', function (e) {
                    setMessageInnerHTML(id + "接続オープン")
                    console.log(id + "接続オープン");
                });
            });

            /** 
             * クライアントがサーバーから送信されたデータを受信 
             * 別の書き方:source.onmessage = function (event) {} 
             */
            // メッセージイベント 
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('message', function (e) {
                    setMessageInnerHTML(id + "メッセージを受信:" + e.data)
                    console.log(id + "メッセージを受信:" + e.data);
                });
            });

            /** 
             * 通信エラーが発生した場合(接続が中断された場合など)、errorイベントがトリガーされます 
             * 別の書き方:source.onerror = function (event) {} 
             */
            // エラーハンドリング
            sources.forEach(source => {

                let id = source.url.split('/').pop();

                source.addEventListener('error', function (e) {

                    if (e.readyState === EventSource.CLOSED) {
                        setMessageInnerHTML(id + "接続閉じる")
                        console.log(id + "接続閉じる");
                    } else {
                        setMessageInnerHTML(id + "接続エラー:", e)
                        console.log(id + "接続エラー:", e);
                    }

                });

            });
        } else {
            setMessageInnerHTML("ブラウザはSSEをサポートしていません");
        }

        // ウィンドウ閉じるイベントをリッスンし、sse接続を手動で閉じる。サーバーが期限なしに設定されている場合、ブラウザが閉じられた後にサーバーデータを手動でクリーンアップします。 
        window.onbeforeunload = function () {
            source.close();
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
            httpRequest.send();
            console.log("close");
        };

        // メッセージをウェブページに表示 
        function setMessageInnerHTML(innerHTML) {
            $("#contentDiv").append("<br/>" + innerHTML);
        } 
    </script>
</head>

<body>
    <div>
        <div>
            <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
            </div>
        </div>
    </div>
</body>

</html>

次に、インターフェースを呼び出してテストします。

image

ブラウザは最終的に 6 つの接続のみを保持し、他の接続はすべて破棄されたことがわかります。

MQTT#

MQTT を実現するには、サーバーに以下の依存関係を追加する必要があります。

<!--mqtt依存関係パッケージ-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

次に、ミドルウェアをインストールする必要があります。たとえば、EMQXや、直接 RabbitMq を使用することもできます。RabbitMq は QOS2 レベルのメッセージグレードをサポートしていないことに注意してください。

image

MQTT の QoS メッセージ品質レベルについては、以下の表の説明を参照してください。

QoS レベル説明適用シーン
0最多 1 回の配信、メッセージは確認されず再送信されない重要でないデータ転送に適している、センサーのデータなど
1最低 1 回の配信、メッセージが少なくとも 1 回配信されることを保証しますが、重複する可能性がありますメッセージ配信を保証する必要があるが、重複配信を許可するシーンに適しています
21 回のみの配信、メッセージが 1 回だけ配信されることを保証し、重複配信を許可しませんメッセージの正確な配信を保証する必要があり、重複配信を許可しないシーンに適しています

以下は、RabbitMQ ミドルウェアを使用する例です。
MQTT プロトコル情報を構成します。

server:
  port: 8008

spring:
  application:
    name: mqttテストプロジェクト
  mqtt:
    url: tcp://127.0.0.1:1883
    username: guest
    password: guest
    clientId: serverClientId
      #発行するテーマ--MQTT-デフォルトのメッセージプッシュテーマ、実際には呼び出しインターフェースで指定できます
    pubTopic: testTopic
      #購読するテーマ
    subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
    completionTimeout: 3000

次に、構成ファイルのエンティティクラスマッピングを作成します。

@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /**
     * RabbitMQ接続ユーザー名
     */
    private String username;
    /**
     * RabbitMQ接続パスワード
     */
    private String password;
    /**
     * プッシュテーマ
     */
    private String pubTopic;
    /**
     * RabbitMQのMQTT接続アドレス
     */
    private String url;

    /**
     * RabbitMQのMQTT接続クライアントID
     */
    private String clientId;

    /**
     * 購読テーマ
     */
    private String subTopic;

    /**
     * タイムアウト時間
     */
    private Integer completionTimeout;
}

次に、コンシューマーを作成します。

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {

    private final MqttProperties mqttProperties;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
                        mqttProperties.getSubTopic().split(","));
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        //メッセージ品質を設定:0->最多1回;1->最低1回;2->1回のみ
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                log.info("ヘッダー: {}", headers);
                String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
                log.info("購読テーマ: {}", topic);
                String[] topics = mqttProperties.getSubTopic().split(",");
                for (String t : topics) {
                    if (t.equals(topic)) {
                        log.info("購読テーマ:{};受信したテーマメッセージ:{}",topic, message.getPayload());
                    }
                }
            }

        };
    }
}

次に、プロデューサーを作成します。

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {

    private final MqttProperties mqttProperties;

    @Bean
    @SneakyThrows
    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttProperties.getUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        // セッションをクリアするかどうかを設定します。ここでfalseに設定すると、サーバーはクライアントの接続記録を保持します。
        // cleanSessionをfalseに設定すると、クライアントがオフラインになった後、サーバーはセッションをクリアしません。
        // 再接続後、以前に購読したテーマのメッセージを受信できます。クライアントがオンラインになると、オフラインの間のメッセージを受信します。
        options.setCleanSession(false);
        // タイムアウト時間を設定します。単位は秒です。
        options.setConnectionTimeout(10);
        // セッションのハートビート時間を設定します。単位は秒です。サーバーは1.5*20秒ごとにクライアントにメッセージを送信し、クライアントがオンラインかどうかを判断しますが、この方法には再接続のメカニズムはありません。
        options.setKeepAliveInterval(20);
        // 切断後に再接続しますが、この方法には再購読のメカニズムはありません。
        // 再接続を試みる前に、最初に1秒待機し、各失敗した再接続試行の遅延は倍増し、2分に達するまで続きます。この時、遅延は2分のままになります。
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
        // プッシュ時のメッセージ品質を設定:0->最多1回;1->最低1回;2->1回のみ
        // RabbitMQではQOS2はQOS1にダウングレードされます。
        messageHandler.setDefaultQos(1);
        // メッセージを保持するかどうかを設定します。trueに設定すると、保持されたメッセージは再接続時に送信されます。
        // 新しい空の保持メッセージを送信することでのみクリアできます。
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

次に、メッセージを送信するためのゲートウェイインターフェースを作成します。

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * デフォルトのトピックにメッセージを送信
     */
    void sendToMqtt(String payload);

    /**
     * 指定されたトピックにメッセージを送信
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 指定されたトピックにメッセージを送信し、QOSを設定
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

最後に、メッセージを送信するためのインターフェースを作成します。

@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {

    private final MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    public void sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
    }

    @PostMapping("/sendToTopic")
    public void sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
    }
}

サードパーティの MQTT ソフトウェアを使用してテストできます。たとえば、mqttxを使用します。
インストールが完了したら、ソフトウェアを開いて接続を作成します。

image

注意が必要な点は以下の通りです。

  1. 各接続の ClientId は異なり、一意である必要があります。同じ ClientId は接続が競合し、メッセージが失われる可能性があります。
  2. サーバー接続プロトコルは TCP でも MQTT でもかまいません。
  3. RabbitMQ の場合、ユーザーの権限を設定する必要があります。
  4. RabbitMQ の場合、バージョンは 3.1.1 を選択してください。
  5. 消費者がオフラインの後もプロデューサーのメッセージを受信したい場合は、Clean Session をオフにしてください。

接続が成功した後、インターフェースを呼び出してメッセージを送信すると、購読者はメッセージを受信できます。

image

Spring Boot と MQTT を統合してメッセージの送信と消費を実現し、クライアントが切断された後のメッセージの復元

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。