
Building Event-Driven Architectures with Perplexity

v20260423
perplexity-webhooks-events
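This skill builds event-driven architectures around the Perplexity Sonar API. Because the API has no native webhooks, the system emulates event patterns to achieve high reliability. It supports three core patterns: streaming SSE for real-time search, job queues for automated batch research, and scheduled jobs for news and trend monitoring. It is suited to building complex, scalable AI applications.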

Perplexity Events & Async Patterns

Overview

Build event-driven architectures around Perplexity Sonar API. Perplexity does not have webhooks -- all interactions are request/response. Event patterns are built using streaming SSE, job queues for batch processing, and cron-triggered monitoring.

Event Patterns

| Pattern | Trigger | Use Case |
|---|---|---|
| Streaming SSE | Client request | Real-time search with progressive rendering |
| Batch queue | Job submission | Research automation, report generation |
| Scheduled search | Cron job | News monitoring, trend alerts, competitive intel |
| Citation pipeline | Post-processing | Source verification, link validation |

Prerequisites

  • openai package installed (plus express for the streaming endpoint)
  • PERPLEXITY_API_KEY set
  • Queue system (BullMQ backed by Redis, or SQS) for batch patterns
  • Cron scheduler for monitoring patterns

Instructions

Step 1: Streaming Search (Server-Sent Events)

import OpenAI from "openai";
import express from "express";

const perplexity = new OpenAI({
  apiKey: process.env.PERPLEXITY_API_KEY!,
  baseURL: "https://api.perplexity.ai",
});

const app = express();
app.use(express.json());

app.post("/api/search/stream", async (req, res) => {
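  const { query, model = "sonar" } = req.body;

  // Reject bad input before the SSE headers are sent
  if (typeof query !== "string" || !query.trim()) {
    res.status(400).json({ error: "query must be a non-empty string" });
    return;
  }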

  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  try {
    const stream = await perplexity.chat.completions.create({
      model,
      messages: [{ role: "user", content: query }],
      stream: true,
      max_tokens: 2048,
    });

    let fullText = "";
    let citations: string[] | undefined;
    for await (const chunk of stream) {
      const text = chunk.choices[0]?.delta?.content || "";
      if (text) {
        fullText += text;
        res.write(`data: ${JSON.stringify({ type: "text", content: text })}\n\n`);
      }

      // Citations are a Perplexity extension (absent from the OpenAI types) and
      // may appear on more than one chunk; keep the latest list and emit it once.
      citations = (chunk as any).citations ?? citations;
    }

    if (citations) {
      res.write(`data: ${JSON.stringify({ type: "citations", urls: citations })}\n\n`);
    }
    res.write(`data: ${JSON.stringify({ type: "done", totalLength: fullText.length })}\n\n`);
  } catch (err: any) {
    res.write(`data: ${JSON.stringify({ type: "error", message: err.message })}\n\n`);
  }

  res.end();
});
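
The streaming loop above waits indefinitely if the upstream stream stalls. The sketch below shows one way to enforce a per-chunk timeout around any async iterable. `withChunkTimeout` is a hypothetical helper, not part of the openai package, and the 10-second figure in the usage note is taken from the Error Handling table.

```typescript
// Hypothetical helper: re-yields chunks from `source` and throws if the gap
// between two consecutive chunks exceeds `ms` milliseconds.
async function* withChunkTimeout<T>(
  source: AsyncIterable<T>,
  ms: number
): AsyncGenerator<T> {
  const iterator = source[Symbol.asyncIterator]();
  try {
    while (true) {
      let timer: ReturnType<typeof setTimeout> | undefined = undefined;
      const timeout = new Promise<never>((_, reject) => {
        timer = setTimeout(
          () => reject(new Error(`No chunk received within ${ms} ms`)),
          ms
        );
      });

      let result: IteratorResult<T>;
      try {
        // Whichever settles first wins: the next chunk or the timer
        result = await Promise.race([iterator.next(), timeout]);
      } finally {
        clearTimeout(timer);
      }

      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Ask the source to clean up, without waiting: a stalled source
    // may never answer this call.
    void iterator.return?.()?.catch(() => undefined);
  }
}
```

In Step 1 this would wrap the stream, for example `for await (const chunk of withChunkTimeout(stream, 10_000))`, so that a stall surfaces in the existing `catch` block as an `error` event.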

Step 2: Batch Research Pipeline

import crypto from "node:crypto"; // for crypto.randomUUID() on Node versions without the global
import { Queue, Worker } from "bullmq";

const searchQueue = new Queue("perplexity-research", {
  connection: { host: "localhost", port: 6379 },
});

async function submitResearchBatch(
  queries: string[],
  callbackUrl: string,
  model: string = "sonar-pro"
) {
  const batchId = crypto.randomUUID();

  for (const query of queries) {
    await searchQueue.add("search", { batchId, query, callbackUrl, model }, {
      attempts: 3,
      backoff: { type: "exponential", delay: 2000 },
    });
  }

  return { batchId, totalQueries: queries.length };
}

const worker = new Worker("perplexity-research", async (job) => {
  const { query, callbackUrl, batchId, model } = job.data;

  const response = await perplexity.chat.completions.create({
    model,
    messages: [{ role: "user", content: query }],
    max_tokens: 2048,
  });

  const result = {
    event: "perplexity.search.completed",
    batchId,
    query,
    answer: response.choices[0].message.content,
    citations: (response as any).citations || [],
    model: response.model,
    tokens: response.usage?.total_tokens,
  };

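  // Deliver result via callback
  const delivery = await fetch(callbackUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(result),
  });

  // fetch does not reject on 4xx/5xx; throw so BullMQ retries the job
  if (!delivery.ok) {
    throw new Error(`Callback returned HTTP ${delivery.status}`);
  }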
}, {
  connection: { host: "localhost", port: 6379 },
  concurrency: 3,  // Stay within rate limits
  limiter: { max: 40, duration: 60000 },  // 40 RPM safety margin
});
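
The worker posts one `perplexity.search.completed` event per query, so the service behind `callbackUrl` has to work out when a batch is finished. Below is a minimal sketch of that receiver-side bookkeeping. `BatchTracker` is a hypothetical in-memory class, and it assumes queries within a batch are unique so that a retried delivery can be recognized as a duplicate.

```typescript
// Shape of the event the worker posts (fields taken from the worker above)
interface SearchCompleted {
  event: "perplexity.search.completed";
  batchId: string;
  query: string;
  answer: string | null;
  citations: string[];
}

// Hypothetical in-memory tracker; a real deployment would likely persist this.
class BatchTracker {
  private expected = new Map<string, number>();
  private results = new Map<string, SearchCompleted[]>();

  // Call with the values returned by submitResearchBatch
  register(batchId: string, totalQueries: number): void {
    this.expected.set(batchId, totalQueries);
    this.results.set(batchId, []);
  }

  // Returns every result once the batch is complete, otherwise null
  record(result: SearchCompleted): SearchCompleted[] | null {
    const total = this.expected.get(result.batchId);
    const list = this.results.get(result.batchId);
    if (total === undefined || list === undefined) return null; // unknown batch

    // Assumption: queries are unique per batch, so a repeat is a retried delivery
    if (!list.some((r) => r.query === result.query)) list.push(result);
    if (list.length < total) return null;

    this.expected.delete(result.batchId);
    this.results.delete(result.batchId);
    return list;
  }
}
```

The callback handler would call `tracker.record(req.body)` and act only when it returns a non-null list.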

Step 3: Scheduled News Monitor

// Run via cron: every 6 hours
async function monitorTopics(
  topics: string[],
  webhookUrl: string
) {
  for (const topic of topics) {
    const response = await perplexity.chat.completions.create({
      model: "sonar",
      messages: [{
        role: "system",
        content: "Summarize the latest developments. Be concise. Include only new information.",
      }, {
        role: "user",
        content: `Latest developments about "${topic}" in the past 24 hours`,
      }],
      search_recency_filter: "day",
      max_tokens: 500,
    } as any);

    const answer = response.choices[0].message.content || "";
    const citations = (response as any).citations || [];

    // Only notify if there are actual developments
    if (citations.length > 0 && answer.length > 100) {
      await fetch(webhookUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          event: "perplexity.monitor.update",
          topic,
          summary: answer,
          citations,
          timestamp: new Date().toISOString(),
        }),
      });
    }

    // Rate limit protection
    await new Promise((r) => setTimeout(r, 2000));
  }
}
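
Because the monitor runs every 6 hours but searches the past 24 hours, consecutive runs can report the same sources. One possible guard, sketched here under the assumption that a citation URL is a good enough identity for a development, is to remember URLs already reported and notify only when new ones appear. `takeNewCitations` is a hypothetical helper name.

```typescript
// Hypothetical helper: returns the citations not reported in earlier runs
// and records them in `seen` so the next run skips them.
function takeNewCitations(seen: Set<string>, citations: string[]): string[] {
  const fresh: string[] = [];
  for (const url of citations) {
    if (!seen.has(url)) {
      seen.add(url);
      fresh.push(url);
    }
  }
  return fresh;
}

// One Set per topic; in-memory only, so it resets when the process restarts
const seenByTopic = new Map<string, Set<string>>();

function newCitationsForTopic(topic: string, citations: string[]): string[] {
  let seen = seenByTopic.get(topic);
  if (!seen) {
    seen = new Set<string>();
    seenByTopic.set(topic, seen);
  }
  return takeNewCitations(seen, citations);
}
```

In `monitorTopics`, the notification condition could then test `newCitationsForTopic(topic, citations).length > 0` instead of `citations.length > 0`.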

Step 4: Client-Side SSE Consumer

// Browser client consuming the streaming endpoint
function consumeSearchStream(
  query: string,
  onText: (text: string) => void,
  onCitations: (urls: string[]) => void,
  onDone: () => void
) {
  fetch("/api/search/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query }),
  }).then(async (response) => {
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = "";

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // A network chunk can end mid-event, so buffer until a full line arrives
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? ""; // keep the trailing partial line for the next read

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const event = JSON.parse(line.slice(6));

        if (event.type === "text") onText(event.content);
        if (event.type === "citations") onCitations(event.urls);
        if (event.type === "done") onDone();
      }
    }
  });
}
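
The Event Patterns table lists a citation pipeline for source verification and link validation, but none of the steps above implement it. As a rough sketch of the first stage only, the function below drops malformed and non-HTTP entries and removes duplicates. `normalizeCitations` is a hypothetical name, and it assumes citations arrive as an array of URL strings, as in the earlier steps. Checking that each link is still live would need a network request per URL and is left out.

```typescript
// Hypothetical post-processing step: keep only well-formed http(s) URLs,
// strip fragments, and drop duplicates while preserving order.
function normalizeCitations(citations: string[]): string[] {
  const seen = new Set<string>();
  const cleaned: string[] = [];

  for (const raw of citations) {
    let parsed: URL;
    try {
      parsed = new URL(raw.trim());
    } catch {
      continue; // not a parseable absolute URL
    }
    if (parsed.protocol !== "http:" && parsed.protocol !== "https:") continue;

    parsed.hash = ""; // "#section" variants point at the same source
    const key = parsed.toString();
    if (!seen.has(key)) {
      seen.add(key);
      cleaned.push(key);
    }
  }

  return cleaned;
}
```

It could run on the `citations` array before the result is posted in Step 2 or Step 3.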

Error Handling

| Issue | Cause | Solution |
|---|---|---|
| Stream stalls | Complex search taking too long | Set a per-chunk timeout (10s) |
| 429 in batch | Too many concurrent workers | Reduce concurrency, add a rate limiter |
| Empty monitor alerts | Topic too niche | Broaden the topic or widen the recency filter |
| Callback fails | Webhook URL down | Retry with exponential backoff |
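
For the callback failure case, retrying only the delivery avoids re-running the search that already succeeded. The sketch below is one way to do that. `deliverWithRetry` is a hypothetical helper, and its defaults (3 attempts, 2000 ms base delay) simply mirror the job options used in Step 2.

```typescript
type Sleep = (ms: number) => Promise<void>;

// Hypothetical helper: calls `send` until it reports success, doubling the
// wait after each failure. Resolves with the number of attempts used.
async function deliverWithRetry(
  send: () => Promise<boolean>, // should resolve true on a 2xx response
  maxAttempts = 3,
  baseDelayMs = 2000,
  sleep: Sleep = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<number> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    let ok = false;
    try {
      ok = await send();
    } catch {
      ok = false; // network errors count as a failed attempt
    }
    if (ok) return attempt;

    // Wait 2000 ms, then 4000 ms, ... but not after the last attempt
    if (attempt < maxAttempts) {
      await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  throw new Error(`Delivery failed after ${maxAttempts} attempts`);
}
```

A caller might pass `async () => (await fetch(callbackUrl, { method: "POST", body })).ok` as `send`, where `callbackUrl` and `body` come from the surrounding job.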

Output

  • Streaming SSE endpoint for real-time search
  • Batch research pipeline with queue-based processing
  • Scheduled news monitoring with alerting
  • Client-side stream consumer

Next Steps

For deployment setup, see perplexity-deploy-integration.

Info
Category: Programming & Development
Name: perplexity-webhooks-events
Version: v20260423
Size: 7.26KB
Updated: 2026-04-28