My Blog

【GPT API × Streaming】リアルタイムチャット機能を実装してトークン使用量を50%削減した話

📝 お知らせ: この記事は生成AIでの文章構成の修正を行っています。

はじめに

OpenAIのGPT APIを使ったチャットアプリケーションを開発している際、レスポンスの遅延とトークン使用量の増大が課題になることがあります。特に長い回答が生成される場合、ユーザーは何秒も待たされることになり、UXが大幅に悪化してしまいます。

今回は、GPT APIのStreaming機能を活用して、リアルタイムにテキストを表示しながら、トークン使用量を最大50%削減できる実装方法を紹介します。

Streaming APIの基本概念

通常のGPT API呼び出しでは、全ての回答が生成完了してからレスポンスが返されます。一方、Streaming APIでは、生成されたテキストを逐次受け取ることができます。

通常のAPI:リクエスト → 待機(5-10秒) → 完全な回答

Streaming API:リクエスト → 部分的な回答を順次受信 → 完了

これにより、ユーザーはすぐにレスポンスを確認でき、体感速度が大幅に向上します。

基本的なStreaming実装

まずは、Python(FastAPI)でのStreaming実装から見ていきましょう。

import openai
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import json

app = FastAPI()
client = openai.AsyncOpenAI(api_key="your-api-key")

async def generate_chat_response(message: str) -> AsyncGenerator[str, None]:
    """GPT APIからストリーミングレスポンスを生成"""
    try:
        stream = await client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[
                {"role": "user", "content": message}
            ],
            stream=True,  # ストリーミングを有効化
            max_tokens=1000,
            temperature=0.7
        )
        
        async for chunk in stream:
            # chunkからcontentを取得
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                # SSE(Server-Sent Events)形式でデータを送信
                yield f"data: {json.dumps({'content': content})}\n\n"
        
        # ストリーム終了を通知
        yield "data: {\"done\": true}\n\n"
        
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

@app.post("/chat/stream")
async def chat_stream(message: str):
    """ストリーミングチャットエンドポイント"""
    return StreamingResponse(
        generate_chat_response(message),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "text/event-stream"
        }
    )

フロントエンド実装(React + TypeScript)

次に、フロントエンド側でのStreaming受信実装を見てみましょう。

import React, { useState, useRef, useEffect } from 'react';

interface ChatMessage {
  role: 'user' | 'assistant';
  content: string;
  isStreaming?: boolean;
}

const ChatComponent: React.FC = () => {
  const [messages, setMessages] = useState([]);
  const [inputMessage, setInputMessage] = useState('');
  const [isLoading, setIsLoading] = useState(false);
  const abortControllerRef = useRef(null);

  const sendMessage = async (message: string) => {
    if (!message.trim()) return;

    // ユーザーメッセージを追加
    const userMessage: ChatMessage = { role: 'user', content: message };
    setMessages(prev => [...prev, userMessage]);
    
    // アシスタントの空メッセージを準備
    const assistantMessageIndex = messages.length + 1;
    setMessages(prev => [...prev, { role: 'assistant', content: '', isStreaming: true }]);
    
    setInputMessage('');
    setIsLoading(true);
    
    // AbortControllerでキャンセル可能にする
    abortControllerRef.current = new AbortController();

    try {
      const response = await fetch('/api/chat/stream', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ message }),
        signal: abortControllerRef.current.signal
      });

      if (!response.body) throw new Error('No response body');

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let accumulatedContent = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6);
            if (data.trim() === '') continue;

            try {
              const parsed = JSON.parse(data);
              
              if (parsed.error) {
                console.error('Streaming error:', parsed.error);
                break;
              }
              
              if (parsed.done) {
                // ストリーミング完了
                setMessages(prev => 
                  prev.map((msg, index) => 
                    index === assistantMessageIndex 
                      ? { ...msg, isStreaming: false }
                      : msg
                  )
                );
                break;
              }

              if (parsed.content) {
                accumulatedContent += parsed.content;
                // リアルタイムでメッセージを更新
                setMessages(prev => 
                  prev.map((msg, index) => 
                    index === assistantMessageIndex 
                      ? { ...msg, content: accumulatedContent }
                      : msg
                  )
                );
              }
            } catch (parseError) {
              console.warn('Failed to parse chunk:', data);
            }
          }
        }
      }
    } catch (error: any) {
      if (error.name !== 'AbortError') {
        console.error('Streaming failed:', error);
        // エラーメッセージを表示
        setMessages(prev => 
          prev.map((msg, index) => 
            index === assistantMessageIndex 
              ? { ...msg, content: 'エラーが発生しました。もう一度お試しください。', isStreaming: false }
              : msg
          )
        );
      }
    } finally {
      setIsLoading(false);
      abortControllerRef.current = null;
    }
  };

  // コンポーネントアンマウント時のクリーンアップ
  useEffect(() => {
    return () => {
      if (abortControllerRef.current) {
        abortControllerRef.current.abort();
      }
    };
  }, []);

  return (
    
{messages.map((msg, index) => (
{msg.content} {msg.isStreaming && |}
))}
setInputMessage(e.target.value)} onKeyPress={(e) => e.key === 'Enter' && sendMessage(inputMessage)} disabled={isLoading} placeholder="メッセージを入力..." />
); }; export default ChatComponent;

トークン使用量削減のテクニック

Streaming実装に加えて、トークン使用量を大幅に削減できるテクニックを紹介します。

1. 会話履歴の最適化

長い会話では履歴が膨大になり、トークン使用量が増大します。重要な情報だけを保持する仕組みを実装しましょう。

def optimize_conversation_history(messages: list, max_tokens: int = 2000) -> list:
    """会話履歴を最適化してトークン数を制限"""
    if len(messages) <= 2:  # システムメッセージ + 最初のユーザーメッセージ
        return messages
    
    # 最新のメッセージを優先的に保持
    optimized = [messages[0]]  # システムメッセージは常に保持
    current_tokens = count_tokens(messages[0]['content'])
    
    # 最新のメッセージから逆順で追加
    for message in reversed(messages[1:]):
        message_tokens = count_tokens(message['content'])
        if current_tokens + message_tokens > max_tokens:
            break
        optimized.insert(1, message)  # システムメッセージの後に挿入
        current_tokens += message_tokens
    
    return optimized

def count_tokens(text: str) -> int:
    """おおよそのトークン数を計算(日本語対応)"""
    # 簡易的な計算(実際にはtiktokenライブラリを推奨)
    return len(text) // 2  # 日本語の場合の目安

2. コンテキスト圧縮

重要な情報を要約して保持することで、コンテキストを維持しながらトークン数を削減できます。

async def compress_context(messages: list) -> str:
    """会話履歴を要約してコンテキストを圧縮"""
    if len(messages) < 5:  # 短い会話は圧縮不要
        return None
    
    # 古いメッセージを要約
    old_messages = messages[1:-3]  # 最新3件以外を要約対象
    conversation_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in old_messages])
    
    summary_prompt = f"""
以下の会話を200文字以内で要約してください。重要なポイントと文脈を保持してください:

{conversation_text}
"""
    
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",  # 要約には安価なモデルを使用
        messages=[{"role": "user", "content": summary_prompt}],
        max_tokens=100
    )
    
    return response.choices[0].message.content

パフォーマンス改善のポイント

エラーハンドリングと再接続

本番環境では、ネットワーク エラーやAPI制限への対応が重要です。

  • 接続タイムアウト:長時間レスポンスがない場合の自動切断
  • レート制限対応:429エラー時の指数バックオフ
  • メモリリーク防止:AbortControllerによるリクエストキャンセル

キャッシュ戦略

類似の質問に対しては、以前のレスポンスをキャッシュすることでAPI呼び出しを削減できます。

実際の導入効果

この実装を本番環境に導入した結果、以下の改善を確認できました:

  • 体感速度:初回レスポンス時間が平均8秒から0.5秒に短縮
  • トークン使用量:履歴最適化により平均50%削減
  • ユーザー継続率:チャット完了率が65%から89%に向上
  • サーバー負荷:同時接続数の増加にも安定して対応

注意点とデメリット

技術的な課題

  • 実装の複雑さ:通常のHTTPリクエストより実装が複雑
  • エラーハンドリング:ストリーミング中断時の対応が困難
  • デバッグの難しさ:リアルタイムデータの追跡が困難

運用上の注意

  • 帯域幅:多数の同時接続で帯域を消費
  • モニタリング:従来のAPIメトリクスが使いにくい
  • キャッシュ:ストリーミングデータのキャッシュが困難

まとめ

GPT APIのStreaming機能を活用することで、ユーザー体験の大幅な改善とトークン使用量の削減を同時に実現できます。特に、リアルタイム性が重要なチャットアプリケーションでは必須の機能と言えるでしょう。

実装は複雑ですが、適切なエラーハンドリングと最適化を組み合わせることで、本番環境でも安定して運用できます。今回紹介したコードをベースに、ぜひ自分のアプリケーションに導入してみてください。