Skills Development Monitoring Kling AI Video Generation Jobs

Monitoring Kling AI Video Generation Jobs

v20260423
klingai-job-monitoring
This skill provides comprehensive monitoring and tracking for asynchronous video generation tasks using the Kling AI API. It supports polling single tasks, managing large batches of jobs, and detecting stalled or failed processes across multiple video endpoints. Ideal for developers building robust workflow dashboards, implementing reliable batch processing, or debugging long-running media generation pipelines.
Get Skill
211 downloads
Overview

Kling AI Job Monitoring

Overview

Every Kling AI generation returns a task_id. This skill covers polling strategies, batch tracking, timeout handling, and callback-based monitoring for the /v1/videos/text2video, /v1/videos/image2video, and /v1/videos/video-extend endpoints.

Task Lifecycle

Status Meaning Typical Duration
submitted Queued for processing 0-30s
processing Generation in progress 30-120s (standard), 60-300s (professional)
succeed Complete, video URL available Terminal
failed Generation failed Terminal

Polling a Single Task

import jwt, time, os, requests

BASE = "https://api.klingai.com/v1"

def get_headers():
    ak, sk = os.environ["KLING_ACCESS_KEY"], os.environ["KLING_SECRET_KEY"]
    token = jwt.encode(
        {"iss": ak, "exp": int(time.time()) + 1800, "nbf": int(time.time()) - 5},
        sk, algorithm="HS256", headers={"alg": "HS256", "typ": "JWT"}
    )
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

def poll_task(endpoint: str, task_id: str, interval: int = 10, timeout: int = 600):
    """Poll with adaptive interval and timeout."""
    start = time.monotonic()
    attempts = 0
    while time.monotonic() - start < timeout:
        time.sleep(interval)
        attempts += 1
        r = requests.get(f"{BASE}{endpoint}/{task_id}", headers=get_headers(), timeout=30)
        data = r.json()["data"]
        status = data["task_status"]
        elapsed = int(time.monotonic() - start)
        print(f"[{elapsed}s] Poll #{attempts}: {status}")

        if status == "succeed":
            return data["task_result"]
        elif status == "failed":
            raise RuntimeError(f"Task failed: {data.get('task_status_msg', 'unknown')}")

        if attempts > 5:
            interval = min(interval * 1.2, 30)
    raise TimeoutError(f"Task {task_id} timed out after {timeout}s")

Batch Job Tracker

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class TrackedTask:
    task_id: str
    endpoint: str
    prompt: str
    status: str = "submitted"
    created_at: float = field(default_factory=time.time)
    result_url: Optional[str] = None
    error_msg: Optional[str] = None

class BatchTracker:
    def __init__(self):
        self.tasks: dict[str, TrackedTask] = {}

    def add(self, task_id, endpoint, prompt):
        self.tasks[task_id] = TrackedTask(task_id=task_id, endpoint=endpoint, prompt=prompt)

    def update_all(self):
        active = [t for t in self.tasks.values() if t.status in ("submitted", "processing")]
        for task in active:
            try:
                r = requests.get(
                    f"{BASE}{task.endpoint}/{task.task_id}",
                    headers=get_headers(), timeout=30
                ).json()
                data = r["data"]
                task.status = data["task_status"]
                if task.status == "succeed":
                    task.result_url = data["task_result"]["videos"][0]["url"]
                elif task.status == "failed":
                    task.error_msg = data.get("task_status_msg")
            except Exception as e:
                print(f"Error polling {task.task_id}: {e}")

    def print_report(self):
        by_status = {}
        for t in self.tasks.values():
            by_status.setdefault(t.status, 0)
            by_status[t.status] += 1
        active = sum(v for k, v in by_status.items() if k in ("submitted", "processing"))
        print(f"\n=== Batch: {len(self.tasks)} tasks, {active} active ===")
        for status, count in sorted(by_status.items()):
            print(f"  {status}: {count}")

Stuck Task Detection

def detect_stuck(tracker: BatchTracker, threshold_sec: int = 600):
    """Flag tasks processing longer than threshold."""
    now = time.time()
    stuck = []
    for t in tracker.tasks.values():
        if t.status in ("submitted", "processing"):
            elapsed = now - t.created_at
            if elapsed > threshold_sec:
                stuck.append((t.task_id, int(elapsed)))
    if stuck:
        print(f"WARNING: {len(stuck)} stuck tasks:")
        for tid, secs in stuck:
            print(f"  {tid}: {secs}s")
    return stuck

Batch Monitor Loop

tracker = BatchTracker()

# Submit batch
for prompt in prompts:
    r = requests.post(f"{BASE}/videos/text2video", headers=get_headers(), json={
        "model_name": "kling-v2-master", "prompt": prompt, "duration": "5"
    }).json()
    tracker.add(r["data"]["task_id"], "/videos/text2video", prompt)

# Monitor until all complete
while any(t.status in ("submitted", "processing") for t in tracker.tasks.values()):
    time.sleep(15)
    tracker.update_all()
    tracker.print_report()
    detect_stuck(tracker)

Resources

Info
Category Development
Name klingai-job-monitoring
Version v20260423
Size 6.93KB
Updated At 2026-04-28
Language