技能 编程开发 部署Clari数据导出流水线

部署Clari数据导出流水线

v20260423
clari-deploy-integration
本技能提供将Clari数据导出管道部署到生产级云环境的方案。支持使用Airflow、AWS Lambda或Google Cloud Functions实现定时、无服务器的自动化数据同步。适用于需要定期、可靠地获取公司预测和营收数据的自动化场景。
获取技能
380 次下载
概览

Clari Deploy Integration

Overview

Deploy Clari export pipelines to production environments: Airflow DAGs, AWS Lambda, or Google Cloud Functions for scheduled, serverless execution.

Instructions

Airflow DAG

# dags/clari_export_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

def export_clari_forecast(**context):
    from clari_client import ClariClient, ClariConfig

    client = ClariClient(ClariConfig(
        api_key=Variable.get("clari_api_key"),
    ))

    period = context["params"].get("period", "2026_Q1")
    data = client.export_and_download("company_forecast", period)

    entries = data.get("entries", [])
    context["ti"].xcom_push(key="entry_count", value=len(entries))
    # Load to warehouse here

dag = DAG(
    "clari_daily_export",
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
)

export_task = PythonOperator(
    task_id="export_forecast",
    python_callable=export_clari_forecast,
    dag=dag,
)

AWS Lambda

# lambda_handler.py
import json
import boto3
from clari_client import ClariClient, ClariConfig

def handler(event, context):
    ssm = boto3.client("ssm")
    api_key = ssm.get_parameter(
        Name="/clari/api-key", WithDecryption=True
    )["Parameter"]["Value"]

    client = ClariClient(ClariConfig(api_key=api_key))
    data = client.export_and_download(
        event.get("forecast_name", "company_forecast"),
        event.get("period", "2026_Q1"),
    )

    return {
        "statusCode": 200,
        "body": json.dumps({"entries": len(data.get("entries", []))}),
    }

Google Cloud Function

# main.py
import functions_framework
from google.cloud import secretmanager
from clari_client import ClariClient, ClariConfig

@functions_framework.http
def clari_export(request):
    sm = secretmanager.SecretManagerServiceClient()
    secret = sm.access_secret_version(name="projects/my-proj/secrets/clari-api-key/versions/latest")
    api_key = secret.payload.data.decode()

    client = ClariClient(ClariConfig(api_key=api_key))
    data = client.export_and_download("company_forecast", "2026_Q1")

    return {"entries": len(data.get("entries", []))}

Error Handling

Issue Cause Solution
Lambda timeout Export takes > 15min Use Step Functions for long jobs
Secret not found Wrong parameter path Verify SSM/Secret Manager path
Airflow task fails Rate limited Add retries with backoff

Resources

Next Steps

For webhook setup, see clari-webhooks-events.

信息
Category 编程开发
Name clari-deploy-integration
版本 v20260423
大小 3.41KB
更新时间 2026-04-27
语言