ETL/ELT (batch data movement & transforms)

Mental map (how the pieces line up)

  • Frames/compute: where you transform data (Pandas/Polars/Arrow/DuckDB/Dask/Spark/Numpy).
  • I/O to storage: how you read/write from local or cloud (fsspec+{s3fs,gcsfs,adlfs}, smart_open, SDKs).
  • Compression/serialization: how bytes are packed on the wire/disk (gzip/zstd/lz4/brotli, JSON/MsgPack/Avro/Proto).
  • Streaming/connectors: when data arrives as events (Kafka/Pulsar clients).
  • Ingestion helpers: batteries-included extract/load scaffolding (dlt, Singer).

Frames / compute

pandas — The default DataFrame on one machine (eager).
Use it when the dataset fits in RAM and you want rich table ops.

import pandas as pd
df = pd.read_parquet("s3://my-bucket/events/date=2025-11-06/", engine="pyarrow")  # read the whole partition directory (pyarrow doesn't expand globs)
df = df.query("country == 'US'").groupby("tenant_id").size().reset_index(name="n")
df.to_parquet("s3://my-bucket/out/by_tenant.parquet", index=False)

polars — Rust engine, parallel, lazy mode, Arrow-native. Often faster/more memory-efficient than Pandas.

import polars as pl
df = (pl.scan_parquet("s3://…/events/*.parquet")              # lazy scan (no RAM blowup)
        .filter(pl.col("country") == "US")
        .group_by("tenant_id").count()
        .collect())

pyarrow — The columnar memory format + Parquet/Feather I/O. Think: “zero-copy glue” between systems.

import pyarrow.dataset as ds
dataset = ds.dataset("s3://…/events", format="parquet", partitioning="hive")
tbl = dataset.to_table(filter=ds.field("country") == "US")  # Arrow Table (not a DataFrame)

duckdb — In-process OLAP SQL that can query Parquet/CSV/S3 directly. Great for “SQL over files” and joining biggish local data.

import duckdb
con = duckdb.connect()
df = con.execute("""
  SELECT country, COUNT(*) AS n
  FROM read_parquet('s3://…/events/*.parquet')  -- s3:// needs the httpfs extension (auto-loaded in recent DuckDB)
  GROUP BY country
""").df()

dask — Parallelize Pandas/NumPy across cores or a small cluster (lazy). Minimal code changes from Pandas.

import dask.dataframe as dd
ddf = dd.read_parquet("s3://…/events", engine="pyarrow")
out = ddf[ddf.country == "US"].groupby("tenant_id").size().compute()

pyspark — Cluster-scale DataFrame (lazy). Strong SQL/windowing at big volume.

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://…/events")
df = df.where(F.col("country")=="US").groupBy("tenant_id").count()
df.write.mode("overwrite").parquet("s3://…/out")

numpy — Vectorized arrays. Underpins Pandas; use directly for numeric kernels.
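
A tiny illustration (made-up arrays) of why it underpins everything: column math as one vectorized expression instead of a Python loop.

import numpy as np
amounts = np.array([10.0, 25.0, 7.5])   # e.g. order totals
fx = np.array([1.0, 0.92, 0.92])        # per-row FX rate
usd = amounts * fx                      # vectorized: one pass in C, no per-row Python overhead
total = usd.sum()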


File & object storage I/O

fsspec — Uniform filesystem API. Backends: s3fs (S3), gcsfs (GCS), adlfs (Azure). Most modern libs use this under the hood.

import fsspec, pandas as pd
with fsspec.open("s3://my-bucket/path/file.parquet") as f:
    df = pd.read_parquet(f)

smart_open — File-like streaming to/from S3/GCS/Azure/HTTP with minimal ceremony.

from smart_open import open
with open("s3://my-bucket/logs.gz", "rb") as f:
    data = f.read()   # the .gz is decompressed transparently based on the file extension

Cloud SDKs — Use when you need admin features or precise control:

  • AWS: boto3
  • GCP: google-cloud-storage
  • Azure: azure-storage-blob

Tip: For data paths, prefer fsspec backends; for admin ops (ACLs, multipart tuning), use SDKs.
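
A sketch of the "precise control" case with boto3 (tuning multipart uploads; bucket and key names are placeholders):

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client("s3")                                      # credentials from env vars or an instance role
cfg = TransferConfig(multipart_threshold=64 * 1024 * 1024,   # switch to multipart upload above 64 MB
                     max_concurrency=8)
s3.upload_file("by_tenant.parquet", "my-bucket", "out/by_tenant.parquet", Config=cfg)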


Compression / serialization

Codecs (bytes ↔ smaller bytes):

  • gzip (stdlib): ubiquitous, slower; good default for logs.
  • zstandard (zstd): excellent ratio & fast; great for big data.
  • lz4: ultrafast, okay ratio; great for streaming & real-time.
  • brotli: best ratio for text; slower; web payloads/static bundles.

import zstandard as zstd
c = zstd.ZstdCompressor(level=10)
compressed = c.compress(b"big payload")
original = zstd.ZstdDecompressor().decompress(compressed)

Structured formats (objects ↔ bytes):

  • JSON: human-readable. orjson (fast; serializes datetime/dataclass/NumPy natively), ujson (older).
  • MsgPack: binary JSON; smaller/faster than JSON for many cases.
  • Avro: row-oriented with embedded schema; great for Kafka & row files. Use fastavro (fast); the old avro-python3 package is deprecated in favor of avro.
  • Protobuf: tiny, typed messages; great for RPC/events (needs .proto schema).
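
A minimal fastavro sketch for the Avro case (the record schema and field names are invented for illustration):

from fastavro import parse_schema, writer

schema = parse_schema({
    "type": "record", "name": "Event",
    "fields": [
        {"name": "tenant_id", "type": "string"},
        {"name": "n", "type": "int"},
    ],
})
with open("events.avro", "wb") as out:   # row-oriented container file, schema embedded in the header
    writer(out, schema, [{"tenant_id": "a", "n": 3}, {"tenant_id": "b", "n": 5}])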

Rule of thumb:

  • Files on the data lake: Parquet (via PyArrow), columnar, compressed with Snappy or ZSTD (see the one-liner after this list).
  • Event streaming: Avro/Proto with a schema registry.
  • APIs: JSON (orjson) unless you need tight latency → consider Proto.
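
For the data-lake row, a one-liner with PyArrow (reusing the Arrow Table tbl from the pyarrow example above; the output path is a placeholder):

import pyarrow.parquet as pq
pq.write_table(tbl, "by_tenant.parquet", compression="zstd")   # "snappy" is the default; "zstd" trades a little CPU for size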

Streaming / connectors

  • confluent-kafka — C-backed Kafka client; fastest/most reliable in Python.
  • kafka-python — Pure Python; simpler, slower—use if you must.
  • pulsar-client — Apache Pulsar client (features like multi-tenant, tiered storage).

Tiny Kafka consumer:

from confluent_kafka import Consumer

c = Consumer({"bootstrap.servers": "broker:9092", "group.id": "etl", "auto.offset.reset": "earliest"})
c.subscribe(["events"])
try:
    while True:
        msg = c.poll(1.0)
        if msg is None or msg.error():
            continue
        process(msg.value())   # your transform; value() is the raw message bytes
finally:
    c.close()   # leave the group cleanly (commits final offsets with auto-commit)

Operational tips: at-least-once is the default → make sinks idempotent; batch by size/time; handle backpressure; keep messages small; use schema registry for Avro/Proto.
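
On the producing side, a minimal confluent-kafka sketch of those habits (placeholder broker/topic; assumes small JSON-ish payloads):

from confluent_kafka import Producer

p = Producer({"bootstrap.servers": "broker:9092", "linger.ms": 50})   # a small linger lets the client batch sends

def on_delivery(err, msg):
    if err is not None:
        print(f"delivery failed: {err}")   # in real code: log and/or route to a dead-letter path

p.produce("events", key=b"tenant-1", value=b'{"n": 3}', callback=on_delivery)
p.poll(0)    # serve delivery callbacks
p.flush()    # block until everything queued has been delivered (or errored)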


Ingestion helpers

dlt (Data Load Tool) — Pythonic ELT scaffolding: connectors + incremental state + typing → land in DW/files with little boilerplate.

import dlt
from dlt.sources.helpers import requests

pipe = dlt.pipeline(pipeline_name="github", destination="duckdb", dataset_name="raw")
def issues(repo: str):
    for page in range(1, 4):
        for row in requests.get(f"https://api.github.com/repos/{repo}/issues?page={page}").json():
            yield row

info = pipe.run(issues("psf/requests"), table_name="issues")   # creates tables, loads rows

singer-python — Implements the Singer spec (taps produce, targets consume). Great for connector reuse.

# emit SCHEMA, then RECORD messages to stdout; a separate target process loads them into a DB
from singer import write_schema, write_records
write_schema(stream_name="users", schema={"properties":{"id":{"type":["integer","null"]}}}, key_properties=["id"])
write_records("users", [{"id":1},{"id":2}])

Singer ecosystem is huge; quality varies—pick maintained taps/targets.


Choosing quickly (cheat sheet)

  • Single machine, tidy tables: start with Polars (lazy) or Pandas.
  • SQL over files: DuckDB.
  • Bigger than RAM / parallel: Dask (Pythonic) or PySpark (cluster).
  • Interchange & Parquet: PyArrow everywhere.
  • Cloud paths: fsspec + s3fs/gcsfs/adlfs; fall back to SDKs for admin.
  • Compression: default to Zstandard; use gzip only for compatibility; lz4 for blazing decompression.
  • Serialization: orjson (fast JSON), Avro/Proto for schemas & streams.
  • Events: confluent-kafka; use batches, schema registry, idempotent sinks.
  • Quick ELT: dlt; for connector catalogs: Singer.

Common gotchas (and fixes)

  • Memory blowups: read lazily (pl.scan_parquet, Arrow dataset, chunked reads); avoid list() on generators.
  • Tiny files problem: combine into row-grouped Parquet (target ~128–512 MB per file).
  • Types drift: enforce schemas (pandera/pydantic) at the boundary; set dtypes explicitly on read (see the sketch after this list).
  • Timezones: store UTC; keep tz-aware datetime; standardize to ISO 8601 in JSON.
  • S3/GCS creds: prefer env/instance roles; avoid hardcoding; pass session/credentials via fsspec.
  • At-least-once: make loads idempotent (UPSERT/MERGE keys, dedupe windows).
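
As referenced in the types-drift item, a minimal pandera sketch (column names are made up; df is any pandas DataFrame you just read):

import pandera as pa

schema = pa.DataFrameSchema({
    "tenant_id": pa.Column(str),
    "events": pa.Column(int, pa.Check.ge(0)),
})
validated = schema.validate(df)   # raises SchemaError as soon as types or values drift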

Mini end-to-end “one file ETL” (local or S3, interchangeable)

import polars as pl, fsspec

SRC = "s3://my-bucket/events/date=2025-11-06/*.parquet"  # or "data/*.parquet"
DST = "s3://my-bucket/out/by_tenant.parquet"

# transform (lazy)
lazy = (pl.scan_parquet(SRC)
          .filter(pl.col("country") == "US")
          .group_by("tenant_id").agg(pl.len().alias("events")))

# write (single-machine fast path)
df = lazy.collect()
with fsspec.open(DST, "wb") as f:
    df.write_parquet(f)