ASP.NET Core Web API を経由した Azure OpenAI Service 応答ストリーム(Server-Side Events)の受信

2024/03/08
★★★★

中間層を介した Server-Sent Events

前回、Azure OpenAI Service からの応答をストリームで受信する方法を説明しました。Azure OpenAI Service では、応答を Server-Sent Events で受信でき、順次、回答の部分文字列を取得でき、取得した文字列を順次レンダリングすることができます。

ただし、Azure OpenAI Service を利用したシステムを構築する場合では、前回の例のようなフロントエンドから直接 Azure OpenAI Service へ要求を行うシーンは少ないと思います。 多くのシステムでは、以下のように、フロントエンドから、中間層へ API 呼び出しを行い、中間層から Azure OpenAI Service へ要求を行うように構成します。

中間層を介した Azure OpenAI Service へのアクセス

このような構成においては、フロントエンドで順次、応答を受信するためには、中間層でも Server-Sent Events、もしくは、同様にクライアントに順次、プッシュで応答を返すプロトコルを実装してやる必要があります。

中間層を介した Server-Sent Events の構成

そこで、今回は、中間層で Server-Sent Events を実装し、中間層を介したストリーム受信の方法を説明します。

ASP.NET Core Web API による中間層の実装

以下のコードは、ASP.NET Core Web API のコントローラーの実装例です。

[HttpPost]
public async Task Post([FromBody] string message, CancellationToken cancellationToken)
{            
    Response.Headers.Append("Cache-Control", "no-cache");    
    Response.Headers.Append("Content-Type", "text/event-stream");    

    var writer = new StreamWriter(Response.Body);

    ChatCompletionsOptions chatCompletionsOptions = new()
    {
        DeploymentName = _modelName,
        Messages =
        {            
            new ChatRequestUserMessage(message),
        }
    };

    var messageId = Guid.NewGuid().ToString();

    await foreach (StreamingChatCompletionsUpdate chatUpdate in 
        _openAIClient.GetChatCompletionsStreaming(chatCompletionsOptions)                
        .WithCancellation(cancellationToken)) // when clinet connection aborted.
    {
        var json = new 
        { 
            id = messageId, 
            role = chatUpdate.Role?.ToString(), 
            content = chatUpdate.ContentUpdate, 
            createdDateTime = DateTimeOffset.Now 
        };

        await writer.WriteAsync($"data: {JsonSerializer.Serialize(json)}\n\n");
        await writer.FlushAsync();        
    }

    string doneEvent = "data: [DONE]\n\n";

    await writer.WriteLineAsync(doneEvent);
    await writer.FlushAsync();
}

このコードの流れは以下の通りです。

  1. クライアントへの応答ヘッダーに、Content-Type:text/event-stream を設定。
  2. Azure OpenAI Service へ要求を送信。
  3. await foreach 内で Azure OpenAI Service から受信した応答を順次、クライアントへの応答ストリームへ書き出す。
  4. クライアントに [DONE] を送信して終了。

3 のところでは、StreamWriter.FlushAsync を実行することで、順次クライアントへ、Azure OpenAI Service から受信した応答を送信しています。
クライアントへ送信しているイベントのフォーマットは、以下の通りです。

data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":"assistant","content":"","createdDateTime":"2024-03-07T23:35:06.4680102+09:00"}\n\n
data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":null,"content":"望","createdDateTime":"2024-03-07T23:36:09.0325729+09:00"}\n\n
data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":null,"content":"月","createdDateTime":"2024-03-07T23:36:10.0417531+09:00"}\n\n
data: {"id":"43e509f8-4902-4c43-9d81-5845a03e8375","role":null,"content":"の","createdDateTime":"2024-03-07T23:36:11.0496137+09:00"}\n\n

Server-Sent Events のデータのみのメッセージとして、クライアントが必要な情報のみに絞った JSON 文字列を送信しています。改行コードを二つ付与して送信しています。
Server-Sent Events の詳細な仕様は、以下にあります。

フロントエンドの実装

今回、フロントエンドは、.NET コンソールアプリで実装しました。 以下は、中間層の API へ要求を送信するメソッドの例です。

private static async Task GetChatCompletionsStreamingViaCustomApiAsync(
    string customApiEndpoint, 
    string userMessage, 
    int outputMillisecondsDelay, 
    CancellationToken cancellationToken)
{
    HttpRequestMessage httpRequestMessage = new()
    {
        Content = JsonContent.Create(userMessage),
        Method = HttpMethod.Post,
        RequestUri = new Uri(customApiEndpoint),            
    };

    var response = await _httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

    await foreach (var jsonNode in
        ReadSseStreamingAsync(response, cancellationToken)
        .ConfigureAwait(false))
    {
        if (jsonNode == null)
        {
            continue;
        }

        var role = jsonNode?["role"]?.GetValue<string>();
        var chunkedContent = jsonNode?["content"]?.GetValue<string>();

        if (role != null)
        {
            Console.WriteLine($"\n{role}: ");
        }

        if (chunkedContent != null)
        {
            Console.Write(chunkedContent);
        }
        
        await Task.Delay(outputMillisecondsDelay).ConfigureAwait(false);
    }

    Console.WriteLine();    
}

HttpClient.SendAsync メソッドで要求を送信する際に、completionOption を HttpCompletionOption.ResponseHeadersRead に設定する必要があります。この設定により、サーバーからの応答ヘッダーを読み込んだ時点で応答が返るようになります。サーバーからの応答の Body をストリームで順次取得するためには、この設定は必須です。
デフォルトは、HttpCompletionOption.ResponseContentRead であり、この場合は、Body まで読み込んだ時点、つまり Azure OpenAI Service からのすべてのイベントを受信後に応答が返るため、注意が必要です。Server-Sent Events の恩恵を受けることができません。

次に、サーバー側の実装と同様に、await foreach で、順次、応答を読み取り、順次、受信した部分文字列のレンダリングを行っています。

以下は、Server-Sent Events のイベントを読み取るメソッドのコードです。

private static async IAsyncEnumerable<JsonNode?> ReadSseStreamingAsync(
    HttpResponseMessage httpResponseMessage,        
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    Stream responseStream = await httpResponseMessage.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);

    using (var streamReader = new StreamReader(responseStream))
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var line = await streamReader.ReadLineAsync().ConfigureAwait(false);

            if (string.IsNullOrEmpty(line))
            {
                continue;
            }
            else if (line == "data: [DONE]")
            {
                break;
            }
            else if (line.StartsWith("data: "))
            {
                var body = line.Substring(6, line.Length - 6);
                yield return JsonSerializer.Deserialize<JsonNode>(body);                    
            }
        }
    };
}    

このメソッドは、非同期ストリーム IAsyncEnumerable<JsonNode> を返し、利用側で、await foreach を使用し、順次、サーバーから受信したイベントメッセージをできるようにしています。
実装としては、HttpResponseMessage.Content.ReadAsStreamAsync で、サーバーからの応答を Stream で取得し、while ループ内で、順次、中間層が送信するイベントを行ごとで取得し、読み取った JSON メッセージを yield return で返しています。
Server-Send Events イベント ストリーム フォーマットの解析部分は、最小限の実装にしています。サーバー側の実装を自身で行っており、メッセージのフォーマットが既知であるため、厳密なフォーマットの解析は行っていません。

Server-Send Events イベント ストリーム フォーマット

参考として、イベント ストリーム フォーマットの定義は、以下のように、ABNF(拡張バッカス・ナウア記法) で説明されています。

stream        = [ bom ] *event
event         = *( comment / field ) end-of-line
comment       = colon *any-char end-of-line
field         = 1*name-char [ colon [ space ] *any-char ] end-of-line
end-of-line   = ( cr lf / cr / lf )

; characters
lf            = %x000A ; U+000A LINE FEED (LF)
cr            = %x000D ; U+000D CARRIAGE RETURN (CR)
space         = %x0020 ; U+0020 SPACE
colon         = %x003A ; U+003A COLON (:)
bom           = %xFEFF ; U+FEFF BYTE ORDER MARK
name-char     = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
                ; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
any-char      = %x0000-0009 / %x000B-000C / %x000E-10FFFF
                ; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)

サンプルコード

今回説明した各サンプル コードは、以下に掲載しています。

以上、参考までに。

コメント (0)

コメントの投稿