ETL/ELT (batch data movement & transforms)
Mental map (how the pieces line up)
- Frames/compute: where you transform data (Pandas/Polars/Arrow/DuckDB/Dask/Spark/Numpy).
- I/O to storage: how you read/write from local or cloud (fsspec+{s3fs,gcsfs,adlfs}, smart_open, SDKs).
- Compression/serialization: how bytes are packed on the wire/disk (gzip/zstd/lz4/brotli, JSON/MsgPack/Avro/Proto).
- Streaming/connectors: when data arrives as events (Kafka/Pulsar clients).
- Ingestion helpers: batteries-included extract/load scaffolding (dlt, Singer).
Frames / compute
pandas — The default DataFrame on one machine (eager).
Use it when the dataset fits in RAM and you want rich table ops.
import pandas as pd
df = pd.read_parquet("s3://my-bucket/events/date=2025-11-06/", engine="pyarrow")  # reads all Parquet files under the partition directory
df = df.query("country == 'US'").groupby("tenant_id").size().reset_index(name="n")
df.to_parquet("s3://my-bucket/out/by_tenant.parquet", index=False)
polars — Rust engine, parallel, lazy mode, Arrow-native. Often faster/more memory-efficient than Pandas.
import polars as pl
df = (pl.scan_parquet("s3://…/events/*.parquet")   # lazy scan (no RAM blowup)
        .filter(pl.col("country") == "US")
        .group_by("tenant_id")
        .agg(pl.len().alias("n"))                  # row count per group
        .collect())
pyarrow — The columnar memory format + Parquet/Feather I/O. Think: “zero-copy glue” between systems.
import pyarrow.dataset as ds
dataset = ds.dataset("s3://…/events", format="parquet", partitioning="hive")
tbl = dataset.to_table(filter=ds.field("country") == "US") # Arrow Table (not a DataFrame)
duckdb — In-process OLAP SQL that can query Parquet/CSV/S3 directly. Great for “SQL over files” and joining biggish local data.
import duckdb
con = duckdb.connect()
con.execute("""
SELECT country, COUNT(*)
FROM read_parquet('s3://…/events/*.parquet')
GROUP BY country
""").df()
dask — Parallelize Pandas/NumPy across cores or a small cluster (lazy). Minimal code changes from Pandas.
import dask.dataframe as dd
ddf = dd.read_parquet("s3://…/events", engine="pyarrow")
out = ddf[ddf.country == "US"].groupby("tenant_id").size().compute()
pyspark — Cluster-scale DataFrame (lazy). Strong SQL/windowing at big volume.
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://…/events")
df = df.where(F.col("country")=="US").groupBy("tenant_id").count()
df.write.mode("overwrite").parquet("s3://…/out")
numpy — Vectorized arrays. Underpins Pandas; use directly for numeric kernels.
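A tiny illustration (made-up arrays): push the numeric kernel down to NumPy instead of looping in Python.
import numpy as np
price = np.array([9.99, 14.50, 3.25])
qty = np.array([3, 1, 12])
revenue = (price * qty).sum()  # elementwise multiply + reduction, vectorized in C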
File & object storage I/O
fsspec — Uniform filesystem API. Backends: s3fs (S3), gcsfs (GCS), adlfs (Azure). Most modern libs use this under the hood.
import fsspec, pandas as pd
with fsspec.open("s3://my-bucket/path/file.parquet") as f:
    df = pd.read_parquet(f)
smart_open — File-like streaming to/from S3/GCS/Azure/HTTP with minimal ceremony.
from smart_open import open
with open("s3://my-bucket/logs.gz", "rb") as f:
    raw = f.read()  # decompressed transparently (smart_open infers gzip from the .gz extension)
Cloud SDKs — Use when you need admin features or precise control:
- AWS: boto3
- GCP: google-cloud-storage
- Azure: azure-storage-blob
Tip: For data paths, prefer fsspec backends; for admin ops (ACLs, multipart tuning), use SDKs.
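For instance, a hedged boto3 sketch of the "precise control" case (bucket, key, and sizes are placeholders), tuning multipart upload behavior explicitly:
import boto3
from boto3.s3.transfer import TransferConfig
s3 = boto3.client("s3")
cfg = TransferConfig(multipart_threshold=64 * 1024 * 1024,   # switch to multipart above 64 MB
                     multipart_chunksize=64 * 1024 * 1024,
                     max_concurrency=8)
s3.upload_file("local/big_file.parquet", "my-bucket", "out/big_file.parquet", Config=cfg)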
Compression / serialization
Codecs (bytes ↔ smaller bytes):
- gzip (stdlib): ubiquitous, slower; good default for logs.
- zstandard (zstd): excellent ratio & fast; great for big data.
- lz4: ultrafast, okay ratio; great for streaming & real-time.
- brotli: best ratio for text; slower; web payloads/static bundles.
import zstandard as zstd
c = zstd.ZstdCompressor(level=10)
compressed = c.compress(b"big payload")
Structured formats (objects ↔ bytes):
- JSON: human-readable; use orjson (fast, typed) or ujson (older).
- MsgPack: binary JSON; smaller/faster than JSON for many cases.
- Avro: row-oriented with embedded schema; great for Kafka & row files. Use fastavro (fast) or avro-python3; see the sketch below.
- Protobuf: tiny, typed messages; great for RPC/events (needs a .proto schema).
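A hedged sketch of the Avro flow with fastavro (the schema and records are invented for illustration):
import io
from fastavro import parse_schema, writer, reader
schema = parse_schema({
    "name": "Event", "type": "record",
    "fields": [{"name": "id", "type": "long"}, {"name": "country", "type": "string"}],
})
buf = io.BytesIO()
writer(buf, schema, [{"id": 1, "country": "US"}, {"id": 2, "country": "DE"}])  # schema is embedded in the bytes
buf.seek(0)
rows = list(reader(buf))  # readers need no external schema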
Rule of thumb:
- Files on data lake: Parquet (via PyArrow) + Snappy/ZSTD (columnar; not in your list but crucial); see the write sketch below.
- Event streaming: Avro/Proto with a schema registry.
- APIs: JSON (orjson) unless you need tight latency → consider Proto.
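To make the data-lake rule concrete, a minimal write sketch (the table and path are placeholders) using PyArrow with ZSTD compression:
import pyarrow as pa
import pyarrow.parquet as pq
tbl = pa.table({"tenant_id": ["a", "b"], "events": [10, 3]})  # stand-in for your real Arrow Table
pq.write_table(tbl, "out/by_tenant.parquet", compression="zstd")  # "snappy" is the default codec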
Streaming / connectors
- confluent-kafka — C-backed Kafka client; fastest/most reliable in Python.
- kafka-python — Pure Python; simpler, slower—use if you must.
- pulsar-client — Apache Pulsar client (features like multi-tenant, tiered storage).
Tiny Kafka consumer:
from confluent_kafka import Consumer
c = Consumer({"bootstrap.servers":"broker:9092", "group.id":"etl", "auto.offset.reset":"earliest"})
c.subscribe(["events"])
while True:
    msg = c.poll(1.0)
    if msg and not msg.error():
        process(msg.value())  # your transform
Operational tips: at-least-once is the default → make sinks idempotent; batch by size/time; handle backpressure; keep messages small; use schema registry for Avro/Proto.
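One way to satisfy the idempotent-sink tip (a sketch, assuming each event carries a unique event_id and lands in DuckDB; table and column names are made up):
import duckdb
con = duckdb.connect("warehouse.duckdb")
con.execute("CREATE TABLE IF NOT EXISTS events (event_id TEXT PRIMARY KEY, payload TEXT)")
def load_batch(rows):  # rows: list of (event_id, payload) tuples from the consumer
    # Re-delivered messages collide on the primary key and are skipped, so replays are harmless.
    con.executemany("INSERT INTO events VALUES (?, ?) ON CONFLICT (event_id) DO NOTHING", rows)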
Ingestion helpers
dlt (Data Load Tool) — Pythonic ELT scaffolding: connectors + incremental state + typing → land in DW/files with little boilerplate.
import dlt
from dlt.sources.helpers import requests
pipe = dlt.pipeline(pipeline_name="github", destination="duckdb", dataset_name="raw")
def issues(repo: str):
    for page in range(1, 4):
        for row in requests.get(f"https://api.github.com/repos/{repo}/issues?page={page}").json():
            yield row
info = pipe.run(issues("psf/requests"), table_name="issues") # creates tables, loads rows
singer-python — Implements the Singer spec (taps produce, targets consume). Great for connector reuse.
# pseudo: emit SCHEMA, then RECORD lines to stdout; another process (target) loads to DB
from singer import write_schema, write_records
write_schema(stream_name="users", schema={"properties":{"id":{"type":["integer","null"]}}}, key_properties=["id"])
write_records("users", [{"id":1},{"id":2}])
Singer ecosystem is huge; quality varies—pick maintained taps/targets.
Choosing quickly (cheat sheet)
- Single machine, tidy tables: start with Polars (lazy) or Pandas.
- SQL over files: DuckDB.
- Bigger than RAM / parallel: Dask (Pythonic) or PySpark (cluster).
- Interchange & Parquet: PyArrow everywhere.
- Cloud paths: fsspec + s3fs/gcsfs/adlfs; fall back to SDKs for admin.
- Compression: default to Zstandard; use gzip only for compatibility; lz4 for blazing decompression.
- Serialization: orjson (fast JSON), Avro/Proto for schemas & streams.
- Events: confluent-kafka; use batches, schema registry, idempotent sinks.
- Quick ELT: dlt; for connector catalogs: Singer.
Common gotchas (and fixes)
- Memory blowups: read lazily (pl.scan_parquet, Arrow datasets, chunked reads); avoid list() on generators.
- Tiny files problem: combine into row-grouped Parquet (target ~128–512 MB per file).
- Types drift: enforce schemas (pandera/pydantic) at the boundary; set dtypes explicitly on read (see the sketch after this list).
- Timezones: store UTC; keep tz-aware datetime; standardize to ISO 8601 in JSON.
- S3/GCS creds: prefer env/instance roles; avoid hardcoding; pass session/credentials via fsspec.
- At-least-once: make loads idempotent (UPSERT/MERGE keys, dedupe windows).
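A minimal boundary-check sketch with pandera (column names and rules are illustrative, not from this pipeline):
import pandas as pd
import pandera as pa
schema = pa.DataFrameSchema({
    "tenant_id": pa.Column(str, nullable=False),
    "events": pa.Column(int, checks=pa.Check.ge(0)),
})
df = schema.validate(pd.DataFrame({"tenant_id": ["a", "b"], "events": [10, 3]}))  # raises SchemaError on drift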
Mini end-to-end “one file ETL” (local or S3, interchangeable)
import polars as pl, fsspec
SRC = "s3://my-bucket/events/date=2025-11-06/*.parquet" # or "data/*.parquet"
DST = "s3://my-bucket/out/by_tenant.parquet"
# transform (lazy)
lazy = (pl.scan_parquet(SRC)
          .filter(pl.col("country") == "US")
          .group_by("tenant_id").agg(pl.len().alias("events")))
# write (single-machine fast path)
df = lazy.collect()
with fsspec.open(DST, "wb") as f:
    df.write_parquet(f)