Anthropic Event Streams and Async Processing

v20260423
anth-webhooks-events
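This skill covers key patterns for building event-driven architectures with the Anthropic Claude API. It details two approaches: SSE events for real-time streaming, and the Message Batches API for large-scale asynchronous bulk processing. It suits real-time chat interfaces and background jobs that process large volumes of data.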

Anthropic Events & Async Processing

Overview

The Claude API does not use traditional webhooks. Instead it provides two event-driven patterns: Server-Sent Events (SSE) for real-time streaming and the Message Batches API for async bulk processing. This skill covers both.

SSE Streaming Events

import anthropic

client = anthropic.Anthropic()

# Process each SSE event type (match/case requires Python 3.10+)
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain microservices."}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"Started: {event.message.id}")
            case "content_block_start":
                if event.content_block.type == "tool_use":
                    print(f"Tool call: {event.content_block.name}")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="")
            case "message_delta":
                print(f"\nStop: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("[Complete]")
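
The SDK loop above receives already-parsed events. For readers consuming the stream without an SDK, here is a minimal sketch of parsing the underlying SSE text. It assumes each frame has one `event:` line and one or more `data:` lines holding JSON, that frames are separated by a blank line, and that newlines are plain `\n`. The helper name `parse_sse` and the sample payload are illustrative, not part of any SDK.

```python
import json


def parse_sse(raw: str) -> list[tuple[str, dict]]:
    """Split raw SSE text into (event_name, payload) pairs."""
    events = []
    # Frames are separated by a blank line.
    for frame in raw.strip().split("\n\n"):
        name, data_lines = None, []
        for line in frame.splitlines():
            if line.startswith("event:"):
                name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        # Skip frames that lack a name or a data payload.
        if name is not None and data_lines:
            events.append((name, json.loads("\n".join(data_lines))))
    return events


# Hypothetical two-frame sample for illustration only.
sample = (
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": "Hi"}}\n'
    '\n'
    'event: message_stop\n'
    'data: {"type": "message_stop"}\n'
    '\n'
)
parsed = parse_sse(sample)
```

In practice the SDK's streaming helper is usually the better choice; a hand-rolled parser like this one would also need to handle `\r\n` line endings and partial frames arriving across network reads.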

SSE Event Reference

Event                 When                   Key Data
message_start         Stream begins          message.id, message.model, message.usage.input_tokens
content_block_start   New block begins       content_block.type (text or tool_use), index
content_block_delta   Incremental content    delta.text or delta.partial_json
content_block_stop    Block finishes         index
message_delta         Message-level update   delta.stop_reason, usage.output_tokens
message_stop          Stream complete        (empty)
ping                  Keepalive              (empty)
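
To show how the events in this table fit together, the sketch below folds a sequence of events into the final text, the tool calls, and the stop reason. It works on plain dictionaries shaped like the event payloads, so it runs without a network call. The function name `accumulate`, the `get_weather` tool, and the sample events are hypothetical.

```python
import json


def accumulate(events: list[dict]) -> dict:
    """Fold stream events into final text, tool inputs, and stop reason."""
    blocks: dict[int, dict] = {}
    stop_reason = None
    for ev in events:
        kind = ev["type"]
        if kind == "content_block_start":
            block = ev["content_block"]
            blocks[ev["index"]] = {"type": block["type"], "name": block.get("name"), "buf": ""}
        elif kind == "content_block_delta":
            delta = ev["delta"]
            # text_delta carries "text"; input_json_delta carries "partial_json".
            blocks[ev["index"]]["buf"] += delta.get("text") or delta.get("partial_json") or ""
        elif kind == "message_delta":
            stop_reason = ev["delta"].get("stop_reason")
    ordered = [blocks[i] for i in sorted(blocks)]
    text = "".join(b["buf"] for b in ordered if b["type"] == "text")
    # Tool input arrives as JSON fragments and is only parseable once the block is complete.
    tools = [
        {"name": b["name"], "input": json.loads(b["buf"] or "{}")}
        for b in ordered if b["type"] == "tool_use"
    ]
    return {"text": text, "tool_calls": tools, "stop_reason": stop_reason}


# Hypothetical event sequence: one text block followed by one tool call.
events = [
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Checking. "}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "id": "toolu_example", "name": "get_weather", "input": {}}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"city": '}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '"Oslo"}'}},
    {"type": "content_block_stop", "index": 1},
    {"type": "message_delta", "delta": {"stop_reason": "tool_use"}, "usage": {"output_tokens": 12}},
    {"type": "message_stop"},
]
summary = accumulate(events)
```

The key point is that `partial_json` fragments are not valid JSON individually, so they are buffered per block index and parsed only after the stream ends.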

Async Batch Processing

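# `documents` is assumed to be a list of strings defined elsewhere.
# Submit batch (up to 100,000 requests per batch, billed at 50% of standard prices)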
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}]
            }
        }
        for i, doc in enumerate(documents)
    ]
)

# Poll for completion
import time
while True:
    status = client.messages.batches.retrieve(batch.id)
    if status.processing_status == "ended":
        break
    counts = status.request_counts
    print(f"Processing: {counts.processing} | Done: {counts.succeeded} | Errors: {counts.errored}")
    time.sleep(30)

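# Stream results (match them to inputs by custom_id, not by position)
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"[{result.custom_id}]: {result.result.message.content[0].text[:100]}")
    elif result.result.type == "errored":
        print(f"[{result.custom_id}] ERROR: {result.result.error}")
    else:
        # "canceled" or "expired" results carry neither a message nor an error
        print(f"[{result.custom_id}] {result.result.type.upper()}")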
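
The fixed 30-second sleep in the polling loop above is simple, but it polls just as often on an hour-long batch as on a short one. One alternative, sketched here under assumptions, is to poll with capped exponential backoff and an overall timeout. The helper `wait_until_ended` and its parameters are hypothetical; it takes any zero-argument callable that returns the current status string, so it can be exercised without the API.

```python
import time
from typing import Callable


def wait_until_ended(
    fetch_status: Callable[[], str],
    initial_delay: float = 5.0,
    max_delay: float = 60.0,
    timeout: float = 24 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until fetch_status() returns "ended"; return the number of polls made."""
    delay, waited, polls = initial_delay, 0.0, 0
    while True:
        polls += 1
        if fetch_status() == "ended":
            return polls
        if waited >= timeout:
            raise TimeoutError(f"batch still processing after {waited:.0f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)  # double the wait, up to the cap
```

With the client and batch from the example above, a call might look like `wait_until_ended(lambda: client.messages.batches.retrieve(batch.id).processing_status)`. The `sleep` parameter exists only so the delay can be stubbed out in tests.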

Event-Driven Architecture Pattern

# Use queues to decouple Claude requests from user-facing endpoints
import anthropic
import requests
from redis import Redis
from rq import Queue

redis_conn = Redis()
queue = Queue(connection=redis_conn)

def process_with_claude(prompt: str, callback_url: str) -> None:
    """Background job: call Claude, then POST the result to an internal callback."""
    client = anthropic.Anthropic()
    msg = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    # Notify your system via internal callback
    response = requests.post(
        callback_url,
        json={
            "text": msg.content[0].text,
            "usage": {"input": msg.usage.input_tokens, "output": msg.usage.output_tokens},
        },
        timeout=10,  # fail instead of hanging the worker if the callback stalls
    )
    response.raise_for_status()  # surface callback failures so the job is marked as failed

# Enqueue from your API handler
job = queue.enqueue(process_with_claude, prompt="...", callback_url="https://internal/callback")

Error Handling

Issue                Cause                             Fix
Stream disconnects   Network timeout                   Reconnect and re-send the request (responses are not resumable)
Batch expired        Not processed within 24 hours     Resubmit the batch
errored results      Individual request was invalid    Check result.result.error on each errored entry
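
Because a dropped stream cannot be resumed, the fix in the first row amounts to discarding the partial output and starting again. Below is a minimal sketch of that retry loop, assuming a hypothetical `open_stream` callable that starts a fresh request and yields text chunks. It catches the built-in `ConnectionError` as a stand-in; with the Anthropic SDK you would more likely catch `anthropic.APIConnectionError`.

```python
import time
from typing import Callable, Iterable


def stream_with_retry(
    open_stream: Callable[[], Iterable[str]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Collect text chunks from a fresh stream, restarting from scratch on disconnect."""
    for attempt in range(1, max_attempts + 1):
        chunks: list[str] = []  # partial output from a failed attempt is discarded
        try:
            for chunk in open_stream():
                chunks.append(chunk)
            return "".join(chunks)
        except ConnectionError:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    # Only reached when max_attempts is less than 1.
    raise ValueError("max_attempts must be at least 1")
```

If chunks are shown to the user as they arrive, the interface also needs to clear the partial text before a retry, since the second attempt may produce a different response.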

Next Steps

For performance optimization, see anth-performance-tuning.

Info
Category Artificial Intelligence
Name anth-webhooks-events
Version v20260423
Size 4.96KB
Updated 2026-04-26