技能 编程开发 API 模式验证防护

API 模式验证防护

v20260331
implementing-api-schema-validation-security
通过 OpenAPI 和 JSON Schema 约束请求与响应结构,结合 API 网关与 CI/CD 校验,防止注入、批量赋值与数据泄露,助力合规与安全架构建设。
获取技能
111 次下载
概览

Implementing API Schema Validation Security

Overview

API schema validation enforces that all data exchanged through APIs conforms to a predefined structure defined in OpenAPI Specification (OAS) or JSON Schema documents. This prevents injection attacks (SQLi, XSS, XXE), blocks mass assignment by rejecting unknown properties, prevents data leakage by validating response schemas, and ensures type safety across all API interactions. Schema validation operates at both the API gateway level (runtime enforcement) and during development (shift-left security).

When to Use

  • When deploying or configuring implementing api schema validation security capabilities in your environment
  • When establishing security controls aligned to compliance requirements
  • When building or improving security architecture for this domain
  • When conducting security assessments that require this implementation

Prerequisites

  • OpenAPI Specification v3.0 or v3.1 for all API endpoints
  • API gateway with schema validation support (Cloudflare API Shield, Kong, AWS API Gateway)
  • JSON Schema draft-07 or later understanding
  • Development environment with OpenAPI validation libraries
  • CI/CD pipeline for automated schema compliance testing

Core Implementation

OpenAPI Schema with Security Constraints

openapi: 3.1.0
info:
  title: Secure E-Commerce API
  version: 2.0.0
servers:
  - url: https://api.example.com/v2
    description: Production (HTTPS enforced)
security:
  - OAuth2:
      - read:products
      - write:orders

paths:
  /products:
    post:
      operationId: createProduct
      security:
        - OAuth2: [write:products]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ProductCreate'
      responses:
        '201':
          description: Product created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'
        '400':
          $ref: '#/components/responses/ValidationError'
        '401':
          $ref: '#/components/responses/Unauthorized'

  /products/{productId}:
    get:
      operationId: getProduct
      parameters:
        - name: productId
          in: path
          required: true
          schema:
            type: string
            format: uuid
            pattern: '^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$'
      responses:
        '200':
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'

components:
  schemas:
    ProductCreate:
      type: object
      required: [name, price, category]
      properties:
        name:
          type: string
          minLength: 1
          maxLength: 200
          pattern: '^[a-zA-Z0-9\s\-\.]+$'  # No special chars for injection prevention
        description:
          type: string
          maxLength: 2000
          # Sanitize HTML entities
        price:
          type: number
          format: float
          minimum: 0.01
          maximum: 999999.99
          exclusiveMinimum: 0
        category:
          type: string
          enum: [electronics, clothing, food, furniture, other]
        tags:
          type: array
          items:
            type: string
            maxLength: 50
            pattern: '^[a-zA-Z0-9\-]+$'
          maxItems: 10
          uniqueItems: true
      additionalProperties: false  # CRITICAL: Prevents mass assignment

    Product:
      type: object
      required: [id, name, price]
      properties:
        id:
          type: string
          format: uuid
          readOnly: true
        name:
          type: string
        price:
          type: number
        category:
          type: string
        tags:
          type: array
          items:
            type: string
        createdAt:
          type: string
          format: date-time
          readOnly: true
      additionalProperties: false  # Prevents data leakage of internal fields

    ValidationErrorResponse:
      type: object
      required: [code, message]
      properties:
        code:
          type: string
          enum: [VALIDATION_ERROR]
        message:
          type: string
          maxLength: 500
        details:
          type: array
          items:
            type: object
            properties:
              field:
                type: string
              error:
                type: string
            additionalProperties: false
          maxItems: 50
      additionalProperties: false

  responses:
    ValidationError:
      description: Request validation failed
      content:
        application/json:
          schema:
            $ref: '#/components/schemas/ValidationErrorResponse'
    Unauthorized:
      description: Authentication required

  securitySchemes:
    OAuth2:
      type: oauth2
      flows:
        authorizationCode:
          authorizationUrl: https://auth.example.com/authorize
          tokenUrl: https://auth.example.com/token
          scopes:
            read:products: Read product data
            write:products: Create and update products
            write:orders: Create orders

Server-Side Schema Validation (Python/FastAPI)

"""API Schema Validation Middleware for FastAPI

Enforces strict schema validation on all request and response payloads
to prevent injection, mass assignment, and data leakage attacks.
"""

from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.middleware import Middleware
from pydantic import BaseModel, Field, field_validator, ConfigDict
from typing import List, Optional
import re
import json
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()


# Strict Pydantic models with security constraints
class ProductCreate(BaseModel):
    model_config = ConfigDict(extra='forbid')  # Reject unknown fields (mass assignment)

    name: str = Field(min_length=1, max_length=200, pattern=r'^[a-zA-Z0-9\s\-\.]+$')
    description: Optional[str] = Field(default=None, max_length=2000)
    price: float = Field(gt=0, le=999999.99)
    category: str = Field(pattern=r'^(electronics|clothing|food|furniture|other)$')
    tags: Optional[List[str]] = Field(default=None, max_length=10)

    @field_validator('name')
    @classmethod
    def sanitize_name(cls, v):
        # Prevent XSS via HTML entities
        dangerous_patterns = ['<script', 'javascript:', 'onerror=', 'onload=']
        lower_v = v.lower()
        for pattern in dangerous_patterns:
            if pattern in lower_v:
                raise ValueError(f'Invalid characters in name')
        return v

    @field_validator('description')
    @classmethod
    def sanitize_description(cls, v):
        if v is None:
            return v
        # Strip potential SQL injection patterns
        sql_patterns = [
            r"('|--|;|/\*|\*/|xp_|exec\s|union\s+select|drop\s+table)",
        ]
        for pattern in sql_patterns:
            if re.search(pattern, v, re.IGNORECASE):
                raise ValueError('Invalid content in description')
        return v

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v):
        if v is None:
            return v
        if len(v) > 10:
            raise ValueError('Maximum 10 tags allowed')
        for tag in v:
            if not re.match(r'^[a-zA-Z0-9\-]+$', tag) or len(tag) > 50:
                raise ValueError(f'Invalid tag format: {tag}')
        return v


class ProductResponse(BaseModel):
    """Response model that explicitly defines allowed output fields.
    Prevents leakage of internal fields like internal_notes, cost_price, etc."""
    model_config = ConfigDict(extra='forbid')

    id: str
    name: str
    price: float
    category: str
    tags: List[str] = []
    created_at: str


class ResponseValidationMiddleware(BaseHTTPMiddleware):
    """Middleware to validate response payloads against schema.
    Prevents accidental data leakage by checking response content."""

    SCHEMA_MAP = {
        '/api/v2/products': {
            'POST': {'response_model': ProductResponse},
            'GET': {'response_model': ProductResponse},
        }
    }

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Only validate JSON responses
        content_type = response.headers.get('content-type', '')
        if 'application/json' not in content_type:
            return response

        # Check if endpoint has a registered response schema
        path = request.url.path
        method = request.method

        route_config = self.SCHEMA_MAP.get(path, {}).get(method)
        if not route_config:
            return response

        # Read and validate response body
        body = b""
        async for chunk in response.body_iterator:
            body += chunk

        try:
            data = json.loads(body)
            model = route_config['response_model']
            if isinstance(data, list):
                for item in data:
                    model.model_validate(item)
            else:
                model.model_validate(data)
        except Exception as e:
            # Log the validation failure for security monitoring
            print(f"SECURITY: Response schema violation on {method} {path}: {e}")
            # Return a safe error instead of potentially leaked data
            return Response(
                content=json.dumps({"error": "Internal server error"}),
                status_code=500,
                media_type="application/json"
            )

        return Response(
            content=body,
            status_code=response.status_code,
            headers=dict(response.headers),
            media_type=response.media_type
        )


app.add_middleware(ResponseValidationMiddleware)


@app.post("/api/v2/products", response_model=ProductResponse, status_code=201)
async def create_product(product: ProductCreate):
    # ProductCreate model with extra='forbid' automatically rejects
    # any unknown fields, preventing mass assignment attacks
    # (e.g., attacker trying to set is_admin=true or price=0)
    pass

Cloudflare API Shield Schema Validation

# Upload OpenAPI schema to Cloudflare API Shield
curl -X POST "https://api.cloudflare.com/client/v4/zones/{zone_id}/api_gateway/user_schemas" \
  -H "Authorization: Bearer ${CF_API_TOKEN}" \
  -H "Content-Type: multipart/form-data" \
  -F "file=@openapi.yaml" \
  -F "kind=openapi_v3"

# Enable schema validation with blocking mode
curl -X PATCH "https://api.cloudflare.com/client/v4/zones/{zone_id}/api_gateway/settings/schema_validation" \
  -H "Authorization: Bearer ${CF_API_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "validation_default_mitigation_action": "block",
    "validation_override_mitigation_action": null
  }'

CI/CD Schema Compliance Testing

# GitHub Actions workflow for schema validation in CI
name: API Schema Security Check
on:
  pull_request:
    paths: ['api/**', 'openapi/**']

jobs:
  schema-security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Validate OpenAPI Schema
        run: |
          npm install -g @stoplight/spectral-cli
          spectral lint openapi.yaml --ruleset .spectral-security.yaml

      - name: Check for Security Anti-Patterns
        run: |
          python3 scripts/schema_security_check.py openapi.yaml

      - name: Run Contract Tests
        run: |
          npm install -g dredd
          dredd openapi.yaml http://localhost:3000 --hookfiles=./test/hooks.js

Security Anti-Patterns to Detect

Anti-Pattern Risk Fix
additionalProperties: true or missing Mass assignment Set additionalProperties: false
No maxLength on strings Buffer overflow, DoS Add appropriate maxLength constraints
No pattern on string fields Injection attacks Add regex patterns to restrict input
No enum for fixed-value fields Unexpected input processing Use enum for fields with known values
format: password without TLS Credential exposure Enforce HTTPS-only server URLs
Missing error response schemas Information leakage Define all 4xx/5xx response schemas
readOnly fields in request body Data manipulation Enforce readOnly server-side

References

信息
Category 编程开发
Name implementing-api-schema-validation-security
版本 v20260331
大小 11.58KB
更新时间 2026-04-01
语言