技能 编程开发 多密钥负载均衡与高并发API调用

多密钥负载均衡与高并发API调用

v20260423
openrouter-load-balancing
本技能旨在解决使用大型语言模型API(如OpenRouter)时遇到的单密钥速率限制问题。它展示了多密钥轮询、健康检查和熔断器机制,将API请求分散到多个密钥,实现负载均衡和高可用性。内容还涵盖了使用asyncio进行并发请求处理和提供商级故障转移,确保在流量激增时系统稳定运行。
获取技能
440 次下载
概览

OpenRouter Load Balancing

Overview

A single OpenRouter API key has rate limits (requests/minute and tokens/minute). To scale beyond those limits, distribute requests across multiple keys. OpenRouter also provides server-side load balancing via provider routing and the :nitro variant for low-latency inference. This skill covers multi-key rotation, health-based routing, circuit breakers, and concurrent request patterns.

Multi-Key Round Robin

import os, itertools, time, logging
from openai import OpenAI, RateLimitError
from dataclasses import dataclass, field

log = logging.getLogger("openrouter.lb")

@dataclass
class KeyPool:
    """Round-robin API key pool with health tracking."""
    keys: list[str]
    _cycle: itertools.cycle = field(init=False, repr=False)
    _health: dict[str, dict] = field(init=False, default_factory=dict)

    def __post_init__(self):
        self._cycle = itertools.cycle(self.keys)
        self._health = {k: {"errors": 0, "last_error": 0, "healthy": True} for k in self.keys}

    def next_key(self) -> str:
        """Get next healthy key."""
        attempts = 0
        while attempts < len(self.keys):
            key = next(self._cycle)
            h = self._health[key]
            # Recover after 60s cooldown
            if not h["healthy"] and time.time() - h["last_error"] > 60:
                h["healthy"] = True
                h["errors"] = 0
            if h["healthy"]:
                return key
            attempts += 1
        # All keys unhealthy -- return any and hope for the best
        return next(self._cycle)

    def mark_error(self, key: str):
        h = self._health[key]
        h["errors"] += 1
        h["last_error"] = time.time()
        if h["errors"] >= 3:  # Circuit breaker: 3 errors → unhealthy
            h["healthy"] = False
            log.warning(f"Key {key[:12]}... marked unhealthy after {h['errors']} errors")

    def mark_success(self, key: str):
        self._health[key]["errors"] = 0
        self._health[key]["healthy"] = True

pool = KeyPool(keys=[
    os.environ.get("OPENROUTER_KEY_1", ""),
    os.environ.get("OPENROUTER_KEY_2", ""),
    os.environ.get("OPENROUTER_KEY_3", ""),
])

def balanced_completion(messages, model="anthropic/claude-3.5-sonnet", **kwargs):
    """Send request using next healthy key from the pool."""
    key = pool.next_key()
    client = OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=key,
        default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
    )
    try:
        response = client.chat.completions.create(
            model=model, messages=messages, **kwargs
        )
        pool.mark_success(key)
        return response
    except RateLimitError:
        pool.mark_error(key)
        # Retry with next key
        return balanced_completion(messages, model, **kwargs)

Concurrent Request Processing

import asyncio
from openai import AsyncOpenAI

async def parallel_completions(prompts: list[str], model="openai/gpt-4o-mini",
                                max_concurrent=5, **kwargs):
    """Process multiple prompts concurrently with rate limiting."""
    semaphore = asyncio.Semaphore(max_concurrent)
    client = AsyncOpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=os.environ["OPENROUTER_API_KEY"],
        default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
    )

    async def process_one(prompt: str):
        async with semaphore:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **kwargs,
            )
            return response.choices[0].message.content

    return await asyncio.gather(*[process_one(p) for p in prompts])

# Usage
results = asyncio.run(parallel_completions(
    ["Summarize X", "Translate Y", "Analyze Z"],
    max_concurrent=3,
    max_tokens=500,
))

Provider-Level Load Balancing

# OpenRouter can distribute across providers for the same model
response = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello"}],
    max_tokens=200,
    extra_body={
        "provider": {
            # Let OpenRouter pick the best available provider
            "order": ["Anthropic", "AWS Bedrock", "GCP Vertex"],
            "allow_fallbacks": True,
        },
    },
)

Rate Limit Awareness

import requests

def check_rate_limits(api_key: str) -> dict:
    """Check current rate limit status for a key."""
    resp = requests.get(
        "https://openrouter.ai/api/v1/auth/key",
        headers={"Authorization": f"Bearer {api_key}"},
    )
    data = resp.json()["data"]
    return {
        "requests_limit": data["rate_limit"]["requests"],
        "interval": data["rate_limit"]["interval"],
        "credits_used": data["usage"],
        "credits_limit": data.get("limit"),
    }

# Check all keys in pool
for key in pool.keys:
    limits = check_rate_limits(key)
    print(f"Key {key[:12]}...: {limits}")

Error Handling

Error Cause Fix
429 on all keys All keys rate-limited simultaneously Add more keys; implement request queuing
Uneven load distribution Round-robin not accounting for in-flight requests Use weighted distribution based on current load
Key health false positive Transient error marked key unhealthy Use sliding window (3 errors in 60s) before marking unhealthy
Concurrent request failures Too many parallel requests Reduce semaphore limit; add backoff

Enterprise Considerations

  • Create separate API keys per service/team with individual credit limits for cost isolation
  • Use 3+ keys to multiply effective rate limits (each key gets its own quota)
  • Implement circuit breakers: mark keys unhealthy after N consecutive errors, recover after cooldown
  • Use asyncio.Semaphore to control concurrency and prevent overwhelming the API
  • Monitor per-key error rates and latency to detect degraded keys early
  • Combine multi-key rotation with provider routing for maximum resilience

References

信息
Category 编程开发
Name openrouter-load-balancing
版本 v20260423
大小 10.02KB
更新时间 2026-04-28
语言