Skills Data Science ClickHouse Data Ingestion Pipelines

ClickHouse Data Ingestion Pipelines

v20260423
clickhouse-webhooks-events
A comprehensive guide and implementation template for building robust data ingestion pipelines into ClickHouse. Supports multiple sources including HTTP webhooks (with batching), Kafka streams, S3, and direct SQL inserts. Demonstrates best practices for data handling, such as deduplication using ReplacingMergeTree and monitoring query logs, ensuring reliable, scalable data analytics.
Get Skill
248 downloads
Overview

ClickHouse Data Ingestion

Overview

Build data ingestion pipelines into ClickHouse from HTTP webhooks, Kafka, and streaming sources with proper batching, deduplication, and error handling.

Prerequisites

  • ClickHouse table with appropriate engine (see clickhouse-core-workflow-a)
  • @clickhouse/client connected

Instructions

Step 1: Webhook Receiver with Batched Inserts

import express from 'express';
import { createClient } from '@clickhouse/client';

const client = createClient({ url: process.env.CLICKHOUSE_HOST! });
const app = express();
app.use(express.json());

// Buffer for batching — ClickHouse hates one-row-at-a-time inserts
const buffer: Record<string, unknown>[] = [];
const BATCH_SIZE = 5_000;
const FLUSH_INTERVAL_MS = 5_000;

async function flushBuffer() {
  if (buffer.length === 0) return;
  const batch = buffer.splice(0, buffer.length);

  try {
    await client.insert({
      table: 'analytics.events',
      values: batch,
      format: 'JSONEachRow',
    });
    console.log(`Flushed ${batch.length} events to ClickHouse`);
  } catch (err) {
    console.error('Insert failed, re-queuing:', (err as Error).message);
    buffer.unshift(...batch);  // Put back at front for retry
  }
}

// Flush periodically
setInterval(flushBuffer, FLUSH_INTERVAL_MS);

// Webhook endpoint
app.post('/ingest', async (req, res) => {
  const events = Array.isArray(req.body) ? req.body : [req.body];

  for (const event of events) {
    buffer.push({
      event_type: event.type ?? 'unknown',
      user_id: event.userId ?? 0,
      properties: JSON.stringify(event.properties ?? {}),
      created_at: new Date().toISOString().replace('T', ' ').slice(0, 19),
    });
  }

  if (buffer.length >= BATCH_SIZE) {
    await flushBuffer();
  }

  res.status(202).json({ queued: events.length, buffer_size: buffer.length });
});

Step 2: Kafka Table Engine (Server-Side Ingestion)

-- Create a Kafka engine table (consumes messages automatically)
CREATE TABLE analytics.events_kafka (
    event_type  String,
    user_id     UInt64,
    properties  String,
    timestamp   DateTime
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2,
    kafka_max_block_size = 65536;

-- Materialized view pipes Kafka → MergeTree automatically
CREATE MATERIALIZED VIEW analytics.events_kafka_mv
TO analytics.events
AS SELECT
    event_type,
    user_id,
    properties,
    timestamp AS created_at
FROM analytics.events_kafka;

-- ClickHouse now consumes from Kafka continuously!
-- Check lag:
SELECT * FROM system.kafka_consumers;

Step 3: ClickPipes (ClickHouse Cloud Managed Ingestion)

ClickHouse Cloud offers ClickPipes — a managed ingestion service that connects to Kafka, Confluent, Amazon MSK, S3, and GCS without code.

ClickPipes Configuration (Cloud Console):
1. Source: Amazon MSK / Confluent Cloud / Apache Kafka
2. Topic: events
3. Format: JSONEachRow
4. Target: analytics.events
5. Scaling: 2 consumers (auto-scales)

Step 4: HTTP Interface Bulk Insert

# Insert from CSV file via HTTP (no client needed)
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+CSVWithNames' \
  --data-binary @events.csv

# Insert from NDJSON file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+JSONEachRow' \
  --data-binary @events.ndjson

# Insert from Parquet file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+Parquet' \
  --data-binary @events.parquet

# Insert from remote URL (ClickHouse fetches it)
INSERT INTO analytics.events
SELECT * FROM url('https://data.example.com/events.csv', CSVWithNames);

# Insert from S3
INSERT INTO analytics.events
SELECT * FROM s3(
    'https://my-bucket.s3.amazonaws.com/events/*.parquet',
    'ACCESS_KEY', 'SECRET_KEY',
    'Parquet'
);

Step 5: Deduplication with ReplacingMergeTree

-- For idempotent ingestion (webhook retries, Kafka reprocessing)
CREATE TABLE analytics.events_dedup (
    event_id    String,           -- Unique event identifier
    event_type  LowCardinality(String),
    user_id     UInt64,
    properties  String,
    created_at  DateTime,
    _version    UInt64 DEFAULT toUnixTimestamp(now())
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY event_id;               -- Dedup key

-- Insert duplicate-safe: same event_id keeps latest _version
-- Query with FINAL for deduplicated results
SELECT * FROM analytics.events_dedup FINAL
WHERE created_at >= today() - 7;

Step 6: Insert Monitoring

-- Track insert throughput
SELECT
    toStartOfMinute(event_time) AS minute,
    count() AS inserts,
    sum(written_rows) AS rows_inserted,
    formatReadableSize(sum(written_bytes)) AS bytes_inserted
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

-- Check for insert errors
SELECT event_time, exception, substring(query, 1, 200)
FROM system.query_log
WHERE type = 'ExceptionWhileProcessing'
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY event_time DESC;

Insert Best Practices

Practice Why
Batch 10K-100K rows per INSERT Fewer parts, faster merges
Buffer 1-5 seconds for real-time Balances latency vs throughput
Use JSONEachRow format Client handles serialization
Compress with ZSTD on wire Reduces network transfer
Use ReplacingMergeTree for retries Handles duplicate delivery
Use async_insert=1 for small batches Server-side batching

Error Handling

Error Cause Solution
Too many parts Single-row inserts Batch inserts (10K+ rows)
Cannot parse input Wrong format Match format to data structure
TIMEOUT on large insert Slow network Enable compression, split batch
Duplicate events Webhook retries Use ReplacingMergeTree + event_id

Resources

Next Steps

For query and server performance, see clickhouse-performance-tuning.

Info
Category Data Science
Name clickhouse-webhooks-events
Version v20260423
Size 6.98KB
Updated At 2026-04-26
Language