以前リアルタイムプッシュ(一)を書いたときは、rabbitmq の mqtt プラグインを使用していました。その時は私のニーズを満たしていましたが、今は新しいニーズが出てきました。リモートウェイクアップを判断するために、デバイスがオンラインかどうかを確認し、現在の時間にデバイスがオンラインであれば、リモートウェイクアップが可能であることを示します。
最初は、rabbitmq の基盤の上で、mqtt のオンラインデバイスを取得できる API があるかどうか探そうと思いました。結局、メッセージキューもこれを使用しているので、ミドルウェアを追加する必要はありませんでした。そして、mqtt には多くの規約のシステムトピックがあり、これらのシステムトピックは mqtt ブローカーの状態を維持しています。
しかし、私が言ったように、システムトピックは規約の設定であり、強制的なものではなく、rabbitmq の mqtt 実装もプラグインとしての補足に過ぎません。これは私が公式リポジトリで提起したディスカッションです。
仕方がないので、専門の mqtt ミドルウェアを導入するしかないようです。ここではEMQXを選びました。主にドキュメントが非常に詳細だからです。
インストール#
ここでは WSL2 の Docker を使用してインストールすることにしました。インストールコマンドは非常に簡単です。
$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0
インストールが成功したら、直接 http://localhost:18083
にアクセスしてコントロールパネルを使用できます。デフォルトのユーザー名は admin
、パスワードは public
です。
設定#
インストールが完了したら、プログラム内で設定を行う必要があります。mqtt V5 にバージョンを切り替えたため、依存関係と設定も変更する必要があります。
依存関係#
<!--mqtt依存パッケージ-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
設定ファイル#
# mqttプロトコル設定
spring:
mqtt:
url: tcp://127.0.0.1:1883
username: admin
password: admin
clientId: serverClientId
#発行するトピック--MQTT-デフォルトのメッセージプッシュトピック、実際には呼び出しインターフェース時に指定可能
pubTopic: testTopic
#購読するトピック
subTopic: testTopic,remote-wake
completionTimeout: 30000
emqx はデフォルトで匿名接続を許可しているため、ユーザー名とパスワードは省略できます。
プロデューサー#
次に、プロデューサーの設定クラスを作成します。
/**
* @author lza
* @date 2023/11/24-10:24
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {
private final MqttProperties mqttProperties;
/**
* クライアントオブジェクト
*/
private MqttClient providerClient;
/**
* bean初期化後にサーバーに接続
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* クライアントがサーバーに接続
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//MQTTクライアントオブジェクトを作成
providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
//接続設定
MqttConnectionOptions options = new MqttConnectionOptions();
//セッションをクリアするかどうか、falseに設定するとサーバーはクライアントの接続記録(購読トピック、qos)を保持し、クライアントが再接続した後にサーバーがクライアントの切断中にプッシュしたメッセージを取得できる
//trueに設定すると、毎回サーバーに接続する際に新しいアイデンティティとして接続される
options.setCleanStart(true);
//接続ユーザー名を設定
options.setUserName(mqttProperties.getUsername());
//接続パスワードを設定
options.setPassword(mqttProperties.getPassword().getBytes());
//タイムアウト時間を設定、単位は秒
options.setConnectionTimeout(100);
//ハートビート時間を設定、単位は秒、サーバーが1.5*20秒ごとにクライアントにハートビートを送信してクライアントがオンラインかどうかを判断する
options.setKeepAliveInterval(20);
//自動再接続を設定
options.setAutomaticReconnect(true);
//コールバックを設定
providerClient.setCallback(new MqttProviderCallBack());
providerClient.connect(options);
}
/**
* メッセージを発行
*
* @param qos サービス品質レベル
* 0は一度だけ送信し、成功かどうかは問わない
* 1は未成功の場合、成功するまで送信し続け、複数回受信する可能性がある
* 2は未成功の場合、送信し続けるが、一度だけ受信することを保証する
* @param retained 保持フラグ
* trueに設定すると、サーバーはこのアプリケーションメッセージとそのサービス品質レベルを保存し、購読者がこのトピックを購読すると、メッセージをその購読者にプッシュする
* ただし、サーバーは同じトピックに対して1つのretainedメッセージ(最後に受信したもの)しか保持しない
* @param topic トピック
* @param message メッセージ
* @author xct
* @date 2021/7/30 16:27
*/
@SneakyThrows
public void publish(int qos, boolean retained, String topic, String message) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//トピックの宛先、メッセージを発行/購読するために使用
MqttTopic mqttTopic = providerClient.getTopic(topic);
//メッセージの配信進捗を追跡するメカニズムを提供
//非ブロッキング方式(バックグラウンドで実行)で発行を実行する際にメッセージの配信進捗を追跡するために使用
MqttToken token;
//指定されたメッセージをトピックに発行するが、メッセージの配信が完了するのを待たない。返されたトークンはメッセージの配信状態を追跡するために使用できる。
//このメソッドが正常に戻ると、メッセージはクライアントによって受け入れられた発行として扱われる。接続が利用可能になると、メッセージの配信がバックグラウンドで完了する。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
主に行うことは、設定ファイルの接続パラメータに基づいて mqtt クライアントオブジェクトを作成し、publish
メソッドを作成してメッセージをプッシュすることです。
プロデューサークライアントオブジェクトを作成する際に MqttProviderCallBack
をコールバック関数として指定したため、このコールバッククラスも作成する必要があります。
プロデューサーコールバック#
プロデューサーのコールバックメソッドは必要に応じて実装します。
/**
* @author lza
* @date 2023/11/24-10:34
**/
public class MqttProviderCallBack implements MqttCallback {
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("プロデューサー:サーバーとの接続が切断されました");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
MqttClientInterface client = iMqttToken.getClient();
System.out.println(client.getClientId() + "メッセージの発行に成功しました!");
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
コンシューマー#
プロデューサーを作成した後、コンシューマーも必要です。
/**
* @author lza
* @date 2023/11/24-10:43
**/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {
private final MqttProperties mqttProperties;
/**
* クライアントオブジェクト
*/
public MqttClient consumerClient;
/**
* bean初期化後にサーバーに接続
* @author xct
* @date 2021/7/30 16:48
*/
@PostConstruct
public void init(){
connect();
}
/**
* クライアントがサーバーに接続
* @author xct
* @date 2021/7/30 16:01
*/
@SneakyThrows
public void connect(){
//MQTTクライアントオブジェクトを作成
consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
//接続設定
MqttConnectionOptions options = new MqttConnectionOptions();
//セッションをクリアするかどうか、falseに設定するとサーバーはクライアントの接続記録(購読トピック、qos)を保持し、クライアントが再接続した後にサーバーがクライアントの切断中にプッシュしたメッセージを取得できる
//trueに設定すると、毎回サーバーに接続する際に新しいアイデンティティとして接続される
options.setCleanStart(true);
//接続ユーザー名を設定
options.setUserName(mqttProperties.getUsername());
//接続パスワードを設定
options.setPassword(mqttProperties.getPassword().getBytes());
//タイムアウト時間を設定、単位は秒
options.setConnectionTimeout(100);
//ハートビート時間を設定、単位は秒、サーバーが1.5*20秒ごとにクライアントにハートビートを送信してクライアントがオンラインかどうかを判断する
options.setKeepAliveInterval(20);
//自動再接続を設定
options.setAutomaticReconnect(true);
//コールバックを設定
consumerClient.setCallback(new MqttConsumerCallBack(this));
consumerClient.connect(options);
//トピックを購読
//メッセージレベル、トピック配列と一対一で対応し、サーバーは指定されたレベルで購読したトピックのクライアントにメッセージをプッシュします
int[] qos = {1,1};
//トピック
String[] topics = mqttProperties.getSubTopic().split(",");
//トピックを購読
consumerClient.subscribe(topics,qos);
}
/**
* 接続を切断
*
* @author xct
* @date 2021/8/2 09:30
*/
@SneakyThrows
public void disConnect() {
consumerClient.disconnect();
}
/**
* トピックを購読
*
* @param topic トピック
* @param qos メッセージレベル
* @author xct
* @date 2021/7/30 17:12
*/
@SneakyThrows
public void subscribe(String topic, int qos) {
consumerClient.subscribe(topic, qos);
}
}
コンシューマーも同様に接続情報を使用してコンシューマーのクライアントインスタンスを作成し、コンシューマーのコールバック関数を指定する必要がありますが、異なるのはコンシューマーのメソッドは subscribe
購読メソッドと disConnect
切断メソッドです。
コンシューマーコールバック#
/**
* @author lza
* @date 2023/11/24-10:55
**/
public class MqttConsumerCallBack implements MqttCallback {
private final MqttConsumerConfig consumerConfig;
public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
this.consumerConfig = consumerConfig;
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
System.out.println("コンシューマー;サーバーとの接続が切断されました");
}
@Override
public void mqttErrorOccurred(MqttException e) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.printf("受信メッセージトピック : %s%n",s);
System.out.printf("受信メッセージQos : %d%n",mqttMessage.getQos());
System.out.printf("受信メッセージ内容 : %s%n",new String(mqttMessage.getPayload()));
System.out.printf("受信メッセージretained : %b%n",mqttMessage.isRetained());
// mqttV5リクエスト応答モードを設定
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("デバイスが接続されていません".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
}
}
コンシューマーのコールバックで最も重要なのは、メッセージを取得するメソッド messageArrived
です。コンシューマーが購読しているトピックがメッセージを受信すると、このメソッドに入ります。
すべてが正しければ、emqx のバックエンド管理ページでこれら 2 つのクライアント、プロデューサーとコンシューマーがオンラインになっているのを見ることができます。
リクエスト応答#
あなたはすでにコンシューマーの messageArrived
コールバックメソッドにリクエスト応答の注釈があるのを見たかもしれません。これが mqtt のバージョン依存関係を 3.1.1 から 5 に切り替えた理由です。
リクエスト応答モードは mqtt V5 の大きなバージョン特性であり、この文書を参照することができます。また、興味深い例を含むこの文書もあります。
簡単に言うと、私たちは HTTP リクエストが明確なリクエスト / 応答モデルであることを知っています。フロントエンドはインターフェースを通じてバックエンドにリクエストし、バックエンドがデータを処理した後、結果をフロントエンドに返します。失敗しても成功しても、フロントエンドは常に返り値を受け取ることができます。
しかし、mqtt のプッシュ購読モデルでは、プロデューサーとコンシューマーが互いに関心を持たない背景(mqtt は通常 IoT シーンで使用され、IoT のほとんどのシーンはプロデューサーまたはコンシューマーに対して過度に寛容である)を考えてみてください。想像してみてください、あなたはある公衆アカウントを購読していて、その公衆アカウントがいつメッセージをプッシュするかを気にする必要はありません。あなたはただ、その公衆アカウントがプッシュしたメッセージを見逃さないことを保証するだけです。
例えば、あなたが部屋に温度センサーを置いていて、それがネットワークに接続されているとします。外に出て、スマートフォンのアプリを通じてセンサーの現在の温度を取得したいと思っています。
HTTP リクエストに似ていますが、異なるのはバックエンドのデータがセンサーが収集したデータになっていることです。リクエスト応答を使用しない場合でも、例えば 2 つのトピック A と B を作成し、リクエスト時にメッセージを A に送信し、デバイスが A を購読し、データを収集した後に B トピックにデータを送信してアプリが購読することができます。
では、リクエスト応答はどのように行われるのでしょうか?A にメッセージを送信する際に、応答のトピックを直接指定します。デバイスがメッセージを受け取ると、上に太字で書かれているのを見て、「B に来てください」と言って、直接 B トピックにデータを返します。郵便局で手紙を送るプロセスに似ています。
MQTTXで mqtt に接続する際にバージョンを 5 に選択すれば、リクエスト応答のテストを行うことができます。
ここでは、ユーザー属性を判断に使用し、コンシューマーのコールバックメソッドに対応しています。
// mqttV5リクエスト応答モードを設定
List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
MqttProperties properties = new MqttProperties();
if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
MqttMessage responseMessage = new MqttMessage();
properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
responseMessage.setProperties(properties);
responseMessage.setPayload("デバイスが接続されていません".getBytes());
responseMessage.setQos(1);
responseMessage.setRetained(false);
consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
}
}
ユーザー属性 action
の値が remoteWake
の場合、応答モードがトリガーされ、メッセージ内の応答トピックを取得して自動的に返信します。
オンライン・オフライン通知#
リクエスト応答は単なる拡張であり、最初のニーズはデバイスがオンラインかどうかを取得することでした。
mqtt ミドルウェアを変更したので、システムトピックを使用できますか?
答えはできますが、必要ありません。なぜなら、emqx はより優雅な方法を提供しているからです。
最も簡単な方法は、emqx のエンタープライズ版を直接使用することです。
データストレージの主な使用シーンには、クライアントのオンライン・オフライン状態、購読トピック情報、メッセージ内容、メッセージ到達後のメッセージ確認などの操作を Redis、MySQL、PostgreSQL、MongoDB、Cassandra などのさまざまなデータベースに記録することが含まれます。ユーザーは関連トピックを購読することで同様の機能を実現できますが、エンタープライズ版ではこれらの永続化のサポートが組み込まれています。前者と比較して、後者の実行効率は高く、開発者の作業量を大幅に削減できます。
第二の方法、推奨される方法は、webhookを使用してオンライン・オフライン通知を維持することです。
まず、webhook 通知を取得するためのインターフェースを作成します。
private final static String CLIENT_IDS = "clientIds";
private final static String CONNECTED = "client_connected";
private final static String DISCONNECTED = "client_disconnected";
/**
* emqx webhookフック、クライアントのオンライン・オフラインをリスンするためのもの
* @param vo オンライン・オフラインvo
*/
@SaIgnore
@PostMapping("/onlineOrOffline")
public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
System.err.println("クライアント:" + vo.getClientid() +
",アクション:" + vo.getAction() +
",理由:" + vo.getReason());
List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
if (vo.getAction().equals(CONNECTED)) {
list.add(vo.getClientid());
// 既存の値を削除
RedisUtils.deleteKeys(CLIENT_IDS);
// 重複を排除
ArrayList<Object> distinct = CollUtil.distinct(list);
RedisUtils.setCacheList(CLIENT_IDS, distinct);
} else if (vo.getAction().equals(DISCONNECTED)){
list.remove(vo.getClientid());
// 既存の値を削除
RedisUtils.deleteKeys(CLIENT_IDS);
RedisUtils.setCacheList(CLIENT_IDS, list);
}
}
重複を排除する理由は、例えば A クライアントがオンラインになったとき、redis の中に A があり、その後サービスがダウンし、再起動した場合、A が再度オンラインになると、redis の中に 2 つの A が存在することになるからです。
次に、emqx のプラグインで webhook プラグインを有効にします。
次に、docker コンテナ内に入り、プラグイン設定を変更します。
docker exec -it emqx bash
cd /etc/plugins
プラグインを変更します。
vi emqx_web_hook.conf
最も重要な変更は、ファイルの先頭の Webhook URL をインターフェースアドレスに変更することです。
次に、以下の通知ルールを変更します。
前のコメントを削除するだけで、この 2 つがオンライン・オフライン通知のルールです。
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
完了すると、クライアントのオンライン・オフラインが通知されるようになります。
しかし、これだけでは完璧ではありません。もし mqtt ミドルウェアがダウンした場合、その時 webhook と redis に保存されたオンラインデバイスは無効になります。
心配しないでください。emqx はHTTP APIを提供しており、指定されたデバイスがオンラインかどうかを取得できます。
実際、API にはクラスター内のすべてのクライアント情報を取得するインターフェースもありますが、ページネーションされており、大量のクライアントがいる場合は扱いが難しくなります。したがって、webhook を使用してオンライン・オフライン通知を維持し、インターフェースを使用して指定されたクライアントがオンラインかどうかを判断するのが最善です。これにより、ほとんどのニーズを満たすことができます。
/**
* emqx APIを使用してクライアントがオンラインかどうかを検出
* @param clientId クライアントID
* @return オンラインかどうか
*/
@SaIgnore
@GetMapping("/checkClientStatus/{clientId}")
public R<Boolean> checkClientStatus(@PathVariable String clientId) {
// GETリクエストを送信
String url = "http://localhost:18083/api/v4/clients/" + clientId;
HttpRequest request = HttpUtil.createGet(url);
request.basicAuth("admin", "public");
HttpResponse response = request.execute();
ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
}
EMQX ドキュメント
EMQX のインストール使用と一部の問題
MQTT EMQX でクライアントのオンライン・オフラインをリスンし、ビジネスで正常に使用する方法