It was 2 AM on a Tuesday when I got the PagerDuty alert that every data engineer dreads: our revenue dashboard was showing zeros across the board. The CFO had already emailed the CTO. The CTO had already pinged my manager. My manager had already pinged me. By the time I opened my laptop, the incident channel had fourteen people in it and the phrase "data team broke something again" was already being thrown around.
The root cause? An upstream microservice team had renamed a field from transaction_amount to txn_amount in their event payload. No announcement. No migration period. No versioning. Just a Friday afternoon deploy that nuked three downstream pipelines, two ML feature stores, and the executive reporting layer. Remediating the full blast radius took four days.
That incident is what finally pushed me to take data contracts seriously. Not as a theoretical concept from a conference talk, but as actual infrastructure that prevents one team's "minor refactor" from becoming another team's all-hands-on-deck incident.
What you will get from this article: A practical guide to implementing data contracts in real production environments, with code examples in Pydantic, Great Expectations, and Protobuf. I will cover what worked for us, what did not, and how to get organizational buy-in without becoming the "process police" nobody wants to talk to.
## What Are Data Contracts, Exactly?
A data contract is a formal agreement between a data producer and its consumers that defines the structure, semantics, quality guarantees, and SLAs of a data interface. Think of it as an API contract, but for data pipelines instead of REST endpoints.
The web development world figured this out years ago. Nobody ships a REST API without documenting the request and response schemas. Nobody changes a public API field name without a deprecation period. But in the data world, we have been living in the Wild West where a Kafka topic schema is whatever the last developer decided to push.
A complete data contract typically specifies:
- Schema definition: Field names, data types, nullability constraints, and nested structure
- Semantic meaning: What each field actually represents (is amount in cents or dollars? Is timestamp UTC or local time?)
- Quality expectations: Acceptable null rates, value ranges, uniqueness constraints, freshness guarantees
- SLAs: When data will arrive, how often it updates, maximum acceptable latency
- Ownership: Who produces this data, who to contact when something breaks, escalation paths
- Versioning and evolution rules: How the schema can change without breaking consumers
Without these written down and enforced, every pipeline is one upstream change away from the 2 AM incident I described. And trust me, the transaction_amount rename was not even the worst one. The worst one was when a team switched a boolean field from true/false to "Y"/"N" strings and our Spark jobs silently cast them all to null for three weeks before anyone noticed.
## Why Data Contracts Matter More Than Ever
Ten years ago, most companies had a single data warehouse team that owned everything from ingestion to reporting. If a schema changed, the same team knew about it because they made the change. The data contract was implicit: it lived in people's heads and in tribal knowledge.
That world is gone. Modern data stacks have dozens of producers (microservices, SaaS tools, third-party APIs, IoT devices) feeding into platforms consumed by analytics engineers, ML engineers, product analysts, and business intelligence teams. The implicit contract does not scale when the producer and consumer have never met each other.
Here is what I have seen go wrong without explicit data contracts:
- Silent schema drift. A field type changes from integer to string. Downstream jobs do not fail, they just produce garbage. You find out weeks later when a quarterly report looks off.
- Semantic ambiguity. Two teams interpret the same field differently. One thinks created_at is when the database row was inserted, the other thinks it is when the user performed the action. Both build correct-looking dashboards that disagree.
- Undocumented deprecation. A field stops being populated but is not removed from the schema. Consumers keep querying it and get nulls or stale data.
- Cascading failures. One broken contract propagates through a DAG of twenty downstream tables, and the blast radius keeps growing because nobody mapped the dependencies.
- Finger-pointing culture. Without contracts, every incident devolves into "the data team should have validated" versus "the app team should have announced the change." Nobody wins.
## Implementation Approach 1: Pydantic Models as Contracts
The simplest way to start with data contracts is to define your expected schemas as code. If you are in the Python ecosystem, Pydantic is the obvious choice. It gives you runtime validation, automatic documentation, and serialization for free.
Here is how we define a contract for an event coming off a Kafka topic:
```python
from datetime import datetime
from enum import Enum
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, Field, field_validator


class CurrencyCode(str, Enum):
    USD = "USD"
    EUR = "EUR"
    GBP = "GBP"


class TransactionEvent(BaseModel):
    """
    Data contract for the payments.transactions.v2 Kafka topic.

    Owner: Payments Team (#payments-eng in Slack)
    SLA: Events arrive within 30 seconds of transaction completion.
    Update frequency: Real-time streaming.
    Last reviewed: 2025-11-15
    """

    event_id: UUID = Field(description="Unique identifier for this event, used for deduplication")
    transaction_id: str = Field(min_length=10, max_length=64, description="External transaction reference ID")
    user_id: int = Field(gt=0, description="Internal user ID, always a positive integer")
    amount_cents: int = Field(ge=0, description="Transaction amount in cents (not dollars)")
    currency: CurrencyCode = Field(description="ISO 4217 currency code")
    status: str = Field(pattern=r"^(completed|refunded|failed|pending)$")
    merchant_name: str = Field(min_length=1, max_length=255)
    merchant_category_code: Optional[str] = Field(default=None, pattern=r"^\d{4}$")
    occurred_at: datetime = Field(description="UTC timestamp when the transaction occurred")
    processed_at: datetime = Field(description="UTC timestamp when our system processed the event")

    @field_validator("occurred_at", "processed_at")
    @classmethod
    def must_be_utc(cls, v: datetime) -> datetime:
        if v.tzinfo is None:
            raise ValueError("Timestamps must be timezone-aware (UTC)")
        return v

    @field_validator("processed_at")
    @classmethod
    def processed_after_occurred(cls, v: datetime, info) -> datetime:
        if "occurred_at" in info.data and v < info.data["occurred_at"]:
            raise ValueError("processed_at cannot be before occurred_at")
        return v
```
This is not just a schema. The docstring documents ownership and SLAs. The validators encode business rules that go beyond type checking. The Field descriptions remove semantic ambiguity (amount is in cents, not dollars). And because it is Python code, it lives in version control and goes through code review.
We validate every incoming event against the contract in our Kafka consumer:
```python
import json
from typing import Optional

import structlog
from prometheus_client import Counter
from pydantic import ValidationError

logger = structlog.get_logger()

CONTRACT_VIOLATIONS = Counter(
    "contract_violations_total",
    "Events that failed data contract validation",
    ["topic", "error_type"],
)


def process_message(raw_message: bytes) -> Optional[TransactionEvent]:
    try:
        payload = json.loads(raw_message)
    except json.JSONDecodeError as e:
        # Not even valid JSON: route straight to the DLQ
        logger.error("malformed_json", topic="payments.transactions.v2", error=str(e))
        publish_to_dlq("payments.transactions.v2.dlq", raw_message, str(e))
        CONTRACT_VIOLATIONS.labels(
            topic="payments.transactions.v2", error_type="invalid_json"
        ).inc()
        return None

    try:
        return TransactionEvent(**payload)
    except ValidationError as e:
        logger.error(
            "contract_violation",
            topic="payments.transactions.v2",
            errors=e.errors(),
            raw_payload=payload,
        )
        # Send to dead letter queue for investigation
        publish_to_dlq("payments.transactions.v2.dlq", raw_message, str(e))
        # Emit metric for alerting
        CONTRACT_VIOLATIONS.labels(
            topic="payments.transactions.v2",
            error_type=e.errors()[0]["type"],
        ).inc()
        return None
```
The key insight here is that contract violations should be loud but not blocking. We log the error, send the message to a dead letter queue, increment a Prometheus counter, and move on. If the violation rate crosses a threshold (we use 1% over a 5-minute window), PagerDuty fires. This way, a single malformed event does not stop the pipeline, but a systematic contract break gets caught fast.
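To make the threshold concrete, here is a minimal in-process sketch of the 1%-over-5-minutes rule. In our setup this math actually lives in Prometheus alerting rules rather than consumer code, and the `ViolationRateAlarm` class and its method names are illustrative, not part of any library:

```python
import time
from collections import deque
from typing import Optional


class ViolationRateAlarm:
    """Sliding-window violation-rate check (illustrative sketch only)."""

    def __init__(self, threshold: float = 0.01, window_seconds: int = 300):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.events = deque()  # (timestamp, was_violation) pairs

    def record(self, violated: bool, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self.events.append((now, violated))
        self._evict(now)

    def should_alert(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        self._evict(now)
        if not self.events:
            return False
        violations = sum(1 for _, bad in self.events if bad)
        return violations / len(self.events) > self.threshold

    def _evict(self, now: float) -> None:
        # Drop samples that have aged out of the window
        while self.events and self.events[0][0] < now - self.window_seconds:
            self.events.popleft()
```

The important property is the denominator: alerting on the *rate* rather than the raw count means one malformed event in a quiet hour does not page anyone, while a systematic break during peak traffic does.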
## Implementation Approach 2: Schema Registry with Protobuf
Pydantic contracts work great within a single team's codebase, but they do not solve the cross-team problem. If the payments team and the data team are in separate repos with separate deploy cycles, you need a shared source of truth. That is where a schema registry comes in.
We use Protobuf with Confluent Schema Registry for our Kafka topics. Here is the same transaction event as a .proto file:
```protobuf
// proto/payments/v2/transaction_event.proto
syntax = "proto3";

package payments.v2;

import "google/protobuf/timestamp.proto";

// Data contract for the payments.transactions.v2 Kafka topic.
// Owner: Payments Team (#payments-eng)
// SLA: Events within 30s of transaction completion.
message TransactionEvent {
  string event_id = 1;                         // UUID, unique per event
  string transaction_id = 2;                   // External reference, 10-64 chars
  int64 user_id = 3;                           // Internal user ID, always > 0
  int64 amount_cents = 4;                      // Amount in cents, >= 0
  CurrencyCode currency = 5;
  TransactionStatus status = 6;
  string merchant_name = 7;
  string merchant_category_code = 8;           // MCC, 4 digits, optional
  google.protobuf.Timestamp occurred_at = 9;   // UTC
  google.protobuf.Timestamp processed_at = 10; // UTC
}

enum CurrencyCode {
  CURRENCY_UNSPECIFIED = 0;
  USD = 1;
  EUR = 2;
  GBP = 3;
}

enum TransactionStatus {
  STATUS_UNSPECIFIED = 0;
  COMPLETED = 1;
  REFUNDED = 2;
  FAILED = 3;
  PENDING = 4;
}
```
The schema registry gives you something Pydantic alone cannot: compatibility checking. When a producer tries to register a new schema version, the registry can enforce compatibility rules:
- BACKWARD: New schema can read data written by the old schema (safe for consumers to upgrade first)
- FORWARD: Old schema can read data written by the new schema (safe for producers to upgrade first)
- FULL: Both directions work (the safest option, and what we use)
This means that if someone tries to rename transaction_amount to txn_amount, the schema registry rejects the change at registration time. The broken schema never reaches Kafka. The incident never happens. You catch it in the PR, not in production at 2 AM.
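To see why FULL compatibility catches the rename, here is a toy model of the check. Real registries diff the complete Avro or Protobuf schemas (types, defaults, nesting); this sketch, with the hypothetical `full_compat_issues` function, only models required fields without defaults, which is exactly the rename case:

```python
from typing import Dict, List


def full_compat_issues(
    old_fields: Dict[str, bool], new_fields: Dict[str, bool]
) -> List[str]:
    """Toy FULL-compatibility check over {field_name: is_required} maps.

    Illustrative only: a real registry inspects whole schemas, including
    types and default values, not just field names.
    """
    issues = []
    for name, required in old_fields.items():
        if required and name not in new_fields:
            # Old consumers still expect to read this field from new data
            issues.append(f"forward-incompatible: '{name}' removed")
    for name, required in new_fields.items():
        if required and name not in old_fields:
            # Data written under the old schema has no value to supply here
            issues.append(f"backward-incompatible: '{name}' added without default")
    return issues


old = {"transaction_amount": True, "currency": True}
new = {"txn_amount": True, "currency": True}
print(full_compat_issues(old, new))
```

A rename is just a removal plus an addition, so it trips both directions at once, which is why FULL mode rejects it outright.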
## Implementation Approach 3: Great Expectations for Quality Contracts
Schema contracts handle the structural side (field names and types), but they do not cover data quality. A perfectly typed payload can still contain garbage data. You need a second layer that validates the actual content.
Great Expectations is the tool I keep coming back to for this. Here is an expectation suite that acts as a quality contract for our transactions table after it lands in the warehouse:
```python
from datetime import datetime, timedelta, timezone

import great_expectations as gx

context = gx.get_context()

datasource = context.data_sources.add_postgres(
    name="warehouse",
    connection_string="postgresql://analytics:${DB_PASSWORD}@warehouse:5432/analytics",
)

suite = context.suites.add(
    gx.ExpectationSuite(name="transactions_quality_contract")
)

# Completeness checks
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="transaction_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="occurred_at")
)

# Acceptable null rate for optional fields: at least 85% populated
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(
        column="merchant_category_code", mostly=0.85
    )
)

# Value range checks
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount_cents", min_value=0, max_value=100_000_00
    )
)

# Uniqueness
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="event_id")
)

# Referential integrity
suite.add_expectation(
    gx.expectations.ExpectColumnDistinctValuesToBeInSet(
        column="currency", value_set=["USD", "EUR", "GBP"]
    )
)

# Freshness: the newest record should fall within the last 48 hours.
# Note: these bounds are computed when the suite is built, so they must
# be recomputed each run (our Airflow task rebuilds them per execution).
now = datetime.now(timezone.utc)
suite.add_expectation(
    gx.expectations.ExpectColumnMaxToBeBetween(
        column="occurred_at",
        min_value=now - timedelta(hours=48),
        max_value=now + timedelta(hours=1),
    )
)

# Volume anomaly: expect between 10K and 500K rows per daily partition
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=10_000, max_value=500_000
    )
)
```
We run this suite as a step in our Airflow DAG, right after the ingestion task and before any downstream transformations. If the quality contract fails, the DAG halts, the problematic partition is quarantined, and the on-call engineer gets an alert with a detailed report showing exactly which expectations failed and on which rows.
The beauty of Great Expectations is that the suite itself becomes the documentation. New team members can look at the expectation suite and immediately understand what "good data" looks like for this table. No more guessing whether nulls in merchant_category_code are normal or a bug.
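As a sketch of the halt-and-quarantine step described above: the function below inspects a validation result and raises, which is enough to fail the Airflow task and stop downstream transformations. The result dict shape loosely mirrors a Great Expectations checkpoint result (a top-level `success` flag plus per-expectation results), but treat it as illustrative rather than the exact GX API:

```python
from typing import Any, Dict


class QualityContractViolation(Exception):
    """Raised to fail the orchestrator task when a quality contract is breached."""


def enforce_quality_gate(result: Dict[str, Any], partition: str) -> None:
    """Halt on failure and name the failing expectations in the error.

    `result` is an illustrative stand-in for a checkpoint result dict.
    """
    if result.get("success"):
        return
    failed = [
        r.get("expectation_type", "unknown")
        for r in result.get("results", [])
        if not r.get("success")
    ]
    # In our DAG, raising here halts the run; a separate task moves the
    # partition into a quarantine schema for investigation.
    raise QualityContractViolation(
        f"Partition {partition} quarantined; failed expectations: {failed}"
    )
```

The exception message doubles as the alert payload, so the on-call engineer sees which expectations failed without opening the GX report first.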
## Comparison: Which Approach Should You Use?
After implementing all three approaches across different parts of our platform, here is my honest assessment:
| Aspect | Pydantic (Code-level) | Schema Registry (Protobuf/Avro) | Great Expectations (Quality) |
|---|---|---|---|
| What it validates | Schema + basic business rules | Schema + compatibility evolution | Data quality + statistical properties |
| When it catches issues | Runtime (per-record) | Deploy time (schema registration) | Batch time (post-ingestion) |
| Cross-team enforcement | Weak (lives in consumer code) | Strong (shared registry) | Medium (shared suite definitions) |
| Setup complexity | Low (pip install pydantic) | High (registry infrastructure) | Medium (context + datasource config) |
| Best for | Single-team pipelines, prototyping | Multi-team streaming platforms | Warehouse/lake quality gates |
| Limitation | No compatibility checking | No quality/content validation | No structural schema enforcement |
The honest answer is that you probably need at least two of these. We use Protobuf + Schema Registry at the Kafka boundary, Pydantic models in our Python consumers for runtime validation, and Great Expectations in the warehouse for quality gates. Each layer catches different classes of problems.
## CI/CD Contract Validation
Contracts that are not enforced automatically are just documentation, and documentation gets ignored. The most impactful thing we did was add contract validation to CI/CD pipelines so that breaking changes get caught in pull requests.
Here is our GitHub Actions workflow that validates schema compatibility on every PR that touches a .proto file:
```yaml
# .github/workflows/schema-check.yml
name: Schema Compatibility Check

on:
  pull_request:
    paths:
      - 'proto/**/*.proto'
      - 'contracts/**/*.py'

jobs:
  check-compatibility:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Get changed proto files
        id: changed
        run: |
          echo "files=$(git diff --name-only origin/main...HEAD -- 'proto/**/*.proto' | tr '\n' ' ')" >> $GITHUB_OUTPUT

      - name: Check backward compatibility
        if: steps.changed.outputs.files != ''
        run: |
          pip install confluent-kafka[schema-registry]
          python scripts/check_schema_compat.py \
            --registry-url ${{ secrets.SCHEMA_REGISTRY_URL }} \
            --files ${{ steps.changed.outputs.files }} \
            --compatibility FULL

      - name: Validate Pydantic contracts
        run: |
          pip install pydantic pytest
          pytest tests/contracts/ -v --tb=short

      - name: Generate contract diff report
        if: always()
        run: |
          python scripts/contract_diff.py \
            --base origin/main \
            --head HEAD \
            --output contract-diff.md

      - name: Comment PR with contract changes
        if: always()
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const diff = fs.readFileSync('contract-diff.md', 'utf8');
            if (diff.trim()) {
              github.rest.issues.createComment({
                issue_number: context.issue.number,
                owner: context.repo.owner,
                repo: context.repo.repo,
                body: `## Data Contract Changes\n\n${diff}`
              });
            }
```
The contract diff report is critical for reviews. It generates a human-readable summary of what changed: new fields, removed fields, type changes, modified validators. Reviewers who are not deeply familiar with Protobuf syntax can still understand the impact of the change.
We also have a Python script that validates our Pydantic contracts against sample data snapshots:
```python
# tests/contracts/test_transaction_contract.py
import json
from pathlib import Path

import pytest
from pydantic import ValidationError

from contracts.payments import TransactionEvent

FIXTURES_DIR = Path(__file__).parent / "fixtures" / "transactions"


def load_fixtures():
    """Load real anonymized production samples for contract testing."""
    fixtures = []
    for f in sorted(FIXTURES_DIR.glob("*.json")):
        with open(f) as fh:
            fixtures.append((f.name, json.load(fh)))
    return fixtures


@pytest.mark.parametrize("filename,payload", load_fixtures())
def test_contract_accepts_valid_production_data(filename, payload):
    """Every saved production sample must pass the current contract."""
    event = TransactionEvent(**payload)
    assert event.amount_cents >= 0
    assert event.user_id > 0


def test_contract_rejects_missing_required_fields():
    with pytest.raises(ValidationError):
        TransactionEvent(
            event_id="not-a-uuid",
            transaction_id="short",
            amount_cents=-100,
        )


def test_contract_rejects_renamed_field():
    """Catch the exact incident that motivated data contracts."""
    payload = {
        "event_id": "550e8400-e29b-41d4-a716-446655440000",
        "transaction_id": "TXN-1234567890",
        "user_id": 42,
        "txn_amount": 1999,  # Wrong field name!
        "currency": "USD",
        "status": "completed",
        "merchant_name": "Coffee Shop",
        "occurred_at": "2025-12-01T10:30:00Z",
        "processed_at": "2025-12-01T10:30:05Z",
    }
    with pytest.raises(ValidationError) as exc_info:
        TransactionEvent(**payload)
    error_text = str(exc_info.value).lower()
    assert "amount_cents" in error_text or "required" in error_text
```
That last test is my favorite. It literally encodes the incident that started this whole journey. If anyone ever tries to rename amount_cents to something else, the contract test catches it before it reaches any environment.
## The Organizational Challenge: Getting Buy-In
Here is the part that no technical blog wants to talk about: the hardest part of data contracts is not the technology. It is getting people to care.
When I first proposed data contracts to our engineering leadership, the reaction was lukewarm at best. "That sounds like more process." "We move fast; we can't add friction to every deploy." "The data team should just handle schema changes." I heard all of these and more.
What actually worked was a combination of three things:
### 1. Quantify the pain
I went through our incident tracker and tallied every data-related incident from the past year. Forty-seven incidents. Average time to detect: 8.3 hours. Average time to resolve: 14.6 hours. I multiplied that by average engineer cost and presented a dollar figure. When leadership saw that schema-related incidents had cost us roughly $340,000 in engineering time (not counting business impact from wrong dashboards), the conversation shifted from "why do we need this" to "how fast can we implement it."
### 2. Start with the highest-pain interface
Do not try to boil the ocean. We picked the single most problematic data interface: the payments event stream that had caused five incidents in six months. We implemented contracts just for that one topic. Within two months, we caught three would-be breaking changes in PR review. Each one would have been a production incident. The other teams saw this and started asking for contracts on their interfaces.
### 3. Make it easy, not bureaucratic
The first version of our contract proposal required a four-page specification document for every data interface. Nobody filled them out. The version that actually worked was a single Pydantic model or Protobuf file checked into the producer's repo, with a CI check that ran automatically. The total effort to create a new contract was about 15 minutes. The effort to maintain it was zero (CI handles enforcement). That is the level of friction that teams will actually accept.
One thing I wish I had done differently: involve the producing teams from day one. My initial approach was to define contracts from the consumer side and then present them to producers as fait accompli. This created resentment. The producing team felt like they were being told what to do by the data team. When we switched to a collaborative model where the producer proposes the contract and the consumer reviews it, adoption improved dramatically.
## A Practical Contract Specification Format
After iterating on several formats, we settled on a YAML-based contract spec that sits alongside the code. It is not the schema itself (that is the Protobuf or Pydantic file) but a metadata layer that captures everything else:
```yaml
# contracts/payments/transactions-v2.yaml
apiVersion: datacontract/v1
kind: DataContract
metadata:
  name: payments-transactions-v2
  version: "2.4.1"
  owner:
    team: payments-eng
    slack: "#payments-eng"
    email: payments-eng@company.com
  description: >
    Real-time transaction events emitted after payment processing.
    Used by analytics, ML feature store, and compliance reporting.
schema:
  type: protobuf
  path: proto/payments/v2/transaction_event.proto
  compatibility: FULL
quality:
  freshness:
    max_delay: 30s
    measured_at: produced_at
  completeness:
    required_fields_null_rate: 0%
    optional_fields:
      merchant_category_code:
        max_null_rate: 15%
  volume:
    daily_min: 10000
    daily_max: 500000
    anomaly_detection: true
sla:
  availability: 99.9%
  support_hours: "24/7"
  incident_response: 30m
  planned_maintenance_notice: 48h
consumers:
  - team: data-platform
    usage: "Warehouse ingestion, daily aggregation tables"
  - team: ml-engineering
    usage: "Real-time feature computation for fraud model"
  - team: compliance
    usage: "Transaction audit log, 7-year retention"
lifecycle:
  created: "2024-06-15"
  last_reviewed: "2025-11-15"
  next_review: "2026-05-15"
  deprecation_notice: null
  sunset_date: null
change_management:
  breaking_changes:
    notice_period: 14d
    approval_required_from:
      - data-platform
      - ml-engineering
      - compliance
  non_breaking_changes:
    notice_period: 3d
    auto_approved: true
```
This file is the single source of truth that answers every question during an incident: Who owns this data? What are the quality expectations? Who should I notify if I need to make a change? What is the process for breaking changes?
## Monitoring and Observability for Contracts
Contracts without monitoring are just wishful thinking. Here are the key metrics we track:
- Contract violation rate: Percentage of records that fail validation, broken down by error type. We alert at 1% over 5 minutes.
- Schema drift events: Number of times a producer attempted to register an incompatible schema. Even successful blocks are worth tracking because they indicate a team that does not know about the contract.
- Freshness lag: Time between event occurrence and when it lands in the warehouse. We track p50, p95, and p99.
- Volume anomalies: We use a simple rolling 7-day average with 3-sigma bounds. If today's volume is outside that range, it triggers a review.
- Contract coverage: Percentage of data interfaces that have a formal contract. We started at 0% and are at 73% after eight months. The goal is 100% for all tier-1 interfaces.
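The volume check is simple enough to sketch in a few lines. This is an illustration of the rolling-window 3-sigma rule described above (the `volume_is_anomalous` function is not our production code, which runs as a recording rule over the warehouse metrics):

```python
from statistics import mean, stdev
from typing import List


def volume_is_anomalous(history: List[int], today: int, sigmas: float = 3.0) -> bool:
    """Flag today's row count when it falls outside mean +/- `sigmas`
    standard deviations of the trailing window (we use the previous 7 days).
    """
    if len(history) < 2:
        return False  # not enough history to estimate spread
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        # Perfectly flat history: any deviation at all is suspicious
        return today != mu
    return abs(today - mu) > sigmas * sigma
```

A rolling window rather than a fixed baseline matters here: as the business grows, the bounds grow with it, so the check does not need manual retuning.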
We built a simple Grafana dashboard that shows all of these in one place. The most useful panel turned out to be a table listing every contract with its current health status: green (all good), yellow (soft violations detected), red (SLA breach). Leadership loves this view because it gives them a single pane of glass for data reliability.
## Lessons Learned After One Year
We have been running data contracts in production for about a year now. Here is what I would tell myself if I could go back to the beginning:
- Start with schema contracts, add quality later. Getting field names and types agreed upon solves 70% of incidents. Quality contracts (null rates, volume bounds) are important but can come in phase two.
- Version everything. Every contract should have a version number. Every schema change should increment it. This seems obvious but we initially forgot to version our Pydantic models and had no way to tell which version of the contract a consumer was running.
- Dead letter queues are non-negotiable. When a record fails contract validation, it needs to go somewhere recoverable. We had an early bug where violated records were silently dropped. We lost two days of compliance-relevant data before catching it.
- Test contracts against production samples. Synthetic test data always passes. Save anonymized production samples and run your contract tests against them. This catches edge cases you would never think to generate.
- Make producers responsible for their contracts. The team that produces data should own the contract definition. The consumer team should review it and can request changes, but ownership must be clear. Shared ownership means no ownership.
- Contracts are living documents. Schedule quarterly reviews. Business logic changes, new fields get added, old fields become irrelevant. A contract that was accurate six months ago might be wrong today.
- Do not enforce contracts in development environments. Log violations, yes. Block pipelines, no. Engineers need to be able to iterate quickly in dev. Enforcement should ramp up as you move toward production: log in dev, warn in staging, block in prod.
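That enforcement ramp can be sketched as a small dispatch on the deployment environment. The `DEPLOY_ENV` variable and the environment names here are assumptions for illustration, not a convention from any particular tool:

```python
import logging
import os
from typing import Optional

logger = logging.getLogger("contracts")

# Environment names and the DEPLOY_ENV variable are illustrative assumptions.
ENFORCEMENT = {"dev": "log", "staging": "warn", "prod": "block"}


def handle_violation(error: Exception, env: Optional[str] = None) -> bool:
    """Return True when the pipeline should halt on a contract violation.

    Ramped enforcement: log-only in dev, warn in staging, block in prod.
    """
    env = env or os.environ.get("DEPLOY_ENV", "dev")
    mode = ENFORCEMENT.get(env, "log")  # unknown environments default to log
    if mode == "block":
        logger.error("contract violation (blocking in %s): %s", env, error)
        return True
    if mode == "warn":
        logger.warning("contract violation (non-blocking in %s): %s", env, error)
    else:
        logger.info("contract violation (ignored in %s): %s", env, error)
    return False
```

Defaulting unknown environments to log-only is deliberate: a misconfigured environment name should never silently block a pipeline.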
## Getting Started This Week
If you are reading this and thinking "this sounds great but where do I actually start," here is a concrete plan you can execute this week:
- Monday: Pick your most problematic data interface. The one that has caused the most incidents or the most confusion. Just one.
- Tuesday: Write a Pydantic model (or JSON Schema, or Protobuf) that describes what valid data looks like for that interface. Include field descriptions and basic validators.
- Wednesday: Add validation to your consumer pipeline. Log violations but do not block. Deploy to production.
- Thursday: Review the violation logs. You will almost certainly find data quality issues you did not know about. Fix the obvious ones.
- Friday: Share the contract with the producing team. Have a 30-minute conversation about ownership and change management. Get agreement that future schema changes go through PR review.
That is it. Five days, one contract, and you have proven the value. Everything after that is scaling what works.
The 2 AM incident that started this journey was painful, but it was also the best thing that happened to our data platform. It forced us to treat data interfaces with the same rigor we apply to API contracts. A year later, our schema-related incidents have dropped by 89%, our mean time to detect data issues went from 8.3 hours to 12 minutes, and I have not been woken up by a broken pipeline in four months.
Data contracts are not glamorous. They are not going to get you a conference talk or a viral blog post. But they are the single highest-ROI investment you can make in data reliability. Start small, automate enforcement, and let the results speak for themselves.



