
Hex Project Orchestration and Data Pipelines

v20260423
hex-core-workflow-a
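This skill provides the core mechanism for integrating Hex projects into external data pipelines. It lets users trigger, monitor, and manage Hex projects programmatically through the API from external tools such as Airflow and Dagster. Users can run parameterized jobs, execute multi-step ETL workflows, and handle status polling and error management, which makes it suitable for production data orchestration.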

Hex Project Orchestration

Overview

Trigger Hex project runs from external orchestration tools (Airflow, Dagster, cron) with input parameters, status polling, and error handling. This is the primary integration pattern for embedding Hex in data pipelines.

Instructions

Step 1: Parameterized Project Runs

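import 'dotenv/config';

// Read the API token from the environment and fail fast if it is missing.
const TOKEN = process.env.HEX_API_TOKEN;
if (!TOKEN) throw new Error('HEX_API_TOKEN is not set');
const BASE = 'https://app.hex.tech/api/v1';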

interface RunConfig {
  projectId: string;
  inputParams?: Record<string, any>;
  updateCache?: boolean;
  killRunning?: boolean;
}

async function triggerRun(config: RunConfig) {
  const response = await fetch(`${BASE}/project/${config.projectId}/run`, {
    method: 'POST',
    headers: { 'Authorization': `Bearer ${TOKEN}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({
      inputParams: config.inputParams || {},
      updateCacheResult: config.updateCache ?? true,
      killRunningExecution: config.killRunning ?? false,
    }),
  });
  if (!response.ok) throw new Error(`Trigger failed: ${response.status} ${await response.text()}`);
  return response.json();
}
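
To make the request shape easier to inspect before anything is sent, the following is a minimal sketch that builds the same URL and JSON body as `triggerRun` without calling `fetch`. The helper name `buildRunRequest` is hypothetical, and the body field names are simply copied from Step 1 rather than checked against the Hex API reference.

```typescript
// Mirrors the RunConfig shape from Step 1 (redeclared so the sketch is self-contained).
interface RunConfig {
  projectId: string;
  inputParams?: Record<string, unknown>;
  updateCache?: boolean;
  killRunning?: boolean;
}

const BASE = 'https://app.hex.tech/api/v1';

// Hypothetical helper: returns the URL and fetch options without sending the request.
function buildRunRequest(config: RunConfig, token: string) {
  const body = {
    inputParams: config.inputParams ?? {},
    // Field names below are taken from Step 1, not verified independently.
    updateCacheResult: config.updateCache ?? true,
    killRunningExecution: config.killRunning ?? false,
  };
  return {
    url: `${BASE}/project/${encodeURIComponent(config.projectId)}/run`,
    init: {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
    },
  };
}

// Example: inspect what would be sent for a parameterized run.
const preview = buildRunRequest(
  { projectId: 'extract-project-id', inputParams: { date: '2025-01-01' } },
  'example-token',
);
console.log(preview.url);
console.log(preview.init.body);
```

Separating request construction from sending keeps the defaults (`updateCache` on, `killRunning` off) easy to unit test.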

Step 2: Synchronous Run Helper

async function runAndWait(config: RunConfig, timeoutMs = 600000): Promise<any> {
  const { runId, projectId } = await triggerRun(config);
  const startTime = Date.now();

  while (Date.now() - startTime < timeoutMs) {
    const res = await fetch(`${BASE}/project/${projectId}/run/${runId}`, {
      headers: { 'Authorization': `Bearer ${TOKEN}` },
    });
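    // Fail on HTTP errors (e.g. 429) rather than parsing an error body as a run status.
    if (!res.ok) throw new Error(`Status check failed: ${res.status} ${await res.text()}`);
    const status = await res.json();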

    switch (status.status) {
      case 'COMPLETED': return { success: true, runId, duration: Date.now() - startTime };
      case 'ERRORED': throw new Error(`Run ${runId} errored: ${status.statusMessage || 'unknown'}`);
      case 'KILLED': throw new Error(`Run ${runId} was killed`);
      default: await new Promise(r => setTimeout(r, 5000));
    }
  }
  throw new Error(`Run ${runId} timed out after ${timeoutMs}ms`);
}
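
The fixed 5-second poll above issues a request every interval for the whole run, which counts against the rate limit on long jobs. One possible variation is exponential backoff, sketched below. The helper `pollUntilTerminal`, its options, and the injectable `sleep` and `now` functions are hypothetical, and the set of terminal statuses is taken from the switch statement in Step 2.

```typescript
interface PollOptions {
  timeoutMs: number;
  initialDelayMs: number;
  maxDelayMs: number;
  sleep?: (ms: number) => Promise<void>; // injectable for tests
  now?: () => number;                    // injectable clock for tests
}

// Terminal statuses as handled in Step 2; anything else is treated as "still running".
const TERMINAL_STATUSES = new Set(['COMPLETED', 'ERRORED', 'KILLED']);

// Polls getStatus until a terminal status is seen, doubling the delay up to maxDelayMs.
async function pollUntilTerminal(
  getStatus: () => Promise<string>,
  opts: PollOptions,
): Promise<string> {
  const sleep = opts.sleep ?? ((ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms)));
  const now = opts.now ?? (() => Date.now());
  const deadline = now() + opts.timeoutMs;
  let delay = opts.initialDelayMs;

  while (now() < deadline) {
    const current = await getStatus();
    if (TERMINAL_STATUSES.has(current)) return current;
    // Never sleep past the deadline.
    await sleep(Math.min(delay, Math.max(0, deadline - now())));
    delay = Math.min(delay * 2, opts.maxDelayMs);
  }
  throw new Error(`Polling timed out after ${opts.timeoutMs}ms`);
}

// Demo with a stubbed status source instead of a real API call.
const demoStatuses = ['PENDING', 'RUNNING', 'COMPLETED'];
pollUntilTerminal(async () => demoStatuses.shift() ?? 'COMPLETED', {
  timeoutMs: 1000,
  initialDelayMs: 10,
  maxDelayMs: 40,
}).then(finalStatus => console.log(`Final status: ${finalStatus}`));
```

In real use, `getStatus` would wrap the GET request from Step 2 and return the `status` field of the response.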

Step 3: Pipeline Orchestration

// Run multiple Hex projects in sequence (data pipeline)
async function runPipeline(steps: RunConfig[]) {
  const results = [];
  for (const step of steps) {
    console.log(`Running: ${step.projectId}`);
    const result = await runAndWait(step);
    console.log(`Completed in ${result.duration}ms`);
    results.push(result);
  }
  return results;
}

// Example: ETL pipeline
await runPipeline([
  { projectId: 'extract-project-id', inputParams: { date: '2025-01-01' } },
  { projectId: 'transform-project-id' },
  { projectId: 'load-project-id', updateCache: true },
]);
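
Note that `runPipeline` rejects as soon as one step throws, so the results of earlier steps are lost to the caller. The sketch below shows one possible way to record per-step outcomes and stop at the first failure. The names `runPipelineSafely`, `StepConfig`, and `StepResult` are hypothetical, and the `run` argument stands in for `runAndWait` so the example works without network access.

```typescript
interface StepConfig {
  projectId: string;
  inputParams?: Record<string, unknown>;
}

interface StepResult {
  projectId: string;
  ok: boolean;
  error?: string;
}

// Runs steps in order; on the first failure it records the error and skips the rest,
// on the assumption that downstream steps depend on upstream output.
async function runPipelineSafely(
  steps: StepConfig[],
  run: (step: StepConfig) => Promise<unknown>,
): Promise<StepResult[]> {
  const results: StepResult[] = [];
  for (const step of steps) {
    try {
      await run(step);
      results.push({ projectId: step.projectId, ok: true });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      results.push({ projectId: step.projectId, ok: false, error: message });
      break;
    }
  }
  return results;
}

// Demo with a stub runner that fails on the transform step.
const demoSteps: StepConfig[] = [
  { projectId: 'extract-project-id', inputParams: { date: '2025-01-01' } },
  { projectId: 'transform-project-id' },
  { projectId: 'load-project-id' },
];
runPipelineSafely(demoSteps, async step => {
  if (step.projectId === 'transform-project-id') throw new Error('simulated failure');
}).then(results => console.log(results));
```

An orchestrator such as Airflow or Dagster can then inspect the returned array to decide whether to retry from the failed step or fail the whole task.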

Step 4: Cancel Long-Running Projects

async function cancelRun(projectId: string, runId: string) {
  const response = await fetch(`${BASE}/project/${projectId}/run/${runId}`, {
    method: 'DELETE',
    headers: { 'Authorization': `Bearer ${TOKEN}` },
  });
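  // Only report success when the API actually accepted the cancellation.
  if (!response.ok) throw new Error(`Cancel failed for run ${runId}: ${response.status} ${await response.text()}`);
  console.log(`Cancelled run ${runId}: ${response.status}`);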
}
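
When the wait in Step 2 times out, the run itself keeps executing in Hex unless it is cancelled. A minimal sketch of tying the two together follows. The wrapper `withCancelOnTimeout` is hypothetical and takes the pending work and the cancel action as arguments, so it makes no assumptions about the API beyond what Step 4 already shows.

```typescript
// Races `work` against a timer. If the timer wins, `cancel` is invoked and an error is thrown.
async function withCancelOnTimeout<T>(
  work: Promise<T>,
  timeoutMs: number,
  cancel: () => Promise<void>,
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timeout'>(resolve => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  try {
    const winner = await Promise.race([work, timeout]);
    if (winner === 'timeout') {
      // If cancel itself fails, its error propagates to the caller instead.
      await cancel();
      throw new Error(`Timed out after ${timeoutMs}ms; cancel requested`);
    }
    return winner as T;
  } finally {
    // Clear the timer so it does not keep the process alive after work finishes.
    if (timer !== undefined) clearTimeout(timer);
  }
}

// Demo: the stubbed work finishes well inside the timeout, so cancel is never called.
withCancelOnTimeout(
  new Promise<string>(resolve => setTimeout(() => resolve('COMPLETED'), 10)),
  1000,
  async () => console.log('cancel requested'),
).then(outcome => console.log(outcome));
```

In practice the run would be triggered first so that `runId` is known, after which `work` would be the polling promise and `cancel` would be `() => cancelRun(projectId, runId)`.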

Error Handling

| Error | Cause | Solution |
|---|---|---|
| 429 Too Many Requests | Rate limit (20/min, 60/hr) | Queue runs with delays |
| Run ERRORED | Project code failed | Check project logs in Hex UI |
| Run KILLED | Timeout or manual cancel | Increase timeout or fix slow queries |
| 404 | Project not published | Publish project before triggering runs |
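
As an illustration of "queue runs with delays", the sketch below runs tasks one at a time with a minimum gap between start times. The helper `runSpaced` and its injectable `sleep` and `now` parameters are hypothetical. With the 20/min limit quoted above, a 3000 ms interval (60000 / 20) would keep trigger calls under the per-minute cap, though the hourly limit may call for a larger gap on long queues.

```typescript
type Sleep = (ms: number) => Promise<void>;

const realSleep: Sleep = ms => new Promise<void>(resolve => setTimeout(resolve, ms));

// Runs tasks sequentially, ensuring at least minIntervalMs between consecutive task starts.
async function runSpaced<T>(
  tasks: Array<() => Promise<T>>,
  minIntervalMs: number,
  sleep: Sleep = realSleep,
  now: () => number = () => Date.now(),
): Promise<T[]> {
  const results: T[] = [];
  let lastStart = -Infinity; // the first task starts immediately
  for (const task of tasks) {
    const waitMs = lastStart + minIntervalMs - now();
    if (waitMs > 0) await sleep(waitMs);
    lastStart = now();
    results.push(await task());
  }
  return results;
}

// Demo with stub tasks and a short interval so it finishes quickly.
runSpaced(
  ['extract', 'transform', 'load'].map(label => async () => `${label} triggered`),
  20,
).then(messages => console.log(messages));
```

Each task here would typically be `() => triggerRun(config)`. Polling requests also count as API calls, so they may need similar spacing.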

Next Steps

For scheduled runs, see hex-core-workflow-b.

Info
Category Data Science
Name hex-core-workflow-a
Version v20260423
Size 3.98KB
Updated 2026-04-26