Ingesting a Billion Rows: COPY, Staging Tables, and Idempotent Upserts
How to build reproducible ingestion pipelines that don’t flake out under load.
Introduction: the “double-import at 2 a.m.” problem
You kicked off a backfill, the loader retried, and now your fact table has duplicates. Analytics are off, finance is mad, and you’re diffing CSVs at 2 a.m. Sound familiar?
At billion-row scale, ingestion must be:
- Fast (parallel I/O, bulk paths)
- Reproducible (reruns don’t corrupt data)
- Idempotent (retries don’t create dupes)
- Observable (you can prove what loaded and why)
This guide shows a production-grade pattern using COPY, staging tables, and idempotent upserts (MERGE) — with Snowflake-flavored SQL and Python, but the ideas map to BigQuery, Redshift, Postgres, and friends.
Architecture at a Glance
Flow: Object Store → Stage → Raw/Staging → Dedup → Target
- Land files in object storage (S3/GCS/Azure).
- COPY into a raw staging table (append-only).
- Normalize & deduplicate into a clean staging table.
- MERGE into target tables with a deterministic upsert policy.
- Audit: track file lineage, checksums, row counts, and load windows.
Why this works
- Bulk path: COPY uses massively parallel readers.
- Reproducible: raw staging is immutable; you can re-derive everything.
- Idempotent: MERGE enforces one logical row per business key.
- Auditable: file and row-level metadata survives the journey.
COPY vs INSERT vs Streaming — When to Use What
| Ingestion mode | Best for | Pros | Cons |
|---|---|---|---|
| COPY (bulk files) | Backfills, daily/hourly batches, billions of rows | Parallel, cheap, predictable | Needs file prep + staging |
| INSERT (row-by-row) | Small trickle loads, admin tasks | Simple to code | Slow, expensive at scale |
| Streaming (CDC/queues) | Near-real-time needs | Low latency, continuous | Complexity; exactly-once is hard |
Rule of thumb: Use COPY for big volumes; add CDC/streams when latency truly matters. Even then, land micro-batches and COPY them.
Concrete Example (Snowflake)
1) Create stage and file format
CREATE OR REPLACE STAGE mystage
URL='s3://my-bucket/sales/'
CREDENTIALS=(AWS_KEY_ID=$KEY AWS_SECRET_KEY=$SECRET);
CREATE OR REPLACE FILE FORMAT my_csv_fmt
TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1
NULL_IF=('','NULL');
2) Raw staging table (append-only)
Keep source fidelity + metadata. Use a stable business key and ingestion metadata.
CREATE OR REPLACE TABLE raw_sales (
order_id STRING,
customer_id STRING,
order_ts TIMESTAMP_NTZ,
amount NUMBER(18,2),
src_file_name STRING,
src_row_num NUMBER,
load_ts TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
_sha1_row STRING
);
3) COPY with validation and lineage
Validate first; then load. Record file names to prevent accidental re-COPY.
-- Dry run
COPY INTO raw_sales
FROM @mystage
FILE_FORMAT = (FORMAT_NAME = my_csv_fmt)
VALIDATION_MODE = RETURN_ERRORS;
-- Actual load with columns aligned
COPY INTO raw_sales(order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row)
FROM (
SELECT
$1, $2, TO_TIMESTAMP_NTZ($3), $4,
METADATA$FILENAME, METADATA$FILE_ROW_NUMBER,
SHA1(TO_VARCHAR($1)||'|'||$2||'|'||$3||'|'||$4)
FROM @mystage (FILE_FORMAT => 'my_csv_fmt')
);
Pro tip: Maintain a loaded_files control table populated from INFORMATION_SCHEMA.LOAD_HISTORY so reruns can skip already consumed files.
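A minimal sketch of that control table (matching the control.loaded_files table used in the Python skeleton below) and a backfill from Snowflake's per-database LOAD_HISTORY view; names and the row_count column are illustrative, and the file_name format must match whatever your orchestrator records:
-- Illustrative control table; adapt names to your environment
CREATE TABLE IF NOT EXISTS control.loaded_files (
  file_name     STRING,
  file_sha1     STRING,
  row_count     NUMBER,
  first_seen_ts TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Backfill from the INFORMATION_SCHEMA.LOAD_HISTORY view (retains ~14 days)
INSERT INTO control.loaded_files (file_name, row_count)
SELECT lh.file_name, lh.row_count
FROM INFORMATION_SCHEMA.LOAD_HISTORY lh
WHERE lh.table_name = 'RAW_SALES'
  AND lh.file_name NOT IN (SELECT file_name FROM control.loaded_files);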
4) Clean & de-duplicate into a staging view/table
Keep only the latest occurrence of each business key per file window.
CREATE OR REPLACE TEMP TABLE stage_sales AS
SELECT * EXCLUDE (rn)
FROM (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY order_ts DESC, load_ts DESC
) AS rn
FROM raw_sales
)
WHERE rn = 1;
5) Idempotent upsert into target
Use MERGE keyed by business key and update only when data truly changed (checksum).
-- One-time setup: create the target with the same shape as the stage
CREATE TABLE IF NOT EXISTS sales AS
SELECT * FROM stage_sales WHERE 1=0;
MERGE INTO sales t
USING stage_sales s
ON t.order_id = s.order_id
WHEN MATCHED AND t._sha1_row <> s._sha1_row THEN
UPDATE SET
customer_id = s.customer_id,
order_ts = s.order_ts,
amount = s.amount,
src_file_name = s.src_file_name,
src_row_num = s.src_row_num,
_sha1_row = s._sha1_row,
load_ts = s.load_ts
WHEN NOT MATCHED THEN
INSERT (order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row, load_ts)
VALUES (s.order_id, s.customer_id, s.order_ts, s.amount, s.src_file_name, s.src_row_num, s._sha1_row, s.load_ts);
Why this is idempotent:
- Re-running COPY + stage_sales yields the same winning row per order_id.
- MERGE updates only when the checksum differs, so duplicate retries are harmless (verification sketch below).
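One way to verify this, as a sketch using Snowflake's RESULT_SCAN run immediately after replaying the MERGE, is to check that the merge counters come back as zeros:
-- Inspect the result of the MERGE that just ran; a clean retry changes nothing
SELECT "number of rows inserted" AS rows_inserted,
       "number of rows updated"  AS rows_updated
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
-- Expect 0 / 0 when the same files are replayed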
Python Orchestration Skeleton (works with Airflow/Dagster/Prefect)
Keep tasks deterministic and easy to re-run.
from pathlib import Path
import hashlib

import snowflake.connector


def sha1_file(p: Path) -> str:
    return hashlib.sha1(p.read_bytes()).hexdigest()


def load_files(files):
    con = snowflake.connector.connect(...)
    cur = con.cursor()
    try:
        for f in files:
            # Idempotency guard: skip already loaded files
            cur.execute("SELECT 1 FROM control.loaded_files WHERE file_name=%s", (f.name,))
            if cur.fetchone():
                continue
            cur.execute(f"PUT file://{f} @mystage AUTO_COMPRESS=TRUE OVERWRITE=FALSE")
            # COPY's own load metadata also skips files it has already consumed
            cur.execute("""
                COPY INTO raw_sales(order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row)
                FROM (SELECT $1,$2,TO_TIMESTAMP_NTZ($3),$4,METADATA$FILENAME,METADATA$FILE_ROW_NUMBER,
                             SHA1(TO_VARCHAR($1)||'|'||$2||'|'||$3||'|'||$4)
                      FROM @mystage)
                FILE_FORMAT=(FORMAT_NAME='my_csv_fmt')
            """)
            cur.execute("INSERT INTO control.loaded_files(file_name, file_sha1) VALUES(%s,%s)",
                        (f.name, sha1_file(f)))
        con.commit()
    finally:
        cur.close()
        con.close()
Best Practices (that actually save you at scale)
File strategy
- Target 100–250 MB compressed file size for COPY sweet spot.
- Avoid millions of tiny files; consolidate before load.
- Use partitioned prefixes, e.g. s3://bucket/sales/dt=2025-11-20/part-0001.gz (see the partition-scoped COPY sketch below).
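Partitioned prefixes also let a load target one window at a time; a sketch reusing the stage and file format from above (the dt value is illustrative):
-- Load a single partition window by appending the prefix to the stage reference
COPY INTO raw_sales(order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row)
FROM (
  SELECT
    $1, $2, TO_TIMESTAMP_NTZ($3), $4,
    METADATA$FILENAME, METADATA$FILE_ROW_NUMBER,
    SHA1(TO_VARCHAR($1)||'|'||$2||'|'||$3||'|'||$4)
  FROM @mystage/dt=2025-11-20/ (FILE_FORMAT => 'my_csv_fmt')
);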
Schema & keys
- Define a stable business key (order_id) early.
- Add a checksum column to detect meaningful change.
- Track source lineage: filename, row number, produced_at, producer_version.
Idempotency
- Maintain a control table for loaded_files(file_name, sha1, first_seen_ts).
- Build pure transformations from raw → stage so reruns are deterministic.
- Use MERGE with a change predicate (t._sha1_row <> s._sha1_row).
Reproducibility
- Version your file format, SQL, and code.
- Pin library versions; store manifest of files loaded for each run.
- Treat raw staging as append-only; never mutate it.
Performance
- Increase warehouse/slot size during COPY+MERGE windows, then scale down.
- Use clustering/partitioning on the target by query predicates (e.g., order_ts); see the sketch below.
- Prefer bulk MERGE over per-row upserts.
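In Snowflake terms, the scale-up/scale-down and clustering pieces look roughly like this (the warehouse name and sizes are placeholders):
-- Scale the warehouse up for the COPY + MERGE window, then back down
ALTER WAREHOUSE load_wh SET WAREHOUSE_SIZE = 'XLARGE';
-- ... run COPY, dedup, MERGE ...
ALTER WAREHOUSE load_wh SET WAREHOUSE_SIZE = 'XSMALL';
-- Cluster the billion-row target on its dominant query predicate
ALTER TABLE sales CLUSTER BY (order_ts);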
Observability
- Capture row counts: staged in, deduped, merged-inserted, merged-updated, rejected (reconciliation query below).
- Sample records for schema drift and null explosions.
- Alert on COPY errors, merge anomalies, and unexpected volume shifts.
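A minimal reconciliation query along these lines (run in the same session as the temp stage table; the window value is illustrative) makes those counts easy to log and alert on:
-- Rough row-count reconciliation for one load window
SET window_start = '2025-11-20';
SELECT
  (SELECT COUNT(*) FROM raw_sales  WHERE load_ts >= $window_start) AS staged_rows,
  (SELECT COUNT(*) FROM stage_sales)                               AS deduped_rows,
  (SELECT COUNT(*) FROM sales      WHERE load_ts >= $window_start) AS target_rows_touched;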
Safety & recovery
- Use VALIDATION_MODE before big loads.
- Keep a quarantine table for rejects; never DROP raw until checks pass.
- Make reruns explicit by window (e.g., dt BETWEEN '2025-11-01' AND '2025-11-07'); see the sketch below.
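A sketch of such an explicit rerun, windowed here on order_ts rather than a file date (the window values are illustrative): rebuild the stage for that window only and replay the MERGE; the checksum predicate keeps the replay harmless.
-- Rebuild the dedup stage for one explicit window, then re-run the MERGE above
CREATE OR REPLACE TEMP TABLE stage_sales AS
SELECT * EXCLUDE (rn)
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY order_id
                            ORDER BY order_ts DESC, load_ts DESC) AS rn
  FROM raw_sales
  WHERE order_ts BETWEEN '2025-11-01' AND '2025-11-07'
)
WHERE rn = 1;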
Common Pitfalls (and blunt fixes)
- Tiny files hell → compact upstream or in a pre-job.
- “UPSERTs are slow” → checksum compare + bulk MERGE, not per-row loops.
- Duplicate keys post-merge → add a unique constraint/quality check on order_id (check below).
- Retry storms → make tasks idempotent and retry-safe; never append without guards.
- Schema drift surprises → ingest to VARIANT or raw strings, cast in stage, not in target.
- Forgot lineage → add src_file_name, src_row_num, producer_version everywhere.
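Since Snowflake does not enforce UNIQUE or PRIMARY KEY constraints, the practical guard is a post-merge check like this on the example sales table:
-- Post-merge quality check: any business key appearing twice should fail the run
SELECT order_id, COUNT(*) AS dupes
FROM sales
GROUP BY order_id
HAVING COUNT(*) > 1;
-- Expect zero rows; wire a non-empty result into a hard pipeline failure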
Conclusion & Takeaways
If you can rerun yesterday without fear, you’ve built a real pipeline.
Key moves to remember:
- Land files → COPY into append-only raw.
- Normalize/dedup → stage with a deterministic winner per key.
- Load targets via MERGE with a change checksum.
- Track file lineage and row counts to prove correctness.
- Make every step idempotent and observable.
What to do next
- Wrap this into an Airflow/Dagster job with clear run windows and control tables.
- Add CDC later if the business truly needs low latency.
- Document the contract: “One logical row per order_id, last write wins by order_ts.”
Internal Link Ideas
- Designing Business Keys and Checksums for Idempotent Pipelines
- Handling Late Arrivals and Deletes (tombstones + MERGE)
- Partitioning & Clustering Strategies for Billion-Row Tables
- Observability for Data Loads: Row Counts, Drift, and SLAs
- From Batch to CDC: Safely Introducing Streams
Image Prompt
“A clean, modern data architecture diagram showing an object store feeding a data warehouse via COPY into a raw table, then deduped staging, then an idempotent MERGE into a target table. Include lineage metadata and checksums. Minimalistic, high-contrast, 3D isometric style.”
Tags
#Ingestion #DataEngineering #Snowflake #COPY #Upsert #Idempotent #ETL #DataQuality #Scalability #Architecture