Data validation / quality gates

Big picture: what “quality gates” do

  • Validate shape & types (columns/fields, nullability, enums).
  • Check semantics (ranges, referential rules, dedup).
  • Fail fast (stop bad data) or fail soft (log, quarantine, alert); a policy sketch follows this list.
  • Prove what you checked (artifacts/reports for auditors).
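
The fail-fast vs. fail-soft choice recurs with every tool below; a minimal, library-agnostic sketch of that policy (all names here are illustrative, not from any library):

from enum import Enum

class GatePolicy(Enum):
    FAIL = "fail"              # raise → stop the pipeline
    QUARANTINE = "quarantine"  # divert bad rows, keep going

def apply_gate(good, bad, policy: GatePolicy, alert=print):
    """Hypothetical helper: enforce a policy on an already-split good/bad pair."""
    if len(bad) and policy is GatePolicy.FAIL:
        raise RuntimeError(f"{len(bad)} records failed validation")  # fail fast
    if len(bad):
        alert(f"quarantined {len(bad)} records")  # fail soft: alert + continue
    return good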

1) DataFrame-native validation — pandera

Best when your data is already in Pandas/Polars and you want declarative, column-wise rules.

Why use it

  • Typed DataFrame schemas; integrates with Pandas ops.
  • Nice error reports showing rows that violate rules.
  • Great for pre-load gates and unit tests on transforms.

Minimal recipe

import pandera as pa
from pandera.typing import Series, DataFrame

class Orders(pa.DataFrameModel):  # pa.SchemaModel in older pandera releases
    order_id: Series[int] = pa.Field(coerce=True)     # cast to int
    amount:   Series[float] = pa.Field(ge=0)          # no negatives
    country:  Series[str] = pa.Field(isin=["US","CA"])
    ts:       Series[pa.DateTime] = pa.Field(nullable=False)

    class Config:
        strict = True  # extra columns → error

def validate_orders(df) -> DataFrame[Orders]:
    return Orders.validate(df, lazy=True)  # collect all errors at once

Good to know

  • Use lazy=True to aggregate all failures.
  • Keep schemas versioned alongside code; add a smoke test that runs validate on a small fixture.
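
A minimal pytest-style smoke test for the schema above (fixture values are made up; assumes the Orders model is importable):

import pandas as pd

def test_orders_schema_smoke():
    fixture = pd.DataFrame({
        "order_id": [1, 2],
        "amount": [9.99, 0.0],
        "country": ["US", "CA"],
        "ts": pd.to_datetime(["2025-01-01", "2025-01-02"]),
    })
    Orders.validate(fixture, lazy=True)  # raises SchemaErrors on any violation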

2) General schema validation (dict/JSON level)

Use these when data is records/configs (not DataFrames) or for service payloads.

pydantic (v2)

  • Fast, typed models; great error messages; transforms + validation in one place.
from pydantic import BaseModel, Field, EmailStr, ValidationError

class Signup(BaseModel):
    user_id: int
    email: EmailStr
    plan: str = Field(pattern="^(free|pro|enterprise)$")

try:
    rec = Signup(user_id="42", email="a@b.com", plan="pro")  # type-coerces
except ValidationError as e:
    print(e.errors())
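
A common gate on top of this: parse a batch of records and collect per-record errors instead of stopping at the first bad one (gate_records is an illustrative helper, not a pydantic API):

def gate_records(rows: list[dict]) -> tuple[list[Signup], list[dict]]:
    """Split raw dicts into parsed models and per-record error reports."""
    good, bad = [], []
    for i, row in enumerate(rows):
        try:
            good.append(Signup(**row))
        except ValidationError as e:
            bad.append({"row": i, "errors": e.errors()})
    return good, bad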

marshmallow

  • Flexible (serialize/deserialize, custom fields), very popular in APIs.
from marshmallow import Schema, fields, validates, ValidationError

class Item(Schema):
    id = fields.Int(required=True)
    price = fields.Float(required=True)

    @validates("price")
    def nonneg(self, v):
        if v < 0:
            raise ValidationError("price must be >= 0")
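
Validation runs on load(); a quick usage sketch:

try:
    item = Item().load({"id": "7", "price": -1})  # Int coerces "7" → 7
except ValidationError as err:
    print(err.messages)  # {'price': ['price must be >= 0']}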

voluptuous, cerberus, schema

  • Lightweight, declarative checks; handy for configs or quick guards.
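
For instance, a minimal config guard with cerberus (the schema and values here are illustrative):

from cerberus import Validator

config_schema = {
    "batch_size": {"type": "integer", "min": 1},
    "region":     {"type": "string", "allowed": ["us-east-1", "eu-west-1"]},
}
v = Validator(config_schema, require_all=True)
if not v.validate({"batch_size": 0, "region": "us-east-1"}):
    print(v.errors)  # {'batch_size': ['min value is 1']}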

Rule of thumb

  • Pydantic for typed apps/APIs and model-centric code.
  • Marshmallow when you care about (de)serialization pipelines.
  • Voluptuous/Cerberus/schema for tiny, dependency-light configs.

3) Expectation frameworks / profiling

These add metadata, docs, and reports—nice for teams and audits.

great_expectations

  • Suites of expectations, data docs, checkpoints; works with Pandas/Spark/SQL.
import great_expectations as ge
df = ge.from_pandas(pandas_df)    # legacy Pandas shortcut; newer GE releases use get_context() + checkpoints
df.expect_column_values_to_not_be_null("order_id")
df.expect_column_values_to_be_between("amount", min_value=0)
res = df.validate()               # JSON result you can gate on
assert res["success"], "Quality gate failed"

When to use: org-wide standardization, nice HTML docs, storing results/artifacts.

frictionless

  • Validates tabular files (CSV, Excel) against Table Schema (JSON).
from frictionless import Resource, Schema
schema = Schema.describe("orders.csv")   # infer once
schema.to_json("schema.json")            # persist the inferred schema; commit it
report = Resource("orders.csv", schema=schema).validate()
if not report.valid:
    print(report.flatten(["rowPosition", "fieldName", "message"]))

When to use: file-centric pipelines, reproducible schema JSON you can commit.

ydata-profiling (formerly pandas-profiling)

  • Generates profiling reports (distributions, correlations, missingness).
    Use it to discover data issues and author expectations, not as a gate by itself.
from ydata_profiling import ProfileReport
ProfileReport(pandas_df, minimal=True).to_file("profile.html")

4) Spark-scale checks — pydeequ (wrapper over Deequ, needs JVM/Spark)

Best when the data lives in Spark and you want column constraints & metrics computed in-cluster.

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

check = (Check(spark, CheckLevel.Error, "orders_checks")
         .isComplete("order_id")
         .isNonNegative("amount")
         .isContainedIn("country", ["US","CA"]))

result = (VerificationSuite(spark)
          .onData(orders_spark_df)
          .addCheck(check)
          .run())

assert result.status == "Success", result.checkResults

When to use: data too big for Pandas; want to compute metrics in Spark jobs.


Choosing tool(s): quick decision grid

  • Pandas in memory? → pandera (+ optionally great_expectations for docs/artifacts).
  • JSON/configs/API payloads? → pydantic (or marshmallow if you need custom (de)serialization).
  • Files on disk (CSV/Excel) with a schema you want to commit? → frictionless (+ optional pandera after load).
  • Need auto-discovery / EDA to write rules later? → ydata-profiling (then codify rules elsewhere).
  • Spark dataframes at scale? → pydeequ (or GE’s Spark integration if your team standardizes on GE).

Where to place gates (battle-tested)

  1. At ingestion: validate external data before it pollutes bronze/raw.
  2. Before expensive steps: catch issues before joins/windowed transforms.
  3. Pre-publish: gate “silver → gold” (dashboards, ML features).
  4. On write: quarantine bad rows, write good rows + write a validation report.
  5. Monitor drift: run lightweight checks daily (nulls ↑, distinct ↓); see the sketch below.
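
A minimal version of that daily drift check: compare a couple of cheap aggregates against a stored baseline (thresholds and helper names are illustrative):

import pandas as pd

def drift_metrics(df: pd.DataFrame) -> dict:
    return {
        "null_frac": df.isna().mean().to_dict(),  # null ratio per column
        "distinct": df.nunique().to_dict(),       # distinct count per column
    }

def check_drift(today: dict, baseline: dict, max_null_increase=0.05):
    for col, frac in today["null_frac"].items():
        if frac - baseline["null_frac"].get(col, 0.0) > max_null_increase:
            raise RuntimeError(f"null fraction drifted on {col}: {frac:.2%}")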

Common pitfalls (and fixes)

  • Fail-open pipelines (log only): decide policy—fail or quarantine + alert.
  • Silent coercions (e.g., strings to numbers): coerce explicitly (coerce=True in pandera) and still validate ranges.
  • Over-strict schemas on evolving data: allow nullable/optional fields, version your schemas.
  • One giant gate: split into fast “schema” checks and heavier “semantic” checks.
  • No artifacts: persist JSON/HTML reports (GE/pydeequ metrics) with run IDs for auditing.

Tiny end-to-end pattern (Pandas → gate → Snowflake)

# Assumes pandas, the Orders model from section 1, a SQLAlchemy engine,
# and a write_json helper of your own.
import pandas as pd
import pandera as pa

# 1) Read & light clean
df = pd.read_parquet("s3://.../orders_2025-11-06.parquet")

# 2) Validate (pandera): lazy=True raises SchemaErrors listing every failure
# 3) Gate: split good/bad rows from the collected failure cases
try:
    good = Orders.validate(df, lazy=True)
    bad = df.iloc[0:0]                                 # empty: everything passed
except pa.errors.SchemaErrors as err:
    # row-level failures carry the offending index; schema-level ones don't
    bad_idx = err.failure_cases["index"].dropna().astype(int).unique()
    bad = df.loc[bad_idx]
    good = df.drop(index=bad_idx)                      # note: not coerced on this path
bad.to_parquet("s3://.../quarantine/orders_2025-11-06.snappy")

# 4) Load good rows → DB (SQLAlchemy, fast path via COPY or connectorx)
good.to_sql("STAGING_ORDERS", engine, if_exists="append", index=False)

# 5) Record results
write_json("s3://.../quality/orders_2025-11-06.json", {
  "checked": len(df), "passed": len(good), "failed": len(bad)
})