Primary data loading workflow: stages, file formats, COPY INTO, and Snowpipe for continuous ingestion.
Prerequisites: snowflake-install-auth setup, plus CREATE STAGE privilege and USAGE on a warehouse.
-- CSV format
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
NULL_IF = ('NULL', 'null', '')
EMPTY_FIELD_AS_NULL = TRUE
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;
-- JSON format (for semi-structured data)
CREATE OR REPLACE FILE FORMAT my_json_format
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE
IGNORE_UTF8_ERRORS = TRUE;
-- Parquet format
CREATE OR REPLACE FILE FORMAT my_parquet_format
TYPE = 'PARQUET'
SNAPPY_COMPRESSION = TRUE;
-- External stage (S3)
CREATE OR REPLACE STAGE my_s3_stage
STORAGE_INTEGRATION = my_s3_integration
URL = 's3://my-bucket/data/'
FILE_FORMAT = my_csv_format;
-- External stage (GCS)
CREATE OR REPLACE STAGE my_gcs_stage
STORAGE_INTEGRATION = my_gcs_integration
URL = 'gcs://my-bucket/data/'
FILE_FORMAT = my_csv_format;
-- Internal stage (Snowflake-managed storage)
CREATE OR REPLACE STAGE my_internal_stage
FILE_FORMAT = my_csv_format;
-- List files in stage
LIST @my_s3_stage;
# Using SnowSQL PUT command
snowsql -c prod -q "PUT file:///tmp/data/*.csv @my_internal_stage AUTO_COMPRESS=TRUE"
# Using Python connector
cursor.execute("PUT file:///tmp/data/users.csv @my_internal_stage AUTO_COMPRESS=TRUE")
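The same PUT statement can also be issued from Node.js through the pool/query helpers used later in this section; a minimal sketch, assuming a snowflake-sdk version with PUT/GET file-transfer support (the helper name and paths are illustrative):
import { pool } from './snowflake/pool';
import { query } from './snowflake/query';
// Hypothetical helper: upload local files to an internal stage before running COPY INTO
async function uploadToStage(localGlob: string, stageName: string) {
  return pool.withConnection(async (conn) => {
    // AUTO_COMPRESS gzips files client-side; OVERWRITE=FALSE leaves already-staged files alone
    const result = await query(conn, `
      PUT file://${localGlob} @${stageName} AUTO_COMPRESS=TRUE OVERWRITE=FALSE
    `);
    return result.rows; // one row per uploaded file (source, target, status, ...)
  });
}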
-- Basic COPY INTO from stage
COPY INTO my_db.my_schema.users
FROM @my_s3_stage/users/
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format')
ON_ERROR = 'CONTINUE' -- Skip bad rows
PURGE = TRUE; -- Delete files after load
-- COPY with column mapping
COPY INTO my_db.my_schema.orders (order_id, customer_id, amount, order_date)
FROM (
SELECT $1, $2, $3::FLOAT, $4::TIMESTAMP_NTZ
FROM @my_s3_stage/orders/
)
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
-- Load JSON, matching top-level keys to same-named table columns (case-insensitive)
COPY INTO my_db.my_schema.raw_events
FROM @my_s3_stage/events/
FILE_FORMAT = (FORMAT_NAME = 'my_json_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- Check COPY history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'USERS',
START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
));
import { pool } from './snowflake/pool';
import { query } from './snowflake/query';
async function loadDataFromStage(
tableName: string,
stagePath: string,
fileFormat: string = 'my_csv_format'
) {
return pool.withConnection(async (conn) => {
const result = await query(conn, `
COPY INTO ${tableName}
FROM @${stagePath}
FILE_FORMAT = (FORMAT_NAME = '${fileFormat}')
ON_ERROR = 'CONTINUE'
FORCE = FALSE
`);
// COPY INTO returns load status per file
for (const row of result.rows) {
console.log(`File: ${row.file}, Status: ${row.status}, Rows: ${row.rows_loaded}`);
if (row.errors_seen > 0) {
console.warn(` Errors: ${row.errors_seen}, First error: ${row.first_error}`);
}
}
return result.rows;
});
}
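A hypothetical call site (inside an async context), using the tables and stage prefixes created above:
// Example usage - table names and stage paths assumed from earlier in this section
await loadDataFromStage('my_db.my_schema.users', 'my_s3_stage/users/');
await loadDataFromStage('my_db.my_schema.raw_events', 'my_s3_stage/events/', 'my_json_format');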
-- Create pipe for auto-ingest from S3
CREATE OR REPLACE PIPE my_db.my_schema.user_pipe
AUTO_INGEST = TRUE
AS
COPY INTO my_db.my_schema.users
FROM @my_s3_stage/users/
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
-- Get the SQS queue ARN for S3 event notifications
SHOW PIPES LIKE 'user_pipe';
-- Use the notification_channel value to configure S3 bucket events
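On the AWS side, the bucket needs an event notification that targets that SQS queue. A minimal sketch using the AWS SDK v3 (bucket name, region, and prefix are placeholders; the queue ARN comes from the notification_channel column of SHOW PIPES):
import { S3Client, PutBucketNotificationConfigurationCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: 'us-east-1' }); // placeholder region
await s3.send(new PutBucketNotificationConfigurationCommand({
  Bucket: 'my-bucket', // bucket behind @my_s3_stage
  NotificationConfiguration: {
    QueueConfigurations: [{
      QueueArn: '<notification_channel from SHOW PIPES>',
      Events: ['s3:ObjectCreated:*'],
      // only notify for the prefix the pipe's stage points at
      Filter: { Key: { FilterRules: [{ Name: 'prefix', Value: 'data/users/' }] } },
    }],
  },
}));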
-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.user_pipe');
-- Check Snowpipe load history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'USERS',
START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
))
WHERE pipe_catalog_name IS NOT NULL;
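For programmatic monitoring, the JSON string returned by SYSTEM$PIPE_STATUS can be parsed in Node.js; a sketch reusing the pool/query helpers (executionState and pendingFileCount are fields of that JSON payload; the helper name is illustrative):
import { pool } from './snowflake/pool';
import { query } from './snowflake/query';
// Hypothetical monitor: fetch and parse SYSTEM$PIPE_STATUS for one pipe
async function getPipeStatus(pipeName: string) {
  return pool.withConnection(async (conn) => {
    const result = await query(conn, `SELECT SYSTEM$PIPE_STATUS('${pipeName}') AS status`);
    const status = JSON.parse(result.rows[0].status);
    console.log(`Pipe ${pipeName}: ${status.executionState}, pending files: ${status.pendingFileCount}`);
    return status;
  });
}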
| Error | Cause | Solution |
|---|---|---|
| Insufficient privileges on stage | Role lacks USAGE | GRANT USAGE ON STAGE x TO ROLE y |
| File not found | Wrong stage path | Run LIST @stage to verify files |
| Number of columns in file does not match | Schema mismatch | Use ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE or fix the file |
| Files already loaded | COPY deduplication | Use FORCE = TRUE to reload (careful) |
| Pipe not receiving files | Missing S3 event notification | Configure SQS notification from SHOW PIPES output |
For data transformation workflows, see snowflake-core-workflow-b.