上記のテキストを日本語に翻訳します:
前述のように、call
メソッドを使用して単純な呼び出しと出力を実現しました。このメソッドは完全な結果の返却を待つため、時間がかかる場合があります。
DEBUG でも結果が一緒に返されることがわかります。
しかし、通常、AI との対話では結果が一文字ずつまたは一部ずつ表示されます。ここではストリーム出力を使用しています。
flux#
公式ドキュメントを注意深く見ると、実際にストリーム出力があるコードがあることがわかります。
@GetMapping("/ai/generateStream")
public Flux<ChatResponse> generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClient.stream(prompt);
}
ただし、多くの人が Flux を見ると困惑してしまい、何なのかわからないと言っています。私もよくわかりません🤔。WebFlux のリアクティブプログラミングについては聞いたことがありますが、Flux とリアクティブとは何かは全くわかりません。
しかし、グループのメンバーに尋ねたところ、このものは学ぶ必要がないと言っていました。ほとんど使われないそうです。
コードを直接示します。
/**
* spring ai 官方的流式对话接口 使用 webflux
* @param message prompt
* @return Flux<String>
*/
@GetMapping(value = "chatStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chatSse(@PathVariable String message) {
Prompt prompt = getPrompt(message);
return chatClient.stream(prompt)
.filter(chatResponse -> chatResponse.getResult().getOutput().getContent() != null)
.flatMap(chatResponse -> Flux.just(chatResponse.getResult().getOutput().getContent()))
.doOnNext(System.out::println)
.doOnError(throwable -> System.err.println("err: " + throwable.getMessage()))
.doOnComplete(() -> System.out.println("complete~!"));
}
ここで呼び出されるメソッドは stream
です。
まず、最初の filter
は null をフィルタリングすることを示しています。これはストリームの最後のフィールドが null であり、終了を示すためです。flatMap
の中にある Flux.just()
は返された内容を Flux に入れることを示しています。したがって、最初に null をフィルタリングする必要があります。そうしないと、このステップでエラーが発生します。
SSE#
実際、最初に思いついた解決策は SSE(Server Sent Events - サーバーからのイベント通知)です。アクティブプッシュについて考えると、最初に思い浮かぶのはこれです。ネイティブであり、他の依存関係を追加する必要はありません。
コードも非常にシンプルです。
/**
* 流式对话接口
*
* @param message prompt
* @return SseEmitter
*/
@GetMapping("stream")
public SseEmitter streamCompletion(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
SseEmitter emitter = new SseEmitter(5L * 60 * 1000);
Flux<String> stream = chatClient.stream(message);
stream.subscribe(it -> {
try {
System.out.println(it);
emitter.send(it, MediaType.TEXT_EVENT_STREAM);
} catch (IOException e) {
System.out.println("sse发送消息失败");
emitter.completeWithError(e);
}
});
stream.doOnError(e -> {
System.out.println("流式对话发生异常");
emitter.completeWithError(e);
});
stream.doOnComplete(emitter::complete);
return emitter;
}
結果は以下の通りです。