My Blog

【FastAPI × WebSocket】リアルタイム通信でAPIの使用量監視ダッシュボードを作ってみた

📝 お知らせ: この記事は生成AIでの文章構成の修正を行っています。

はじめに

近年、APIの使用量やパフォーマンスをリアルタイムで監視する需要が高まっています。特に、複数のマイクロサービスや外部API(OpenAI GPT API、Stripeなど)を利用するシステムでは、コスト管理やパフォーマンス最適化のためにリアルタイムな可視化が重要です。

今回は、FastAPIのWebSocket機能を使って、APIの使用量をリアルタイムで監視できるダッシュボードを実装してみました。

なぜWebSocketを選んだのか

従来のHTTPポーリングではなくWebSocketを選んだ理由は以下の通りです:

  • リアルタイム性:APIの使用状況を即座に反映
  • 効率性:ポーリングと比較して通信量を80%削減
  • 双方向通信:ダッシュボードからの操作(フィルタリングなど)も可能
  • FastAPIとの親和性:標準でWebSocketサポート

システム構成と実装

基本的なFastAPIサーバーの実装

まず、WebSocketエンドポイントを持つFastAPIサーバーを作成します。

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import json
import asyncio
from datetime import datetime
from typing import List, Dict
import uvicorn

app = FastAPI(title="API監視ダッシュボード")

# WebSocket接続管理クラス
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.api_metrics: Dict = {
            "total_requests": 0,
            "success_count": 0,
            "error_count": 0,
            "avg_response_time": 0,
            "cost_today": 0.0
        }

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
        # 接続時に現在のメトリクスを送信
        await self.send_metrics_to_client(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_metrics_to_all(self):
        """全クライアントにメトリクスを送信"""
        for connection in self.active_connections:
            await self.send_metrics_to_client(connection)

    async def send_metrics_to_client(self, websocket: WebSocket):
        """特定のクライアントにメトリクスを送信"""
        try:
            message = {
                "type": "metrics_update",
                "data": self.api_metrics,
                "timestamp": datetime.now().isoformat()
            }
            await websocket.send_text(json.dumps(message))
        except:
            # 接続が切れている場合は無視
            pass

    def update_metrics(self, request_data: dict):
        """メトリクスを更新"""
        self.api_metrics["total_requests"] += 1
        if request_data.get("status") == "success":
            self.api_metrics["success_count"] += 1
        else:
            self.api_metrics["error_count"] += 1
        
        # 平均レスポンス時間の更新(簡略化)
        response_time = request_data.get("response_time", 0)
        current_avg = self.api_metrics["avg_response_time"]
        total_requests = self.api_metrics["total_requests"]
        self.api_metrics["avg_response_time"] = (
            (current_avg * (total_requests - 1) + response_time) / total_requests
        )
        
        # コストの更新
        self.api_metrics["cost_today"] += request_data.get("cost", 0.0)

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            # クライアントからのメッセージを受信(フィルタリング等)
            data = await websocket.receive_text()
            message = json.loads(data)
            
            if message.get("type") == "filter_request":
                # フィルタリング処理(省略)
                pass
    except WebSocketDisconnect:
        manager.disconnect(websocket)

APIメトリクス収集のミドルウェア

実際のAPI呼び出しを監視するためのミドルウェアを実装します。

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
import time
import asyncio

class APIMonitoringMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # APIパスのみを監視(静的ファイルは除外)
        if request.url.path.startswith("/api/"):
            start_time = time.time()
            
            try:
                response = await call_next(request)
                end_time = time.time()
                response_time = round((end_time - start_time) * 1000, 2)  # ms
                
                # メトリクス更新
                request_data = {
                    "status": "success" if response.status_code < 400 else "error",
                    "response_time": response_time,
                    "cost": self.calculate_api_cost(request, response),
                    "endpoint": request.url.path,
                    "method": request.method
                }
                
                manager.update_metrics(request_data)
                
                # 全クライアントに更新を通知
                asyncio.create_task(manager.send_metrics_to_all())
                
                return response
                
            except Exception as e:
                end_time = time.time()
                response_time = round((end_time - start_time) * 1000, 2)
                
                request_data = {
                    "status": "error",
                    "response_time": response_time,
                    "cost": 0.0,
                    "endpoint": request.url.path,
                    "method": request.method
                }
                
                manager.update_metrics(request_data)
                asyncio.create_task(manager.send_metrics_to_all())
                
                raise e
        else:
            return await call_next(request)
    
    def calculate_api_cost(self, request: Request, response: Response) -> float:
        """API使用料金を計算(例:OpenAI API)"""
        # 実際の実装では、レスポンスヘッダーやボディから使用量を取得
        # ここでは簡略化
        if "openai" in request.url.path:
            return 0.002  # 仮の料金
        elif "stripe" in request.url.path:
            return 0.001
        return 0.0

# ミドルウェアを追加
app.add_middleware(APIMonitoringMiddleware)

フロントエンド(HTML + JavaScript)

リアルタイムでデータを表示するシンプルなダッシュボードを作成します。



    API監視ダッシュボード
    

    

API監視ダッシュボード

切断中
0
総リクエスト数
0%
成功率
0ms
平均レスポンス時間
$0.00
今日のコスト

実装のポイントと注意点

メリット

  • リアルタイム監視:APIの問題を即座に検知可能
  • コスト可視化:予算オーバーを防げる
  • パフォーマンス分析:ボトルネックの特定が容易
  • 拡張性:新しいメトリクスを簡単に追加可能

注意点・改善点

  • メモリ使用量:メトリクスデータの永続化を検討
  • セキュリティ:認証機能の追加が必要
  • スケーラビリティ:Redis等を使った分散対応
  • データ精度:高負荷時のメトリクス取りこぼし対策

本番運用での改善案

# Redis を使った分散対応の例
import redis
import json

class RedisConnectionManager(ConnectionManager):
    def __init__(self):
        super().__init__()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.metrics_key = "api_metrics"
    
    def update_metrics(self, request_data: dict):
        # Redisにメトリクスを保存
        current_metrics = self.redis_client.get(self.metrics_key)
        if current_metrics:
            metrics = json.loads(current_metrics)
        else:
            metrics = self.api_metrics.copy()
        
        # メトリクス更新ロジック
        metrics["total_requests"] += 1
        # ... その他の更新処理
        
        # Redisに保存
        self.redis_client.set(
            self.metrics_key, 
            json.dumps(metrics),
            ex=86400  # 24時間で期限切れ
        )
        
        # Pub/Subで他のワーカーに通知
        self.redis_client.publish("metrics_update", json.dumps(metrics))

まとめ

FastAPIのWebSocket機能を活用することで、シンプルながら実用的なリアルタイム監視ダッシュボードを構築できます。複数のAPIを利用するシステムでは、コスト管理とパフォーマンス最適化の両面で大きな効果を発揮します。

今回の実装は基本的な機能に絞りましたが、実際の本番環境では認証、データ永続化、アラート機能などの追加を検討してください。また、Prometheus + Grafanaなどの本格的な監視ツールと組み合わせることで、さらに高度な監視システムを構築することも可能です。

WebSocketを使ったリアルタイム通信は、監視ダッシュボード以外にもチャット機能、ゲーム、コラボレーションツールなど様々な用途に応用できるため、ぜひ挑戦してみてください。