中間層を介した Server-Sent Events
前回、Azure OpenAI Service からの応答をストリームで受信する方法を説明しました。Azure OpenAI Service では、応答を Server-Sent Events で受信でき、順次、回答の部分文字列を取得でき、取得した文字列を順次レンダリングすることができます。
ただし、Azure OpenAI Service を利用したシステムを構築する場合では、前回の例のようなフロントエンドから直接 Azure OpenAI Service へ要求を行うシーンは少ないと思います。 多くのシステムでは、以下のように、フロントエンドから、中間層へ API 呼び出しを行い、中間層から Azure OpenAI Service へ要求を行うように構成します。
このような構成においては、フロントエンドで順次、応答を受信するためには、中間層でも Server-Sent Events、もしくは、同様にクライアントに順次、プッシュで応答を返すプロトコルを実装してやる必要があります。
そこで、今回は、中間層で Server-Sent Events を実装し、中間層を介したストリーム受信の方法を説明します。
ASP.NET Core Web API による中間層の実装
以下のコードは、ASP.NET Core Web API のコントローラーの実装例です。
[HttpPost]
public async Task Post([FromBody] string message, CancellationToken cancellationToken)
{
Response.Headers.Append("Cache-Control", "no-cache");
Response.Headers.Append("Content-Type", "text/event-stream");
var writer = new StreamWriter(Response.Body);
ChatCompletionsOptions chatCompletionsOptions = new()
{
DeploymentName = _modelName,
Messages =
{
new ChatRequestUserMessage(message),
}
};
var messageId = Guid.NewGuid().ToString();
await foreach (StreamingChatCompletionsUpdate chatUpdate in
_openAIClient.GetChatCompletionsStreaming(chatCompletionsOptions)
.WithCancellation(cancellationToken)) // when clinet connection aborted.
{
var json = new
{
id = messageId,
role = chatUpdate.Role?.ToString(),
content = chatUpdate.ContentUpdate,
createdDateTime = DateTimeOffset.Now
};
await writer.WriteAsync($"data: {JsonSerializer.Serialize(json)}\n\n");
await writer.FlushAsync();
}
string doneEvent = "data: [DONE]\n\n";
await writer.WriteLineAsync(doneEvent);
await writer.FlushAsync();
}
このコードの流れは以下の通りです。
- クライアントへの応答ヘッダーに、
Content-Type:text/event-stream
を設定。 - Azure OpenAI Service へ要求を送信。
- await foreach 内で Azure OpenAI Service から受信した応答を順次、クライアントへの応答ストリームへ書き出す。
- クライアントに [DONE] を送信して終了。
3 のところでは、StreamWriter.FlushAsync
を実行することで、順次クライアントへ、Azure OpenAI Service から受信した応答を送信しています。
クライアントへ送信しているイベントのフォーマットは、以下の通りです。
data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":"assistant","content":"","createdDateTime":"2024-03-07T23:35:06.4680102+09:00"}\n\n
data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":null,"content":"望","createdDateTime":"2024-03-07T23:36:09.0325729+09:00"}\n\n
data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":null,"content":"月","createdDateTime":"2024-03-07T23:36:10.0417531+09:00"}\n\n
data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":null,"content":"の","createdDateTime":"2024-03-07T23:36:11.0496137+09:00"}\n\n
Server-Sent Events のデータのみのメッセージとして、クライアントが必要な情報のみに絞った JSON 文字列を送信しています。改行コードを二つ付与して送信しています。
Server-Sent Events の詳細な仕様は、以下にあります。
フロントエンドの実装
今回、フロントエンドは、.NET コンソールアプリで実装しました。 以下は、中間層の API へ要求を送信するメソッドの例です。
private static async Task GetChatCompletionsStreamingViaCustomApiAsync(
string customApiEndpoint,
string userMessage,
int outputMillisecondsDelay,
CancellationToken cancellationToken)
{
HttpRequestMessage httpRequestMessage = new()
{
Content = JsonContent.Create(userMessage),
Method = HttpMethod.Post,
RequestUri = new Uri(customApiEndpoint),
};
var response = await _httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
await foreach (var jsonNode in
ReadSseStreamingAsync(response, cancellationToken)
.ConfigureAwait(false))
{
if (jsonNode == null)
{
continue;
}
var role = jsonNode?["role"]?.GetValue<string>();
var chunkedContent = jsonNode?["content"]?.GetValue<string>();
if (role != null)
{
Console.WriteLine($"\n{role}: ");
}
if (chunkedContent != null)
{
Console.Write(chunkedContent);
}
await Task.Delay(outputMillisecondsDelay).ConfigureAwait(false);
}
Console.WriteLine();
}
HttpClient.SendAsync
メソッドで要求を送信する際に、completionOption を HttpCompletionOption.ResponseHeadersRead
に設定する必要があります。この設定により、サーバーからの応答ヘッダーを読み込んだ時点で応答が返るようになります。サーバーからの応答の Body をストリームで順次取得するためには、この設定は必須です。
デフォルトは、HttpCompletionOption.ResponseContentRead
であり、この場合は、Body まで読み込んだ時点、つまり Azure OpenAI Service からのすべてのイベントを受信後に応答が返るため、注意が必要です。Server-Sent Events の恩恵を受けることができません。
次に、サーバー側の実装と同様に、await foreach で、順次、応答を読み取り、順次、受信した部分文字列のレンダリングを行っています。
以下は、Server-Sent Events のイベントを読み取るメソッドのコードです。
private static async IAsyncEnumerable<JsonNode?> ReadSseStreamingAsync(
HttpResponseMessage httpResponseMessage,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
Stream responseStream = await httpResponseMessage.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
using (var streamReader = new StreamReader(responseStream))
{
while (!cancellationToken.IsCancellationRequested)
{
var line = await streamReader.ReadLineAsync().ConfigureAwait(false);
if (string.IsNullOrEmpty(line))
{
continue;
}
else if (line == "data: [DONE]")
{
break;
}
else if (line.StartsWith("data: "))
{
var body = line.Substring(6, line.Length - 6);
yield return JsonSerializer.Deserialize<JsonNode>(body);
}
}
};
}
このメソッドは、非同期ストリーム IAsyncEnumerable<JsonNode>
を返し、利用側で、await foreach を使用し、順次、サーバーから受信したイベントメッセージをできるようにしています。
実装としては、HttpResponseMessage.Content.ReadAsStreamAsync
で、サーバーからの応答を Stream で取得し、while ループ内で、順次、中間層が送信するイベントを行ごとで取得し、読み取った JSON メッセージを yield return
で返しています。
Server-Send Events イベント ストリーム フォーマットの解析部分は、最小限の実装にしています。サーバー側の実装を自身で行っており、メッセージのフォーマットが既知であるため、厳密なフォーマットの解析は行っていません。
Server-Send Events イベント ストリーム フォーマット
参考として、イベント ストリーム フォーマットの定義は、以下のように、ABNF(拡張バッカス・ナウア記法) で説明されています。
stream = [ bom ] *event
event = *( comment / field ) end-of-line
comment = colon *any-char end-of-line
field = 1*name-char [ colon [ space ] *any-char ] end-of-line
end-of-line = ( cr lf / cr / lf )
; characters
lf = %x000A ; U+000A LINE FEED (LF)
cr = %x000D ; U+000D CARRIAGE RETURN (CR)
space = %x0020 ; U+0020 SPACE
colon = %x003A ; U+003A COLON (:)
bom = %xFEFF ; U+FEFF BYTE ORDER MARK
name-char = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
any-char = %x0000-0009 / %x000B-000C / %x000E-10FFFF
; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)
サンプルコード
今回説明した各サンプル コードは、以下に掲載しています。
以上、参考までに。
コメント (0)
コメントの投稿