
Snowflake Data Loading and ETL Workflow

v20260423
snowflake-core-workflow-a
A comprehensive guide to the core Snowflake data loading workflow. It covers setting up stages (S3, GCS, Azure), defining file formats (CSV, JSON, Parquet), and bulk loading with COPY INTO, then moves on to continuous ingestion with Snowpipe for near-real-time synchronization from cloud storage into Snowflake tables.

Snowflake Core Workflow A — Data Loading

Overview

Primary data loading workflow: stages, file formats, COPY INTO, and Snowpipe for continuous ingestion.

Prerequisites

  • Completed snowflake-install-auth setup
  • Target table created in Snowflake
  • Source data in S3, GCS, Azure Blob, or local files
  • Role with CREATE STAGE and USAGE on a warehouse (example grants below)
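
If the role is missing any of these privileges, an administrator can grant them along these lines (loader_role, my_wh, and the object names are placeholders; adjust to your environment):

-- Grants assumed by the steps below
GRANT USAGE ON DATABASE my_db TO ROLE loader_role;
GRANT USAGE ON SCHEMA my_db.my_schema TO ROLE loader_role;
GRANT CREATE FILE FORMAT, CREATE STAGE, CREATE PIPE ON SCHEMA my_db.my_schema TO ROLE loader_role;
GRANT INSERT ON TABLE my_db.my_schema.users TO ROLE loader_role;
GRANT USAGE ON WAREHOUSE my_wh TO ROLE loader_role;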

Instructions

Step 1: Create a File Format

-- CSV format
CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  NULL_IF = ('NULL', 'null', '')
  EMPTY_FIELD_AS_NULL = TRUE
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;

-- JSON format (for semi-structured data)
CREATE OR REPLACE FILE FORMAT my_json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE
  IGNORE_UTF8_ERRORS = TRUE;

-- Parquet format
CREATE OR REPLACE FILE FORMAT my_parquet_format
  TYPE = 'PARQUET'
  SNAPPY_COMPRESSION = TRUE;
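
To confirm the options were registered as intended, the format objects can be inspected (shown here for the CSV format created above):

-- List formats in the current schema and show the effective options of one
SHOW FILE FORMATS;
DESCRIBE FILE FORMAT my_csv_format;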

Step 2: Create a Stage

-- External stage (S3)
CREATE OR REPLACE STAGE my_s3_stage
  STORAGE_INTEGRATION = my_s3_integration
  URL = 's3://my-bucket/data/'
  FILE_FORMAT = my_csv_format;

-- External stage (GCS)
CREATE OR REPLACE STAGE my_gcs_stage
  STORAGE_INTEGRATION = my_gcs_integration
  URL = 'gcs://my-bucket/data/'
  FILE_FORMAT = my_csv_format;

-- Internal stage (Snowflake-managed storage)
CREATE OR REPLACE STAGE my_internal_stage
  FILE_FORMAT = my_csv_format;

-- List files in stage
LIST @my_s3_stage;
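
The external stages above assume the storage integrations (my_s3_integration, my_gcs_integration) already exist, typically created during the snowflake-install-auth setup. If one is missing, a minimal sketch for S3 looks like the following; the IAM role ARN and bucket are placeholders:

-- One-time setup (requires a role with the CREATE INTEGRATION privilege)
CREATE STORAGE INTEGRATION my_s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-loader'
  STORAGE_ALLOWED_LOCATIONS = ('s3://my-bucket/data/');

-- Returns the IAM user and external ID to add to the AWS role's trust policy
DESC INTEGRATION my_s3_integration;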

Step 3: Upload Files to Internal Stage

# Using SnowSQL PUT command
snowsql -c prod -q "PUT file:///tmp/data/*.csv @my_internal_stage AUTO_COMPRESS=TRUE"
# Using Python connector
cursor.execute("PUT file:///tmp/data/users.csv @my_internal_stage AUTO_COMPRESS=TRUE")

Step 4: Load Data with COPY INTO

-- Basic COPY INTO from stage
COPY INTO my_db.my_schema.users
  FROM @my_s3_stage/users/
  FILE_FORMAT = my_csv_format
  ON_ERROR = 'CONTINUE'          -- Skip bad rows
  PURGE = TRUE;                  -- Delete files after load

-- COPY with column mapping
COPY INTO my_db.my_schema.orders (order_id, customer_id, amount, order_date)
  FROM (
    SELECT $1, $2, $3::FLOAT, $4::TIMESTAMP_NTZ
    FROM @my_s3_stage/orders/
  )
  FILE_FORMAT = my_csv_format;

-- Load JSON, matching top-level object keys to target column names
COPY INTO my_db.my_schema.raw_events
  FROM @my_s3_stage/events/
  FILE_FORMAT = my_json_format
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Check COPY history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
));
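
Before committing a large load, COPY INTO can be dry-run with VALIDATION_MODE to report parsing errors without loading anything, and rows rejected by a completed load can be retrieved with the VALIDATE table function:

-- Dry run: return errors only, load nothing
COPY INTO my_db.my_schema.users
  FROM @my_s3_stage/users/
  FILE_FORMAT = my_csv_format
  VALIDATION_MODE = RETURN_ERRORS;

-- Inspect rows rejected by the most recent COPY in this session
SELECT * FROM TABLE(VALIDATE(my_db.my_schema.users, JOB_ID => '_last'));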

Step 5: Programmatic Load (Node.js)

import { pool } from './snowflake/pool';
import { query } from './snowflake/query';

async function loadDataFromStage(
  tableName: string,
  stagePath: string,
  fileFormat: string = 'my_csv_format'
) {
  return pool.withConnection(async (conn) => {
    const result = await query(conn, `
      COPY INTO ${tableName}
        FROM @${stagePath}
        FILE_FORMAT = ${fileFormat}
        ON_ERROR = 'CONTINUE'
        FORCE = FALSE
    `);

    // COPY INTO returns load status per file
    for (const row of result.rows) {
      console.log(`File: ${row.file}, Status: ${row.status}, Rows: ${row.rows_loaded}`);
      if (row.errors_seen > 0) {
        console.warn(`  Errors: ${row.errors_seen}, First error: ${row.first_error}`);
      }
    }
    return result.rows;
  });
}

Step 6: Set Up Snowpipe for Continuous Loading

-- Create pipe for auto-ingest from S3
CREATE OR REPLACE PIPE my_db.my_schema.user_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO my_db.my_schema.users
    FROM @my_s3_stage/users/
    FILE_FORMAT = my_csv_format;

-- Get the SQS queue ARN for S3 event notifications
SHOW PIPES LIKE 'user_pipe';
-- Use the notification_channel value to configure S3 bucket events

-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.user_pipe');

-- Check Snowpipe load history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
))
WHERE pipe_catalog_name IS NOT NULL;
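
Two management commands come up regularly in practice: a manual refresh to queue files that were already staged before the pipe or its event notifications existed, and pausing the pipe during maintenance:

-- Queue files staged within the last 7 days that the pipe has not yet loaded
ALTER PIPE my_db.my_schema.user_pipe REFRESH;

-- Pause and resume the pipe
ALTER PIPE my_db.my_schema.user_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE my_db.my_schema.user_pipe SET PIPE_EXECUTION_PAUSED = FALSE;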

Error Handling

Error | Cause | Solution
Insufficient privileges on stage | Role lacks USAGE | GRANT USAGE ON STAGE x TO ROLE y
File not found | Wrong stage path | Run LIST @stage to verify files
Number of columns in file does not match | Schema mismatch | Use ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE or fix the file
Files already loaded | COPY deduplication | Use FORCE = TRUE to reload (careful)
Pipe not receiving files | Missing S3 event notification | Configure SQS notification from SHOW PIPES output


Next Steps

For data transformation workflows, see snowflake-core-workflow-b.
