Do not use rate limiting as the sole defense against attacks. Combine with authentication, authorization, and WAF rules.
Define rate limits per endpoint category and user tier:
# Rate limit configuration.
# Each policy dict has three fields:
#   requests       - maximum requests allowed inside one window
#   window_seconds - length of the sliding window, in seconds
#   by             - identity the limit is keyed on ("ip" or "user")
RATE_LIMITS = {
    # Authentication endpoints (most restrictive)
    "auth": {
        "login": {"requests": 5, "window_seconds": 60, "by": "ip"},
        "register": {"requests": 3, "window_seconds": 300, "by": "ip"},
        "forgot_password": {"requests": 3, "window_seconds": 3600, "by": "ip"},
        "verify_mfa": {"requests": 5, "window_seconds": 300, "by": "user"},
    },
    # Standard API endpoints
    "api": {
        "free": {"requests": 60, "window_seconds": 60, "by": "user"},
        "premium": {"requests": 300, "window_seconds": 60, "by": "user"},
        "enterprise": {"requests": 1000, "window_seconds": 60, "by": "user"},
    },
    # Resource-intensive endpoints
    "expensive": {
        "search": {"requests": 10, "window_seconds": 60, "by": "user"},
        "export": {"requests": 5, "window_seconds": 3600, "by": "user"},
        "bulk_import": {"requests": 2, "window_seconds": 3600, "by": "user"},
    },
    # Global limits
    "global": {
        "per_ip": {"requests": 1000, "window_seconds": 60, "by": "ip"},
        "per_user": {"requests": 5000, "window_seconds": 3600, "by": "user"},
    },
}
# Standard library
import hashlib
import secrets
import time
from functools import wraps

# Third-party
import redis
from flask import Flask, request, jsonify, g
from flask import make_response
# Flask application and the shared Redis connection used by every rate
# limiter defined in this module.
app = Flask(__name__)
# decode_responses=True makes Redis return str instead of bytes.
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
class SlidingWindowRateLimiter:
    """Sliding window rate limiter using Redis sorted sets.

    Each key holds a sorted set whose members are individual requests
    scored by arrival timestamp; counting members newer than
    ``now - window_seconds`` gives the current rate.
    """

    def __init__(self, redis_conn):
        self.redis = redis_conn

    def is_allowed(self, key, max_requests, window_seconds):
        """Check whether a request on *key* is allowed and record it.

        Args:
            key: Redis key identifying the client/endpoint combination.
            max_requests: Maximum requests permitted per window.
            window_seconds: Length of the sliding window in seconds.

        Returns:
            Tuple ``(allowed, current_count, max_requests, retry_after)``
            where retry_after is 0 for allowed requests.
        """
        now = time.time()
        window_start = now - window_seconds
        # Unique member even when two requests share the same time.time()
        # value: hashing only the timestamp (the previous md5 scheme)
        # produces identical members in that case and zadd silently
        # overwrites one, under-counting requests.
        member = f"{now}:{secrets.token_hex(4)}"
        pipe = self.redis.pipeline()
        # Remove entries that have aged out of the window.
        pipe.zremrangebyscore(key, 0, window_start)
        # Count requests in the current window (before this one).
        pipe.zcard(key)
        # Record this request.
        pipe.zadd(key, {member: now})
        # Expire idle keys shortly after the window ends.
        pipe.expire(key, window_seconds + 1)
        results = pipe.execute()
        current_count = results[1]
        if current_count >= max_requests:
            # Remove the entry we just added: a rejected request must not
            # occupy the window. Otherwise the Retry-After computed below
            # (which assumes only allowed requests fill the window) is
            # wrong, and a steadily retrying client never recovers.
            self.redis.zrem(key, member)
            # Retry-After: when the oldest allowed request leaves the window.
            oldest = self.redis.zrange(key, 0, 0, withscores=True)
            if oldest:
                retry_after = int(oldest[0][1] + window_seconds - now) + 1
            else:
                retry_after = window_seconds
            return False, current_count, max_requests, retry_after
        return True, current_count + 1, max_requests, 0
# Module-wide limiter instance used by the rate_limit decorator below.
rate_limiter = SlidingWindowRateLimiter(redis_client)
def rate_limit(max_requests, window_seconds, key_func=None):
    """Decorator applying a sliding-window rate limit to a Flask view.

    Args:
        max_requests: Requests allowed per window.
        window_seconds: Window length in seconds.
        key_func: Optional callable returning the identity string to key
            the limit on. Defaults to the authenticated user
            (``g.user_id``) when set, otherwise the client IP.

    Every response carries X-RateLimit-* headers; once the limit is
    exceeded a 429 JSON response with a Retry-After header is returned.
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Determine the identity this request is limited by.
            if key_func:
                identifier = key_func()
            elif hasattr(g, 'user_id'):
                identifier = f"user:{g.user_id}"
            else:
                identifier = f"ip:{request.remote_addr}"
            key = f"ratelimit:{request.endpoint}:{identifier}"
            try:
                allowed, current, limit, retry_after = rate_limiter.is_allowed(
                    key, max_requests, window_seconds)
            except redis.RedisError:
                # Fail open: a Redis outage must not take the API down,
                # so requests are allowed while the backend is unreachable.
                allowed, current, limit, retry_after = True, 0, max_requests, 0
            # Rate limit headers are set on every response, allowed or not.
            headers = {
                "X-RateLimit-Limit": str(limit),
                "X-RateLimit-Remaining": str(max(0, limit - current)),
                "X-RateLimit-Reset": str(int(time.time()) + window_seconds),
            }
            if not allowed:
                headers["Retry-After"] = str(retry_after)
                response = jsonify({
                    "error": "rate_limit_exceeded",
                    "message": "Too many requests. Please try again later.",
                    "retry_after": retry_after
                })
                response.status_code = 429
                for h, v in headers.items():
                    response.headers[h] = v
                return response
            # make_response normalizes whatever the view returned (Response,
            # tuple, dict, str) so the header assignment below cannot fail
            # on views that do not return a Response object.
            response = make_response(f(*args, **kwargs))
            for h, v in headers.items():
                response.headers[h] = v
            return response
        return wrapped
    return decorator
# Apply rate limiting to endpoints
@app.route('/api/v1/auth/login', methods=['POST'])
@rate_limit(max_requests=5, window_seconds=60,
            key_func=lambda: f"ip:{request.remote_addr}")
def login():
    """Authenticate a user; limited to 5 attempts/minute per client IP."""
    # Login logic
    payload = {"message": "Login successful"}
    return jsonify(payload)
@app.route('/api/v1/users/me', methods=['GET'])
@rate_limit(max_requests=60, window_seconds=60)
def get_profile():
    """Return the current user's profile; 60 requests/minute."""
    # Profile logic
    profile = {"user": "data"}
    return jsonify(profile)
@app.route('/api/v1/search', methods=['GET'])
@rate_limit(max_requests=10, window_seconds=60)
def search():
    """Search endpoint; tighter limit (10/min) as a resource-intensive call."""
    # Search logic
    hits = []
    return jsonify({"results": hits})
import redis
import time
class TokenBucketRateLimiter:
    """Token bucket rate limiter allowing burst traffic within limits."""

    def __init__(self, redis_conn):
        self.redis = redis_conn

    def is_allowed(self, key, max_tokens, refill_rate, refill_interval=1):
        """
        Token bucket algorithm:
        - max_tokens: Maximum burst capacity
        - refill_rate: Tokens added per refill_interval
        - refill_interval: Seconds between refills

        Returns (allowed, remaining_tokens, max_tokens). The whole
        refill/check/consume cycle runs in one Lua script so it is atomic
        even with many API instances sharing the same Redis.
        """
        now = time.time()
        bucket_key = f"tb:{key}"
        # Lua script for atomic token bucket operation
        lua_script = """
        local key = KEYS[1]
        local max_tokens = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local refill_interval = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])
        local bucket = redis.call('hmget', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1])
        local last_refill = tonumber(bucket[2])
        -- First sight of this bucket: start it full.
        if tokens == nil then
            tokens = max_tokens
            last_refill = now
        end
        -- Refill tokens for every whole interval elapsed since last refill.
        local elapsed = now - last_refill
        local refills = math.floor(elapsed / refill_interval)
        if refills > 0 then
            tokens = math.min(max_tokens, tokens + (refills * refill_rate))
            -- Advance by whole intervals only, so the fractional remainder
            -- still counts toward the next refill.
            last_refill = last_refill + (refills * refill_interval)
        end
        local allowed = 0
        if tokens >= 1 then
            tokens = tokens - 1
            allowed = 1
        end
        -- HMSET is deprecated since Redis 4.0; multi-field HSET behaves
        -- identically here.
        redis.call('hset', key, 'tokens', tokens, 'last_refill', last_refill)
        -- TTL: time needed to refill an empty bucket, plus slack.
        redis.call('expire', key, math.ceil(max_tokens / refill_rate * refill_interval) + 10)
        return {allowed, tokens, max_tokens}
        """
        result = self.redis.eval(lua_script, 1, bucket_key,
                                 max_tokens, refill_rate, refill_interval, now)
        allowed = bool(result[0])
        remaining = int(result[1])
        limit = int(result[2])
        return allowed, remaining, limit
from enum import Enum


class UserTier(Enum):
    """Subscription tiers recognized by the tiered rate limiter."""
    FREE = "free"
    PREMIUM = "premium"
    ENTERPRISE = "enterprise"


# Per-tier limits, each value a (max_requests, window_seconds) pair.
TIER_LIMITS = {
    UserTier.FREE: {
        "default": (60, 60),           # 60 req/min
        "search": (10, 60),            # 10 req/min
        "export": (5, 3600),           # 5 req/hour
        "daily_total": (1000, 86400),  # 1000 req/day
    },
    UserTier.PREMIUM: {
        "default": (300, 60),
        "search": (50, 60),
        "export": (20, 3600),
        "daily_total": (10000, 86400),
    },
    UserTier.ENTERPRISE: {
        "default": (1000, 60),
        "search": (200, 60),
        "export": (100, 3600),
        "daily_total": (100000, 86400),
    },
}


def get_rate_limit_for_request(user_tier, endpoint_category="default"):
    """Get rate limit configuration based on user tier and endpoint.

    Unknown tiers fall back to FREE; unknown categories fall back to the
    tier's "default" entry. Returns (max_requests, window_seconds).
    """
    per_tier = TIER_LIMITS.get(user_tier, TIER_LIMITS[UserTier.FREE])
    return per_tier.get(endpoint_category, per_tier["default"])
class TieredRateLimitMiddleware:
    """WSGI middleware that applies rate limits based on user subscription tier.

    NOTE(review): user id and tier are read from X-User-Id / X-User-Tier
    request headers, which clients can forge unless an upstream gateway
    sets them after authentication — confirm these headers are injected
    by the auth layer and stripped from external traffic.
    """

    def __init__(self, app, redis_conn):
        self.app = app
        self.limiter = SlidingWindowRateLimiter(redis_conn)

    def __call__(self, environ, start_response):
        # Extract user info from request headers.
        user_id = environ.get("HTTP_X_USER_ID")
        # UserTier() raises ValueError on unrecognized values; an attacker
        # (or typo) must degrade to the most restrictive tier, not crash
        # the request with a 500.
        try:
            user_tier = UserTier(environ.get("HTTP_X_USER_TIER", "free"))
        except ValueError:
            user_tier = UserTier.FREE
        endpoint = environ.get("PATH_INFO", "/")
        # Map the request path to an endpoint category.
        category = "default"
        if "/search" in endpoint:
            category = "search"
        elif "/export" in endpoint:
            category = "export"
        max_requests, window = get_rate_limit_for_request(user_tier, category)
        # Anonymous requests fall back to keying on the client address.
        key = f"tiered:{user_id or environ.get('REMOTE_ADDR')}:{category}"
        allowed, current, limit, retry_after = self.limiter.is_allowed(
            key, max_requests, window)
        if not allowed:
            status = "429 Too Many Requests"
            body = f'{{"error":"rate_limit_exceeded","retry_after":{retry_after},"tier":"{user_tier.value}"}}'
            payload = body.encode()
            headers = [
                ("Content-Type", "application/json"),
                # Content-Length so clients and proxies can frame the body.
                ("Content-Length", str(len(payload))),
                ("Retry-After", str(retry_after)),
                ("X-RateLimit-Limit", str(limit)),
                ("X-RateLimit-Remaining", "0"),
            ]
            start_response(status, headers)
            return [payload]
        return self.app(environ, start_response)
# Centralized rate limiting service using Redis Cluster
import redis
from redis.cluster import RedisCluster
class DistributedRateLimiter:
    """Rate limiter for microservice architectures using Redis Cluster."""

    # Default cluster topology; override via the constructor for tests or
    # alternative deployments.
    DEFAULT_NODES = [
        {"host": "redis-node-1", "port": 6379},
        {"host": "redis-node-2", "port": 6379},
        {"host": "redis-node-3", "port": 6379},
    ]

    def __init__(self, startup_nodes=None):
        """Connect to the cluster.

        Args:
            startup_nodes: Optional list of {"host", "port"} dicts.
                Defaults to DEFAULT_NODES (backward compatible with the
                previously hard-coded topology).
        """
        self.redis = RedisCluster(
            startup_nodes=startup_nodes or self.DEFAULT_NODES,
            decode_responses=True
        )

    def check_and_increment(self, service_name, user_id, endpoint,
                            max_requests, window_seconds):
        """Atomic check-and-increment using a Redis Lua script.

        Returns a dict with "allowed", "current" and "retry_after" keys.
        The {service_name} hash tag pins all of a service's keys to one
        cluster slot so the single-key script stays valid under cluster
        key-slot rules.
        """
        key = f"rl:{{{service_name}}}:{user_id}:{endpoint}"
        # The unique member suffix is generated HERE rather than via Lua's
        # math.random: Redis seeds math.random deterministically for every
        # script invocation (for replication safety), so it yields the same
        # value each call and entries sharing a timestamp would collapse
        # into one zadd member, silently under-counting requests.
        member_suffix = secrets.token_hex(8)
        # Lua script ensures atomicity across the check and increment.
        lua_script = """
        local key = KEYS[1]
        local max_requests = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local suffix = ARGV[4]
        local window_start = now - window
        -- Remove old entries
        redis.call('zremrangebyscore', key, '-inf', window_start)
        -- Count current entries
        local count = redis.call('zcard', key)
        if count >= max_requests then
            -- Get oldest entry for retry-after calculation
            local oldest = redis.call('zrange', key, 0, 0, 'WITHSCORES')
            local retry_after = 0
            if #oldest > 0 then
                retry_after = math.ceil(tonumber(oldest[2]) + window - now)
            end
            return {0, count, retry_after}
        end
        -- Add new entry
        redis.call('zadd', key, now, now .. ':' .. suffix)
        redis.call('expire', key, window + 1)
        return {1, count + 1, 0}
        """
        result = self.redis.eval(lua_script, 1, key,
                                 max_requests, window_seconds, time.time(),
                                 member_suffix)
        return {
            "allowed": bool(result[0]),
            "current": int(result[1]),
            "retry_after": int(result[2]),
        }
| Term | Definition |
|---|---|
| Sliding Window | Rate limiting algorithm that tracks requests in a rolling time window, providing smoother rate enforcement than fixed windows |
| Token Bucket | Algorithm where tokens are added at a fixed rate and consumed per request, allowing controlled bursts up to the bucket capacity |
| Fixed Window | Simplest rate limiting where requests are counted per fixed time window (e.g., per minute), susceptible to burst at window boundaries |
| 429 Too Many Requests | HTTP status code indicating the client has exceeded the rate limit, accompanied by Retry-After header |
| Retry-After Header | HTTP response header telling the client how many seconds to wait before retrying, essential for well-behaved API clients |
| Distributed Rate Limiting | Rate limiting across multiple server instances using shared state (Redis, Memcached) to maintain accurate global counters |
Context: A company launches a public API with free, premium, and enterprise tiers. The API must protect against abuse while providing fair access to paying customers. The API runs on 6 instances behind an AWS ALB.
Approach: keep all counters in shared Redis (never per-instance memory) so limits stay consistent across the 6 instances; use a sliding window for smooth enforcement, a token bucket where short bursts are acceptable, and tier-aware limits resolved per request; always return 429 with Retry-After and X-RateLimit-* headers so clients can back off correctly.
Pitfalls: per-instance in-memory counters silently multiply the effective limit by the instance count; fixed windows permit double-rate bursts at window boundaries; failing closed when Redis is unreachable turns a cache outage into a full API outage; trusting client-supplied tier headers lets users grant themselves higher limits; counting rejected requests against the window can lock a retrying client out indefinitely.
## Rate Limiting Implementation Report
**API**: Public API v2
**Algorithm**: Sliding Window (Redis Sorted Sets)
**Backend**: Redis Cluster (3 nodes)
**Deployment**: 6 API instances behind AWS ALB
### Rate Limit Configuration
| Tier | Default | Search | Export | Auth (per IP) |
|------|---------|--------|--------|---------------|
| Free | 60/min | 10/min | 5/hour | 5/min |
| Premium | 300/min | 50/min | 20/hour | 5/min |
| Enterprise | 1000/min | 200/min | 100/hour | 10/min |
### Validation Results (k6 load test)
- Free tier: Rate limited at 61st request (correct)
- Premium tier: Rate limited at 301st request (correct)
- Cross-instance: Rate limiting consistent across all 6 instances
- Redis failover: Rate limiting degrades gracefully (allows traffic) when Redis is unreachable
- Retry-After header: Accurate within 1 second of actual reset time
- Response overhead: < 2ms added latency per request for rate limit check