
Implementing High-Availability Integration Patterns for Customer.io

v20260423
customerio-reliability-patterns
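This skill pack explains how to make a Customer.io integration fault-tolerant and highly available. It covers a circuit breaker to prevent cascading failures, retry with jitter to handle transient errors, and a Redis-backed fallback queue, so the application keeps running reliably when the external service is down.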

Customer.io Reliability Patterns

Overview

Implement fault-tolerant Customer.io integrations: circuit breaker (stop cascading failures), retry with jitter (handle transient errors), fallback queue (survive outages), idempotency guard (prevent duplicates), and graceful degradation (never crash your app for analytics).

Prerequisites

  • Working Customer.io integration
  • Understanding of failure modes (429, 5xx, timeouts, DNS failures)
  • Redis (recommended for queue-based patterns)
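
The failure modes listed above can be told apart in code before deciding whether a retry is worthwhile. The following is a minimal sketch, assuming HTTP failures expose `statusCode` or `status` (the same fields Pattern 2 reads) and network failures carry a Node.js-style `code`. `classifyFailure` and `isRetryable` are hypothetical helper names, not part of customerio-node.

```typescript
// Hypothetical helpers: classify a failed call by failure mode.
type FailureKind = "rate_limited" | "server_error" | "client_error" | "network" | "unknown";

// Node.js error codes for connection-level failures (timeouts, resets, DNS).
const NETWORK_CODES = new Set([
  "ETIMEDOUT",
  "ECONNRESET",
  "ECONNREFUSED",
  "ENOTFOUND",
  "EAI_AGAIN",
]);

function classifyFailure(err: unknown): FailureKind {
  const e = (err ?? {}) as { statusCode?: number; status?: number; code?: string };
  const status = e.statusCode ?? e.status;
  if (status === 429) return "rate_limited";
  if (typeof status === "number" && status >= 500) return "server_error";
  if (typeof status === "number" && status >= 400) return "client_error";
  if (typeof e.code === "string" && NETWORK_CODES.has(e.code)) return "network";
  return "unknown";
}

// Rate limits, server errors, and network failures are usually transient;
// other client errors will fail the same way on every attempt.
function isRetryable(err: unknown): boolean {
  const kind = classifyFailure(err);
  return kind === "rate_limited" || kind === "server_error" || kind === "network";
}
```

For example, `isRetryable({ statusCode: 503 })` is true, while `isRetryable({ statusCode: 400 })` is false.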

Instructions

Pattern 1: Circuit Breaker

// lib/circuit-breaker.ts
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

export class CircuitBreaker {
  private state: CircuitState = "CLOSED";
  private failureCount = 0;
  private successCount = 0;
  private lastFailureTime = 0;

  constructor(
    private readonly failureThreshold: number = 5,
    private readonly successThreshold: number = 3,
    private readonly resetTimeoutMs: number = 30000
  ) {}

  get currentState(): CircuitState {
    if (this.state === "OPEN") {
      // Check if enough time has passed to try again
      if (Date.now() - this.lastFailureTime > this.resetTimeoutMs) {
        this.state = "HALF_OPEN";
        this.successCount = 0;
      }
    }
    return this.state;
  }

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.currentState === "OPEN") {
      throw new Error("Circuit breaker is OPEN — Customer.io calls blocked");
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (err) {
      this.onFailure();
      throw err;
    }
  }

  private onSuccess(): void {
    this.failureCount = 0;
    if (this.state === "HALF_OPEN") {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = "CLOSED";
        console.log("Circuit breaker: CLOSED (recovered)");
      }
    }
  }

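  private onFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    // A failed probe in HALF_OPEN reopens the circuit immediately;
    // in CLOSED, the circuit opens once the failure threshold is reached.
    if (this.state === "HALF_OPEN" || this.failureCount >= this.failureThreshold) {
      this.state = "OPEN";
      console.warn(
        `Circuit breaker: OPEN (${this.failureCount} failures). ` +
        `Will retry in ${this.resetTimeoutMs / 1000}s`
      );
    }
  }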

  getStatus(): { state: CircuitState; failures: number; lastFailure: Date | null } {
    return {
      state: this.currentState,
      failures: this.failureCount,
      lastFailure: this.lastFailureTime ? new Date(this.lastFailureTime) : null,
    };
  }
}

Pattern 2: Retry with Jitter

// lib/retry.ts
export async function retryWithJitter<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  baseDelayMs = 1000
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err: any) {
      const status = err.statusCode ?? err.status;

      // Don't retry client errors (except 429)
      if (status >= 400 && status < 500 && status !== 429) throw err;

      if (attempt === maxRetries) throw err;

      // Exponential backoff: 1s, 2s, 4s + random jitter up to 30%
      const delay = baseDelayMs * Math.pow(2, attempt);
      const jitter = delay * 0.3 * Math.random();
      await new Promise((r) => setTimeout(r, delay + jitter));
    }
  }
  throw new Error("Unreachable");
}
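
To make the delay arithmetic concrete, the sketch below extracts the same formula into a pure function and prints the lower and upper bound for each retry. `backoffDelayMs` and its injectable `rand` parameter are hypothetical, added only so the bounds can be checked deterministically.

```typescript
// Pure helper mirroring the delay arithmetic used in retryWithJitter.
function backoffDelayMs(
  attempt: number,
  baseDelayMs = 1000,
  rand: () => number = Math.random
): number {
  const delay = baseDelayMs * Math.pow(2, attempt);
  const jitter = delay * 0.3 * rand();
  return delay + jitter;
}

// Bounds for each retry: rand = 0 gives the minimum, rand = 1 the (exclusive) maximum.
for (let attempt = 0; attempt < 3; attempt++) {
  const min = backoffDelayMs(attempt, 1000, () => 0);
  const max = backoffDelayMs(attempt, 1000, () => 1);
  console.log(`retry ${attempt + 1}: ${min}ms to ${max}ms`);
}
```

With the defaults, the three waits fall in the ranges 1000 to 1300 ms, 2000 to 2600 ms, and 4000 to 5200 ms, so a call that fails every attempt spends between 7 and 9.1 seconds waiting before the final error is thrown.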

Pattern 3: Fallback Queue

// lib/customerio-fallback.ts
import { Queue, Worker } from "bullmq";
import { TrackClient, RegionUS } from "customerio-node";

const REDIS_URL = process.env.REDIS_URL ?? "redis://localhost:6379";

// Recent BullMQ versions reject queue names that contain ":", so use a hyphen.
const QUEUE_NAME = "cio-fallback";

// Queue for operations that fail when circuit breaker is open
const fallbackQueue = new Queue(QUEUE_NAME, {
  connection: { url: REDIS_URL },
  defaultJobOptions: {
    attempts: 10,
    backoff: { type: "exponential", delay: 60000 }, // Start at 1 min
    removeOnComplete: 1000,
    removeOnFail: 5000,
  },
});

export async function enqueueFallback(
  operation: "identify" | "track" | "suppress",
  data: Record<string, any>
): Promise<void> {
  await fallbackQueue.add(operation, data);
  console.log(`CIO fallback: queued ${operation} (circuit open)`);
}

// Worker processes fallback queue when CIO is back up
export function startFallbackWorker(): Worker {
  const cio = new TrackClient(
    process.env.CUSTOMERIO_SITE_ID!,
    process.env.CUSTOMERIO_TRACK_API_KEY!,
    { region: RegionUS }
  );

  const worker = new Worker(QUEUE_NAME, async (job) => {
    switch (job.name) {
      case "identify":
        await cio.identify(job.data.userId, job.data.attrs);
        break;
      case "track":
        await cio.track(job.data.userId, job.data.event);
        break;
      case "suppress":
        await cio.suppress(job.data.userId);
        break;
      default:
        // Fail loudly instead of silently completing an unknown job
        throw new Error(`Unknown fallback operation: ${job.name}`);
    }
  }, {
    connection: { url: REDIS_URL },
    concurrency: 5,
  });

  // Log every failed attempt so stuck jobs are visible
  worker.on("failed", (job, err) => {
    console.error(`CIO fallback: ${job?.name ?? "unknown"} failed: ${err.message}`);
  });

  return worker; // Call worker.close() during graceful shutdown
}
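
The prerequisites list Redis as recommended rather than required. Where Redis is not available, a bounded in-memory buffer is one possible stand-in for short outages. The sketch below is an assumption rather than part of the pattern above: `BoundedBuffer` and `FallbackOp` are hypothetical names, and anything buffered this way is lost when the process exits.

```typescript
// Hypothetical in-memory stand-in for the Redis-backed queue (best-effort only).
type FallbackOp = {
  operation: "identify" | "track" | "suppress";
  data: Record<string, unknown>;
};

class BoundedBuffer {
  private items: FallbackOp[] = [];
  private readonly maxSize: number;

  constructor(maxSize = 1000) {
    this.maxSize = maxSize;
  }

  // Adds an operation; drops the oldest entry when the buffer is full.
  push(op: FallbackOp): void {
    if (this.items.length >= this.maxSize) this.items.shift();
    this.items.push(op);
  }

  // Removes and returns everything buffered so far, oldest first.
  drain(): FallbackOp[] {
    const out = this.items;
    this.items = [];
    return out;
  }

  get size(): number {
    return this.items.length;
  }
}
```

Dropping the oldest entry keeps memory use fixed during a long outage at the cost of losing early events. A periodic task could call `drain()` and replay the operations once the circuit closes.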

Pattern 4: Resilient Client (All Patterns Combined)

// lib/customerio-resilient.ts
import { TrackClient, RegionUS } from "customerio-node";
import { CircuitBreaker } from "./circuit-breaker";
import { retryWithJitter } from "./retry";
import { enqueueFallback } from "./customerio-fallback";

export class ResilientCioClient {
  private client: TrackClient;
  private breaker: CircuitBreaker;

  constructor(siteId: string, apiKey: string) {
    this.client = new TrackClient(siteId, apiKey, { region: RegionUS });
    this.breaker = new CircuitBreaker(5, 3, 30000);
  }

  async identify(userId: string, attrs: Record<string, any>): Promise<void> {
    try {
      await this.breaker.execute(() =>
        retryWithJitter(() => this.client.identify(userId, attrs))
      );
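    } catch (err: any) {
      if (err?.message?.includes("Circuit breaker is OPEN")) {
        try {
          await enqueueFallback("identify", { userId, attrs });
        } catch (queueErr: any) {
          // Redis may be down too; drop the operation rather than crash the caller
          console.error(`CIO identify could not be queued for ${userId}: ${queueErr?.message}`);
        }
        return; // Handled via the fallback path; don't throw
      }
      // For non-circuit errors, log but don't crash the app
      console.error(`CIO identify failed for ${userId}: ${err?.message}`);
    }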
  }

  async track(
    userId: string,
    name: string,
    data?: Record<string, any>
  ): Promise<void> {
    try {
      await this.breaker.execute(() =>
        retryWithJitter(() =>
          this.client.track(userId, { name, data })
        )
      );
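    } catch (err: any) {
      if (err?.message?.includes("Circuit breaker is OPEN")) {
        try {
          await enqueueFallback("track", { userId, event: { name, data } });
        } catch (queueErr: any) {
          // Redis may be down too; drop the event rather than crash the caller
          console.error(`CIO track could not be queued for ${userId}/${name}: ${queueErr?.message}`);
        }
        return;
      }
      console.error(`CIO track failed for ${userId}/${name}: ${err?.message}`);
    }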
  }

  getHealthStatus() {
    return this.breaker.getStatus();
  }
}
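
Matching on the text of `err.message` works, but it breaks silently if the message wording ever changes. One alternative is a dedicated error type that callers detect with `instanceof`. The sketch below is an assumption, not part of the classes above: `CircuitOpenError` and `isCircuitOpen` are hypothetical names.

```typescript
// Hypothetical dedicated error type for a rejected call while the circuit is open.
class CircuitOpenError extends Error {
  constructor(message = "Circuit breaker is OPEN") {
    super(message);
    this.name = "CircuitOpenError";
    // Keeps instanceof working when compiled to older (ES5) targets.
    Object.setPrototypeOf(this, CircuitOpenError.prototype);
  }
}

// Type guard that callers can use instead of inspecting err.message.
function isCircuitOpen(err: unknown): err is CircuitOpenError {
  return err instanceof CircuitOpenError;
}
```

To adopt it, the breaker would throw `new CircuitOpenError()` where it currently throws a plain `Error`, and each catch block would test `isCircuitOpen(err)` before enqueueing the fallback job.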

Pattern 5: Idempotency Guard

// lib/idempotency.ts
import { createHash } from "crypto";

const processedOps = new Map<string, number>();
const MAX_ENTRIES = 100_000;
const TTL_MS = 5 * 60 * 1000;  // 5 minutes

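// Returns true when the same operation was already seen within the TTL
// (the caller should skip it), and false the first time, after recording it.
// State is in memory and per process, so it does not deduplicate across instances.
export function isIdempotent(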
  operation: string,
  userId: string,
  data: any
): boolean {
  const hash = createHash("sha256")
    .update(`${operation}:${userId}:${JSON.stringify(data)}`)
    .digest("hex")
    .substring(0, 16);

  const existing = processedOps.get(hash);
  if (existing && Date.now() - existing < TTL_MS) {
    return true; // Already processed within TTL
  }

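  // Delete first so a refreshed entry moves to the end of the Map's insertion order
  processedOps.delete(hash);
  processedOps.set(hash, Date.now());

  // Prune from the oldest end: drop expired entries, then evict until back under the cap
  if (processedOps.size > MAX_ENTRIES) {
    const cutoff = Date.now() - TTL_MS;
    for (const [key, time] of processedOps) {
      if (time < cutoff || processedOps.size > MAX_ENTRIES) processedOps.delete(key);
      else break;
    }
  }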

  return false;
}
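
One caveat with hashing `JSON.stringify(data)` is that the output depends on property order, so `{ a: 1, b: 2 }` and `{ b: 2, a: 1 }` produce different keys even though they describe the same payload. The sketch below shows one way to build an order-independent key. `stableStringify` and `idempotencyKey` are hypothetical helpers, and the serializer only targets plain JSON-like data (no `Date`, `Map`, or class instances).

```typescript
import { createHash } from "crypto";

// Serializes plain JSON-like values with object keys sorted,
// so property order does not affect the result.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value) ?? "null";
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => stableStringify(item)).join(",")}]`;
  }
  const obj = value as Record<string, unknown>;
  const parts = Object.keys(obj)
    .filter((key) => obj[key] !== undefined)
    .sort()
    .map((key) => `${JSON.stringify(key)}:${stableStringify(obj[key])}`);
  return `{${parts.join(",")}}`;
}

// Builds a key with the same hashing scheme as the guard above.
function idempotencyKey(operation: string, userId: string, data: unknown): string {
  return createHash("sha256")
    .update(`${operation}:${userId}:${stableStringify(data)}`)
    .digest("hex")
    .substring(0, 16);
}
```

With this, `idempotencyKey("track", "u1", { a: 1, b: 2 })` and `idempotencyKey("track", "u1", { b: 2, a: 1 })` return the same 16-character key.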

Pattern 6: Health Check Endpoint

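// routes/health.ts: include circuit breaker status
import express from "express";
import { ResilientCioClient } from "../lib/customerio-resilient";

// Assumes an Express app; reuse your existing app instance if you have one.
const app = express();

// Share this client with your tracking code. A client used only here
// never sees real traffic, so its breaker would always report CLOSED.
const cio = new ResilientCioClient(
  process.env.CUSTOMERIO_SITE_ID!,
  process.env.CUSTOMERIO_TRACK_API_KEY!
);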

app.get("/health/customerio", (_req, res) => {
  const status = cio.getHealthStatus();
  const healthy = status.state === "CLOSED";

  res.status(healthy ? 200 : 503).json({
    customerio: {
      circuit_state: status.state,
      failure_count: status.failures,
      last_failure: status.lastFailure?.toISOString() ?? null,
    },
  });
});

Pattern Selection Guide

| Scenario | Pattern | Priority |
| --- | --- | --- |
| Transient 5xx errors | Retry with jitter | Must have |
| Extended Customer.io outage | Circuit breaker + fallback queue | Must have |
| Duplicate events from retries | Idempotency guard | Should have |
| App must never crash for tracking | Graceful degradation (catch all) | Must have |
| Need visibility into reliability | Health check endpoint | Should have |

Reliability Checklist

  • Circuit breaker implemented with reasonable thresholds
  • Retry with exponential backoff and jitter
  • Fallback queue for circuit-open operations
  • Idempotency guard for retried operations
  • All Customer.io calls wrapped in try/catch (never crash the app)
  • Health check exposes circuit breaker state
  • Graceful shutdown drains queues and flushes buffers
  • Timeout configured on HTTP calls
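
The last checklist item, a timeout on HTTP calls, is not covered by the patterns above. One generic way to bound how long a call may take is to race it against a timer. This is a minimal sketch: `withTimeout` is a hypothetical helper, and it only stops waiting for the call rather than cancelling the underlying request.

```typescript
// Hypothetical helper: rejects if `work` does not settle within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
  });
  // Whichever settles first wins; the timer is cleared either way
  // so it cannot keep the process alive.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

A call such as `withTimeout(client.identify(userId, attrs), 5000)` would then surface a slow request as an ordinary rejection, which the retry and circuit breaker logic can treat like any other failure. The 5000 ms value is only an illustration.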

Next Steps

After implementing these reliability patterns, proceed to customerio-load-scale for load testing and scaling.

Info
Category Programming and Development
Name customerio-reliability-patterns
Version v20260423
Size 8.09KB
Updated 2026-04-28