技能 人工智能 Anthropic流式与批量处理工作流

Anthropic流式与批量处理工作流

v20260423
anth-core-workflow-b
本指南教授如何构建基于Anthropic API的高级工作流。它涵盖了两种核心模式:一是用于交互式UI的实时流式传输(SSE事件),二是消息批量API(Message Batches),可实现异步、成本更低的超大规模数据批处理。提供了Python和TypeScript的代码示例。
获取技能
75 次下载
概览

Anthropic Core Workflow B — Streaming & Batches

Overview

Two complementary patterns: real-time streaming for interactive UIs (SSE events via POST /v1/messages with stream: true) and the Message Batches API (POST /v1/messages/batches) for processing up to 100,000 requests asynchronously at 50% cost reduction.

Prerequisites

  • Completed anth-install-auth setup
  • Familiarity with anth-core-workflow-a (Messages API basics)
  • For batches: understanding of async/polling patterns

Instructions

Streaming — Python SDK

import anthropic

client = anthropic.Anthropic()

# Method 1: High-level streaming (recommended)
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Write a short story about a robot."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After stream completes, access full message
    final_message = stream.get_final_message()
    print(f"\nUsage: {final_message.usage.input_tokens}+{final_message.usage.output_tokens}")

# Method 2: Event-level streaming (for custom event handling)
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Explain REST APIs."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
        elif event.type == "message_stop":
            print("\n[Stream complete]")

Streaming — TypeScript SDK

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// High-level streaming
const stream = client.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 2048,
  messages: [{ role: 'user', content: 'Write a haiku about code.' }],
});

stream.on('text', (text) => process.stdout.write(text));
stream.on('finalMessage', (msg) => {
  console.log(`\nTokens: ${msg.usage.input_tokens}+${msg.usage.output_tokens}`);
});

await stream.finalMessage();

Streaming with Tool Use

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,  # Same tools array from core-workflow-a
    messages=[{"role": "user", "content": "What's the weather?"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                print(f"Tool call: {event.content_block.name}")
        elif event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                print(event.delta.partial_json, end="")  # Tool input arrives incrementally

Message Batches API — Bulk Processing

# Create a batch of up to 100,000 requests (50% cost savings)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "req-001",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: ...article1..."}]
            }
        },
        {
            "custom_id": "req-002",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: ...article2..."}]
            }
        },
        # ... up to 100,000 requests
    ]
)

print(f"Batch ID: {batch.id}")          # msgbatch_01HBMt...
print(f"Status: {batch.processing_status}")  # in_progress
print(f"Counts: {batch.request_counts}")     # {processing: 2, succeeded: 0, ...}

Poll for Batch Completion

import time

while True:
    batch_status = client.messages.batches.retrieve(batch.id)
    if batch_status.processing_status == "ended":
        break
    print(f"Processing... {batch_status.request_counts}")
    time.sleep(30)

# Stream results (returns JSONL)
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        text = result.result.message.content[0].text
        print(f"[{result.custom_id}]: {text[:100]}...")
    elif result.result.type == "errored":
        print(f"[{result.custom_id}] ERROR: {result.result.error}")

SSE Event Types Reference

Event Description Key Fields
message_start Stream begins message.id, message.model, message.usage
content_block_start New content block content_block.type (text/tool_use)
content_block_delta Incremental content delta.text or delta.partial_json
content_block_stop Block complete index
message_delta Message-level update delta.stop_reason, usage.output_tokens
message_stop Stream complete (empty)
ping Keepalive (empty)

Error Handling

Error Cause Solution
Stream disconnects mid-response Network timeout Implement reconnection with partial content
Batch expired status Not processed within 24h Resubmit batch
errored results in batch Individual request invalid Check result.error for each failed request
429 on batch creation Too many concurrent batches Wait; limit is ~100 concurrent batches

Resources

Next Steps

For common errors, see anth-common-errors.

信息
Category 人工智能
Name anth-core-workflow-b
版本 v20260423
大小 6.17KB
更新时间 2026-04-26
语言