技能 编程开发 Azure 队列存储 Python SDK

Azure 队列存储 Python SDK

v20260301
azure-storage-queue-py
Azure 队列存储的 Python SDK,支持可靠的消息入队/出队/删除、异步操作、可配置可见性和元数据等,适合构建分布式任务调度和监控流程。
获取技能
489 次下载
概览

Azure Queue Storage SDK for Python

Simple, cost-effective message queuing for asynchronous communication.

Installation

pip install azure-storage-queue azure-identity

Environment Variables

AZURE_STORAGE_ACCOUNT_URL=https://<account>.queue.core.windows.net

Authentication

from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueServiceClient, QueueClient

credential = DefaultAzureCredential()
account_url = "https://<account>.queue.core.windows.net"

# Service client
service_client = QueueServiceClient(account_url=account_url, credential=credential)

# Queue client
queue_client = QueueClient(account_url=account_url, queue_name="myqueue", credential=credential)

Queue Operations

# Create queue
service_client.create_queue("myqueue")

# Get queue client
queue_client = service_client.get_queue_client("myqueue")

# Delete queue
service_client.delete_queue("myqueue")

# List queues
for queue in service_client.list_queues():
    print(queue.name)

Send Messages

# Send message (string)
queue_client.send_message("Hello, Queue!")

# Send with options
queue_client.send_message(
    content="Delayed message",
    visibility_timeout=60,  # Hidden for 60 seconds
    time_to_live=3600       # Expires in 1 hour
)

# Send JSON
import json
data = {"task": "process", "id": 123}
queue_client.send_message(json.dumps(data))

Receive Messages

# Receive messages (makes them invisible temporarily)
messages = queue_client.receive_messages(
    messages_per_page=10,
    visibility_timeout=30  # 30 seconds to process
)

for message in messages:
    print(f"ID: {message.id}")
    print(f"Content: {message.content}")
    print(f"Dequeue count: {message.dequeue_count}")
    
    # Process message...
    
    # Delete after processing
    queue_client.delete_message(message)

Peek Messages

# Peek without hiding (doesn't affect visibility)
messages = queue_client.peek_messages(max_messages=5)

for message in messages:
    print(message.content)

Update Message

# Extend visibility or update content
messages = queue_client.receive_messages()
for message in messages:
    # Extend timeout (need more time)
    queue_client.update_message(
        message,
        visibility_timeout=60
    )
    
    # Update content and timeout
    queue_client.update_message(
        message,
        content="Updated content",
        visibility_timeout=60
    )

Delete Message

# Delete after successful processing
messages = queue_client.receive_messages()
for message in messages:
    try:
        # Process...
        queue_client.delete_message(message)
    except Exception:
        # Message becomes visible again after timeout
        pass

Clear Queue

# Delete all messages
queue_client.clear_messages()

Queue Properties

# Get queue properties
properties = queue_client.get_queue_properties()
print(f"Approximate message count: {properties.approximate_message_count}")

# Set/get metadata
queue_client.set_queue_metadata(metadata={"environment": "production"})
properties = queue_client.get_queue_properties()
print(properties.metadata)

Async Client

from azure.storage.queue.aio import QueueServiceClient, QueueClient
from azure.identity.aio import DefaultAzureCredential

async def queue_operations():
    credential = DefaultAzureCredential()
    
    async with QueueClient(
        account_url="https://<account>.queue.core.windows.net",
        queue_name="myqueue",
        credential=credential
    ) as client:
        # Send
        await client.send_message("Async message")
        
        # Receive
        async for message in client.receive_messages():
            print(message.content)
            await client.delete_message(message)

import asyncio
asyncio.run(queue_operations())

Base64 Encoding

from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy

# For binary data
queue_client = QueueClient(
    account_url=account_url,
    queue_name="myqueue",
    credential=credential,
    message_encode_policy=BinaryBase64EncodePolicy(),
    message_decode_policy=BinaryBase64DecodePolicy()
)

# Send bytes
queue_client.send_message(b"Binary content")

Best Practices

  1. Delete messages after processing to prevent reprocessing
  2. Set appropriate visibility timeout based on processing time
  3. Handle dequeue_count for poison message detection
  4. Use async client for high-throughput scenarios
  5. Use peek_messages for monitoring without affecting queue
  6. Set time_to_live to prevent stale messages
  7. Consider Service Bus for advanced features (sessions, topics)

When to Use

This skill is applicable to execute the workflow or actions described in the overview.

信息
Category 编程开发
Name azure-storage-queue-py
版本 v20260301
大小 5.03KB
更新时间 2026-03-02
语言