Data validation / quality gates
Big picture: what “quality gates” do
- Validate shape & types (columns/fields, nullability, enums).
- Check semantics (ranges, referential rules, dedup).
- Fail fast (stop bad data) or fail soft (log, quarantine, alert); see the policy sketch after this list.
- Prove what you checked (artifacts/reports for auditors).
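To make the fail-fast vs fail-soft choice concrete, here is a minimal policy sketch. Everything in it (gate, quarantine, alert, the (ok, report) convention for validate_fn) is hypothetical glue, not an API from any library below:

def quarantine(df, report):  # placeholder: write bad rows aside
    print("quarantined", len(df), "rows:", report)

def alert(report):  # placeholder: page/notify someone
    print("ALERT:", report)

def gate(df, validate_fn, policy="fail"):
    """Apply a quality gate: 'fail' stops the pipeline, 'soft' diverts + alerts."""
    ok, report = validate_fn(df)  # validate_fn returns (bool, details)
    if ok:
        return df
    if policy == "fail":
        raise RuntimeError(f"quality gate failed: {report}")
    quarantine(df, report)
    alert(report)
    return None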
1) DataFrame-native validation — pandera
Best when your data is already in Pandas/Polars and you want declarative, column-wise rules.
Why use it
- Typed DataFrame schemas; integrates with Pandas ops.
- Nice error reports showing rows that violate rules.
- Great for pre-load gates and unit tests on transforms.
Minimal recipe
import pandera as pa
from pandera.typing import Series, DataFrame

class Orders(pa.DataFrameModel):  # pa.SchemaModel in pandera < 0.14
    order_id: Series[int] = pa.Field(coerce=True)  # cast to int
    amount: Series[float] = pa.Field(ge=0)  # no negatives
    country: Series[str] = pa.Field(isin=["US", "CA"])
    ts: Series[pa.DateTime] = pa.Field(nullable=False)

    class Config:
        strict = True  # extra columns → error

def validate_orders(df) -> DataFrame[Orders]:
    return Orders.validate(df, lazy=True)  # collect all errors at once
Good to know
- Use lazy=True to aggregate all failures.
- Keep schemas versioned alongside code; add a smoke test that runs validate on a small fixture (a sketch follows).
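A minimal sketch of such a smoke test with pytest, assuming the Orders model above is importable from a (hypothetical) schemas module:

# test_schemas.py — `schemas` is a hypothetical module holding the Orders model
import pandas as pd
import pandera as pa
import pytest
from schemas import Orders

def test_orders_schema_accepts_good_fixture():
    fixture = pd.DataFrame({
        "order_id": [1, 2],
        "amount": [10.0, 0.0],
        "country": ["US", "CA"],
        "ts": pd.to_datetime(["2025-01-01", "2025-01-02"]),
    })
    Orders.validate(fixture, lazy=True)  # raises SchemaErrors on regression

def test_orders_schema_rejects_negative_amount():
    bad = pd.DataFrame({
        "order_id": [1], "amount": [-5.0],
        "country": ["US"], "ts": pd.to_datetime(["2025-01-01"]),
    })
    with pytest.raises(pa.errors.SchemaErrors):
        Orders.validate(bad, lazy=True)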
2) General schema validation (dict/JSON level)
Use these when data is records/configs (not DataFrames) or for service payloads.
pydantic (v2)
- Fast, typed models; great error messages; transforms + validation in one place.
from pydantic import BaseModel, Field, EmailStr, ValidationError  # EmailStr needs the email-validator extra

class Signup(BaseModel):
    user_id: int
    email: EmailStr
    plan: str = Field(pattern="^(free|pro|enterprise)$")

try:
    rec = Signup(user_id="42", email="a@b.com", plan="pro")  # "42" is coerced to int
except ValidationError as e:
    print(e.errors())
marshmallow
- Flexible (serialize/deserialize, custom fields), very popular in APIs.
from marshmallow import Schema, fields, validates, ValidationError

class Item(Schema):
    id = fields.Int(required=True)
    price = fields.Float(required=True)

    @validates("price")
    def nonneg(self, value):
        if value < 0:
            raise ValidationError("price must be >= 0")
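Errors then surface as a field → messages dict when you load a payload:

try:
    item = Item().load({"id": 1, "price": -2.5})
except ValidationError as e:
    print(e.messages)  # {'price': ['price must be >= 0']}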
voluptuous, cerberus, schema
- Lightweight, declarative checks; handy for configs or quick guards.
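For a taste, a config guard with cerberus (the schema below is illustrative):

from cerberus import Validator

config_schema = {
    "host": {"type": "string", "required": True},
    "port": {"type": "integer", "min": 1, "max": 65535},
}
v = Validator(config_schema)
if not v.validate({"host": "db.internal", "port": 99999}):
    print(v.errors)  # {'port': ['max value is 65535']}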
Rule of thumb
- Pydantic for typed apps/APIs and model-centric code.
- Marshmallow when you care about (de)serialization pipelines.
- Voluptuous/Cerberus/schema for tiny, dependency-light configs.
3) Expectation frameworks / profiling
These add metadata, docs, and reports—nice for teams and audits.
great_expectations
- Suites of expectations, data docs, checkpoints; works with Pandas/Spark/SQL.
import great_expectations as ge

# Legacy Pandas API shown here; recent GX releases moved to Data Contexts + Checkpoints
df = ge.from_pandas(pandas_df)
df.expect_column_values_to_not_be_null("order_id")
df.expect_column_values_to_be_between("amount", min_value=0)
res = df.validate()  # JSON result you can gate on
assert res["success"], "Quality gate failed"
When to use: org-wide standardization, nice HTML docs, storing results/artifacts.
frictionless
- Validates tabular files (CSV, Excel) against Table Schema (JSON).
from frictionless import Resource, Schema

schema = Schema.describe("orders.csv")  # infer once → save schema.json
report = Resource("orders.csv", schema=schema).validate()
if not report.valid:
    # flatten keys vary by version (e.g., "rowNumber" in frictionless v5)
    print(report.flatten(["rowPosition", "fieldName", "message"]))
When to use: file-centric pipelines, reproducible schema JSON you can commit.
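To make the schema reproducible, the inferred metadata can round-trip through a JSON file you commit to the repo (method names per frictionless v5; check your version):

schema.to_json("orders.schema.json")  # commit this file
schema = Schema.from_descriptor("orders.schema.json")  # reuse on later runs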
ydata-profiling (ex-pandas-profiling)
- Generates profiling reports (distributions, correlations, missingness).
Use it to discover data issues and author expectations, not as a gate by itself.
from ydata_profiling import ProfileReport
ProfileReport(pandas_df, minimal=True).to_file("profile.html")
4) Spark-scale checks — pydeequ (wrapper over Deequ, needs JVM/Spark)
Best when the data lives in Spark and you want column constraints & metrics computed in-cluster.
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

check = (Check(spark, CheckLevel.Error, "orders_checks")
         .isComplete("order_id")
         .isNonNegative("amount")
         .isContainedIn("country", ["US", "CA"]))

result = (VerificationSuite(spark)
          .onData(orders_spark_df)
          .addCheck(check)
          .run())

assert result.status == "Success", result.checkResults
When to use: data too big for Pandas; want to compute metrics in Spark jobs.
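To keep an auditable artifact (see the "No artifacts" pitfall below), the check results can be pulled into a Spark DataFrame and persisted; the output path here is a placeholder:

from pydeequ.verification import VerificationResult

results_df = VerificationResult.checkResultsAsDataFrame(spark, result)
results_df.write.mode("append").parquet("s3://.../quality/deequ_checks/")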
Choosing tool(s): quick decision grid
- Pandas in memory? → pandera (+ optionally great_expectations for docs/artifacts).
- JSON/configs/API payloads? → pydantic (or marshmallow if you need custom (de)serialization).
- Files on disk (CSV/Excel) with a schema you want to commit? → frictionless (+ optional pandera after load).
- Need auto-discovery / EDA to write rules later? → ydata-profiling (then codify the rules elsewhere).
- Spark DataFrames at scale? → pydeequ (or GE's Spark integration if your team standardizes on GE).
Where to place gates (battle-tested)
- At ingestion: validate external data before it pollutes bronze/raw.
- Before expensive steps: catch issues before joins/windowed transforms.
- Pre-publish: gate “silver → gold” (dashboards, ML features).
- On write: quarantine bad rows, write good rows + write a validation report.
- Monitor drifts: run lightweight checks daily (nulls ↑, distinct ↓).
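A lightweight daily probe can be as small as comparing today's metrics against a stored baseline. Here baseline is an assumed dict loaded from your metrics store, and the thresholds are illustrative:

import pandas as pd

def drift_probe(df: pd.DataFrame, baseline: dict) -> list:
    """Flag coarse drifts: rising null rate, collapsing cardinality."""
    alerts = []
    null_rate = df["amount"].isna().mean()
    if null_rate > baseline["amount_null_rate"] + 0.05:
        alerts.append(f"amount null rate up: {null_rate:.2%}")
    n_countries = df["country"].nunique()
    if n_countries < baseline["country_distinct"] * 0.5:
        alerts.append(f"country cardinality down: {n_countries}")
    return alerts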
Common pitfalls (and fixes)
- Fail-open pipelines (log only): decide the policy up front (hard fail, or quarantine + alert).
- Silent coercions (e.g., strings to numbers): be explicit (coerce=True in pandera) and validate ranges.
- Over-strict schemas on evolving data: allow nullable/optional fields; version your schemas.
- One giant gate: split into fast "schema" checks and heavier "semantic" checks.
- No artifacts: persist JSON/HTML reports (GE/pydeequ metrics) with run IDs for auditing.
Tiny end-to-end pattern (Pandas → gate → Snowflake)
import pandas as pd
import pandera as pa
# Assumes the Orders model from section 1, a SQLAlchemy `engine`,
# and a `write_json` helper for your artifact store.

# 1) Read & light clean
df = pd.read_parquet("s3://.../orders_2025-11-06.parquet")

# 2) Validate (pandera): with lazy=True, failures raise SchemaErrors listing every violation
try:
    good = Orders.validate(df, lazy=True)
    bad = df.iloc[0:0]  # empty: everything passed
except pa.errors.SchemaErrors as err:
    # 3) Gate: split good/bad rows using the reported failure indexes
    mask = df.index.isin(err.failure_cases["index"])
    good, bad = df[~mask], df[mask]
bad.to_parquet("s3://.../quarantine/orders_2025-11-06.snappy")

# 4) Load good rows → DB (SQLAlchemy, fast path via COPY or connectorx)
good.to_sql("STAGING_ORDERS", engine, if_exists="append", index=False)

# 5) Record results
write_json("s3://.../quality/orders_2025-11-06.json", {
    "checked": len(df), "passed": len(good), "failed": len(bad)
})