Ingesting a Billion Rows: COPY, Staging Tables, and Idempotent Upserts

How to build reproducible ingestion pipelines that don’t flake out under load.

Introduction: the “double-import at 2 a.m.” problem

You kicked off a backfill, the loader retried, and now your fact table has duplicates. Analytics are off, finance is mad, and you’re diffing CSVs at 2 a.m. Sound familiar?

At billion-row scale, ingestion must be:

  • Fast (parallel I/O, bulk paths)
  • Reproducible (reruns don’t corrupt data)
  • Idempotent (retries don’t create dupes)
  • Observable (you can prove what loaded and why)

This guide shows a production-grade pattern using COPY, staging tables, and idempotent upserts (MERGE) — with Snowflake-flavored SQL and Python, but the ideas map to BigQuery, Redshift, Postgres, and friends.


Architecture at a Glance

Flow: Object Store → Stage → Raw/Staging → Dedup → Target

  1. Land files in object storage (S3/GCS/Azure).
  2. COPY into a raw staging table (append-only).
  3. Normalize & deduplicate into a clean staging table.
  4. MERGE into target tables with a deterministic upsert policy.
  5. Audit: track file lineage, checksums, row counts, and load windows.

Why this works

  • Bulk path: COPY uses massively parallel readers.
  • Reproducible: raw staging is immutable; you can re-derive everything.
  • Idempotent: MERGE enforces one logical row per business key.
  • Auditable: file and row-level metadata survives the journey.

COPY vs INSERT vs Streaming — When to Use What

Ingestion mode         | Best for                                          | Pros                         | Cons
COPY (bulk files)      | Backfills, daily/hourly batches, billions of rows | Parallel, cheap, predictable | Needs file prep + staging
INSERT (row-by-row)    | Small trickle loads, admin tasks                  | Simple to code               | Slow, expensive at scale
Streaming (CDC/queues) | Near-real-time                                    | Low latency, continuous      | Complexity; exactly-once is hard

Rule of thumb: Use COPY for big volumes; add CDC/streams when latency truly matters. Even then, land micro-batches and COPY them.


Concrete Example (Snowflake)

1) Create stage and file format

CREATE STAGE mystage
  URL='s3://my-bucket/sales/'
  CREDENTIALS=(AWS_KEY_ID=$KEY AWS_SECRET_KEY=$SECRET);

CREATE OR REPLACE FILE FORMAT my_csv_fmt
  TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1
  NULL_IF=('','NULL');

2) Raw staging table (append-only)

Preserve source fidelity and capture ingestion metadata (file name, row number, load timestamp) alongside a stable business key.

CREATE OR REPLACE TABLE raw_sales (
  order_id        STRING,
  customer_id     STRING,
  order_ts        TIMESTAMP_NTZ,
  amount          NUMBER(18,2),
  src_file_name   STRING,
  src_row_num     NUMBER,
  load_ts         TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
  _sha1_row       STRING
);

3) COPY with validation and lineage

Validate first; then load. Record file names to prevent accidental re-COPY.

-- Dry run
COPY INTO raw_sales
  FROM @mystage
  FILE_FORMAT = (FORMAT_NAME = my_csv_fmt)
  VALIDATION_MODE = RETURN_ERRORS;

-- Actual load with columns aligned
COPY INTO raw_sales(order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row)
FROM (
  SELECT
    $1, $2, TO_TIMESTAMP_NTZ($3), $4,
    METADATA$FILENAME, METADATA$FILE_ROW_NUMBER,
    SHA1(TO_VARCHAR($1)||'|'||$2||'|'||$3||'|'||$4)
  FROM @mystage (FILE_FORMAT => 'my_csv_fmt')
);

Pro tip: Maintain a loaded_files control table populated from INFORMATION_SCHEMA.LOAD_HISTORY so reruns can skip already consumed files.
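
A minimal sketch of that control table and how to seed it from LOAD_HISTORY. The schema (including the row_count column) and the 24-hour window are assumptions, not a fixed convention:

CREATE TABLE IF NOT EXISTS control.loaded_files (
  file_name     STRING,
  file_sha1     STRING,        -- filled in by the loader (see the Python skeleton below)
  row_count     NUMBER,
  first_seen_ts TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Seed from Snowflake's load history after each run (last 24 hours shown)
INSERT INTO control.loaded_files (file_name, row_count)
SELECT lh.file_name, lh.row_count
FROM INFORMATION_SCHEMA.LOAD_HISTORY lh
WHERE lh.table_name = 'RAW_SALES'
  AND lh.last_load_time > DATEADD('hour', -24, CURRENT_TIMESTAMP())
  AND lh.file_name NOT IN (SELECT file_name FROM control.loaded_files);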

4) Clean & de-duplicate into a staging view/table

Keep only the latest occurrence of each business key per file window.

CREATE OR REPLACE TEMP TABLE stage_sales AS
SELECT * EXCLUDE (rn)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY order_ts DESC, load_ts DESC
    ) AS rn
  FROM raw_sales
)
WHERE rn = 1;

5) Idempotent upsert into target

Use MERGE keyed by business key and update only when data truly changed (checksum).

-- One-time setup; IF NOT EXISTS so a rerun never wipes the existing target
CREATE TABLE IF NOT EXISTS sales AS
SELECT * FROM stage_sales WHERE 1=0;

MERGE INTO sales t
USING stage_sales s
  ON t.order_id = s.order_id
WHEN MATCHED AND t._sha1_row <> s._sha1_row THEN
  UPDATE SET
    customer_id = s.customer_id,
    order_ts    = s.order_ts,
    amount      = s.amount,
    src_file_name = s.src_file_name,
    src_row_num   = s.src_row_num,
    _sha1_row     = s._sha1_row,
    load_ts       = s.load_ts
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row, load_ts)
  VALUES (s.order_id, s.customer_id, s.order_ts, s.amount, s.src_file_name, s.src_row_num, s._sha1_row, s.load_ts);

Why this is idempotent:

  • Re-running COPY + stage_sales yields the same winning row per order_id.
  • MERGE updates only when the checksum differs, so duplicate retries are harmless.
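
A quick check to back that claim after any rerun; this query should always return zero rows:

SELECT order_id, COUNT(*) AS dupes
FROM sales
GROUP BY order_id
HAVING COUNT(*) > 1;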

Python Orchestration Skeleton (works with Airflow/Dagster/Prefect)

Keep tasks deterministic and easy to re-run.

from pathlib import Path
import hashlib
import snowflake.connector

def sha1_file(p: Path) -> str:
    return hashlib.sha1(p.read_bytes()).hexdigest()

def load_files(files):
    con = snowflake.connector.connect(...)
    cur = con.cursor()
    try:
        for f in files:
            # idempotency guard: skip already loaded files
            cur.execute("SELECT 1 FROM control.loaded_files WHERE file_name=%s", (f.name,))
            if cur.fetchone():
                continue

            cur.execute(f"PUT file://{f} @mystage AUTO_COMPRESS=TRUE OVERWRITE=FALSE")
            cur.execute("""
                COPY INTO raw_sales(order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row)
                FROM (SELECT $1,$2,TO_TIMESTAMP_NTZ($3),$4,METADATA$FILENAME,METADATA$FILE_ROW_NUMBER,
                             SHA1(TO_VARCHAR($1)||'|'||$2||'|'||$3||'|'||$4)
                      FROM @mystage)  -- Snowflake's load metadata also skips already-loaded files (FORCE=FALSE by default)
                FILE_FORMAT=(FORMAT_NAME='my_csv_fmt')
            """)
            cur.execute("INSERT INTO control.loaded_files(file_name, file_sha1) VALUES(%s,%s)",
                        (f.name, sha1_file(f)))
        con.commit()
    finally:
        cur.close(); con.close()

Best Practices (that actually save you at scale)

File strategy

  • Target 100–250 MB compressed file size for COPY sweet spot.
  • Avoid millions of tiny files; consolidate before load.
  • Use partitioned prefixes: s3://bucket/sales/dt=2025-11-20/part-0001.gz.
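
For example, a rerun or backfill can scope COPY to a single partitioned prefix; the path below is illustrative, reusing the date from the bullet above:

COPY INTO raw_sales(order_id, customer_id, order_ts, amount, src_file_name, src_row_num, _sha1_row)
FROM (
  SELECT
    $1, $2, TO_TIMESTAMP_NTZ($3), $4,
    METADATA$FILENAME, METADATA$FILE_ROW_NUMBER,
    SHA1(TO_VARCHAR($1)||'|'||$2||'|'||$3||'|'||$4)
  FROM @mystage/dt=2025-11-20/ (FILE_FORMAT => 'my_csv_fmt')
);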

Schema & keys

  • Define a stable business key (order_id) early.
  • Add a checksum column to detect meaningful change (NULL-safe sketch after this list).
  • Track source lineage: filename, row number, produced_at, producer_version.
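
On the checksum bullet: plain || concatenation returns NULL if any field is NULL, which silently disables change detection. A NULL-safe variant of the row hash, sketched against the raw_sales columns from above:

SELECT SHA1(
         COALESCE(order_id, '')             || '|' ||
         COALESCE(customer_id, '')          || '|' ||
         COALESCE(TO_VARCHAR(order_ts), '') || '|' ||
         COALESCE(TO_VARCHAR(amount), '')
       ) AS _sha1_row
FROM raw_sales;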

Idempotency

  • Maintain a control table for loaded_files(file_name, sha1, first_seen_ts).
  • Build pure transformations from raw → stage so reruns are deterministic.
  • Use MERGE with a change predicate (t._sha1_row <> s._sha1_row).

Reproducibility

  • Version your file format, SQL, and code.
  • Pin library versions; store manifest of files loaded for each run.
  • Treat raw staging as append-only; never mutate it.

Performance

  • Increase warehouse/slot size during COPY+MERGE windows, then scale down (see the sketch after this list).
  • Use clustering/partitioning on target by query predicates (e.g., order_ts).
  • Prefer bulk MERGE over per-row upserts.
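
The warehouse-resizing and clustering points, as Snowflake statements; the warehouse name and sizes are placeholders:

-- Scale up for the load window, then back down (hypothetical warehouse name)
ALTER WAREHOUSE load_wh SET WAREHOUSE_SIZE = 'XLARGE';
-- ... run COPY + MERGE ...
ALTER WAREHOUSE load_wh SET WAREHOUSE_SIZE = 'SMALL';

-- Cluster the target on the predicate analysts filter by most
ALTER TABLE sales CLUSTER BY (TO_DATE(order_ts));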

Observability

  • Capture row counts: staged in, deduped, merged-inserted, merged-updated, rejected (query sketch after this list).
  • Sample records for schema drift and null explosions.
  • Alert on COPY errors, merge anomalies, and unexpected volume shifts.
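
One way to capture the "staged in" and "rejected" counts per file is Snowflake's COPY_HISTORY table function; the 24-hour window here is an assumption:

SELECT file_name, row_count, row_parsed, error_count, status
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
       TABLE_NAME => 'RAW_SALES',
       START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP())));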

Safety & recovery

  • Use VALIDATION_MODE before big loads.
  • Keep a quarantine table for rejects; never DROP raw until checks pass.
  • Make reruns explicit by window (e.g., dt BETWEEN '2025-11-01' AND '2025-11-07').
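
For instance, rebuilding the staging table only for the rerun window keeps the blast radius small (dates are illustrative):

CREATE OR REPLACE TEMP TABLE stage_sales AS
SELECT * EXCLUDE (rn)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY order_ts DESC, load_ts DESC
    ) AS rn
  FROM raw_sales
  WHERE TO_DATE(order_ts) BETWEEN '2025-11-01' AND '2025-11-07'
)
WHERE rn = 1;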

Common Pitfalls (and blunt fixes)

  • Tiny files hell → compact upstream or in a pre-job.
  • “UPSERTs are slow” → checksum compare + bulk MERGE, not per-row loops.
  • Duplicate keys post-merge → add a unique constraint/quality check on order_id.
  • Retry storms → make tasks idempotent and retry-safe; never append without guards.
  • Schema drift surprises → ingest to VARIANT or raw strings, cast in stage, not in target (sketch after this list).
  • Forgot lineage → add src_file_name, src_row_num, producer_version everywhere.
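
On the schema-drift pitfall: a sketch of landing semi-structured data as VARIANT and casting only in the staging step, so drift breaks staging rather than the target. Table and field names here are assumptions:

CREATE OR REPLACE TABLE raw_sales_json (
  payload        VARIANT,
  src_file_name  STRING,
  load_ts        TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Cast in stage; a drifted field surfaces as a NULL or cast error here, not in the target
SELECT
  payload:order_id::STRING        AS order_id,
  payload:customer_id::STRING     AS customer_id,
  payload:order_ts::TIMESTAMP_NTZ AS order_ts,
  payload:amount::NUMBER(18,2)    AS amount
FROM raw_sales_json;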

Conclusion & Takeaways

If you can rerun yesterday without fear, you’ve built a real pipeline.

Key moves to remember:

  • Land files → COPY into append-only raw.
  • Normalize/dedup → stage with a deterministic winner per key.
  • Load targets via MERGE with a change checksum.
  • Track file lineage and row counts to prove correctness.
  • Make every step idempotent and observable.

What to do next

  • Wrap this into an Airflow/Dagster job with clear run windows and control tables.
  • Add CDC later if the business truly needs low latency.
  • Document the contract: “One logical row per order_id, last-write wins by order_ts.”

Internal Link Ideas

  • Designing Business Keys and Checksums for Idempotent Pipelines
  • Handling Late Arrivals and Deletes (tombstones + MERGE)
  • Partitioning & Clustering Strategies for Billion-Row Tables
  • Observability for Data Loads: Row Counts, Drift, and SLAs
  • From Batch to CDC: Safely Introducing Streams

Image Prompt

“A clean, modern data architecture diagram showing an object store feeding a data warehouse via COPY into a raw table, then deduped staging, then an idempotent MERGE into a target table. Include lineage metadata and checksums. Minimalistic, high-contrast, 3D isometric style.”


Tags

#Ingestion #DataEngineering #Snowflake #COPY #Upsert #Idempotent #ETL #DataQuality #Scalability #Architecture
