5 Python Design Patterns Every Data Engineer Should Master
Introduction
You’ve written the same database connection code three times this month. Your ETL script is 800 lines of nested if statements. Adding a new data source means copying and pasting code from five different files.
This isn’t a skill problem. It’s a pattern problem.
Most data engineers learn design patterns in computer science courses but never see how they apply to real data work. You’re building pipelines, not web apps. Your challenges are different: inconsistent data sources, complex transformations, fragile connections, and pipelines that need to run reliably at 3 AM.
This article shows you five design patterns that solve actual data engineering problems. No theoretical examples. No toy code. Just practical patterns you can use Monday morning to make your pipelines cleaner, more maintainable, and less likely to wake you up with a PagerDuty alert.
You’ll learn how to build flexible data source connectors, construct complex transformations step by step, add observability without touching core logic, wrap legacy systems cleanly, and manage matching families of components across environments.
These patterns have saved me countless debugging hours and made my code easier for teammates to understand. They’ll do the same for you.
1. Factory Method: Building Flexible Data Source Connectors
The Problem
Your pipeline needs to read from different sources. PostgreSQL today, MongoDB tomorrow, S3 next week. You write a new class each time, and your orchestration code becomes a mess of if-else statements checking source types.
# The messy approach
if source_type == "postgres":
    conn = PostgresConnector(host, port, db)
elif source_type == "mongodb":
    conn = MongoConnector(uri, db)
elif source_type == "s3":
    conn = S3Connector(bucket, key)
# ... and it keeps growing
Adding a new source means changing code in multiple places. Testing becomes harder. Your main pipeline logic is tangled with connection details.
The Solution
Factory Method separates object creation from usage. You define an interface for creating connections, then let specific factories handle the details.
from abc import ABC, abstractmethod
import pandas as pd


class DataConnector(ABC):
    """Base interface all connectors must implement"""

    @abstractmethod
    def read_data(self, query: str) -> pd.DataFrame:
        pass

    @abstractmethod
    def close(self):
        pass


class PostgresConnector(DataConnector):
    def __init__(self, host: str, port: int, database: str):
        self.host = host
        self.port = port
        self.database = database
        self._connection = None

    def read_data(self, query: str) -> pd.DataFrame:
        # Actual implementation would use psycopg2 or sqlalchemy
        return pd.read_sql(query, self._connection)

    def close(self):
        if self._connection:
            self._connection.close()


class S3Connector(DataConnector):
    def __init__(self, bucket: str, region: str):
        self.bucket = bucket
        self.region = region

    def read_data(self, key: str) -> pd.DataFrame:
        # Actual implementation would use boto3
        return pd.read_parquet(f"s3://{self.bucket}/{key}")

    def close(self):
        pass  # S3 doesn't need explicit cleanup


class ConnectorFactory:
    """Factory that creates the appropriate connector based on config"""

    @staticmethod
    def create_connector(config: dict) -> DataConnector:
        source_type = config.get("type")
        if source_type == "postgres":
            return PostgresConnector(
                host=config["host"],
                port=config["port"],
                database=config["database"]
            )
        elif source_type == "s3":
            return S3Connector(
                bucket=config["bucket"],
                region=config["region"]
            )
        else:
            raise ValueError(f"Unknown source type: {source_type}")


# Clean usage in your pipeline
config = {"type": "postgres", "host": "localhost", "port": 5432, "database": "analytics"}
connector = ConnectorFactory.create_connector(config)
data = connector.read_data("SELECT * FROM events WHERE date = '2025-01-01'")
connector.close()
Your pipeline code stays clean. Adding a new connector means creating one new class. No changes to existing pipeline logic.
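If the if/elif chain inside ConnectorFactory bothers you, one common refinement is a registry keyed by source type, so each connector class registers itself and the factory never needs editing. Here is a minimal sketch building on the DataConnector base class above; the MongoConnector is hypothetical, purely to show what adding a source looks like.

CONNECTOR_REGISTRY = {}


def register_connector(source_type: str):
    """Class decorator that registers a connector under a source type key"""
    def wrapper(cls):
        CONNECTOR_REGISTRY[source_type] = cls
        return cls
    return wrapper


@register_connector("mongodb")
class MongoConnector(DataConnector):
    def __init__(self, uri: str, database: str):
        self.uri = uri
        self.database = database

    def read_data(self, query: str) -> pd.DataFrame:
        # Real code would query via pymongo here
        return pd.DataFrame()

    def close(self):
        pass


def create_connector(config: dict) -> DataConnector:
    params = dict(config)              # don't mutate the caller's config
    source_type = params.pop("type")
    if source_type not in CONNECTOR_REGISTRY:
        raise ValueError(f"Unknown source type: {source_type}")
    return CONNECTOR_REGISTRY[source_type](**params)

The existing PostgresConnector and S3Connector would get the same @register_connector treatment, and the original if/elif chain disappears entirely.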
When to Use It
Use Factory Method when you need to create objects of different types that share a common interface. Perfect for data sources, file format readers, or API clients.
When NOT to Use It
Skip it if you only have one or two data sources and no plans to add more. The extra abstraction isn’t worth it for simple cases.
2. Builder: Constructing Complex Data Transformations
The Problem
Your transformation pipeline has dozens of optional steps. Filtering, deduplication, validation, enrichment, aggregation. Some pipelines need all steps, others need a subset. Your function signatures become nightmares of optional parameters.
def transform_data(
    df,
    remove_duplicates=True,
    validate_schema=False,
    fill_missing=True,
    add_timestamps=False,
    aggregate=None,
    filter_condition=None,
    # ... 15 more parameters
):
    # 200 lines of nested if statements
Calling this function is error-prone. You forget which parameters you need, the order matters but isn’t obvious, and testing individual steps is painful.
The Solution
The Builder pattern lets you construct complex objects step by step. Each step returns the builder itself, allowing method chaining. The final build() call creates the actual transformer.
from typing import Optional, Callable
import pandas as pd


class DataTransformer:
    """The actual transformer that does the work"""

    def __init__(self, steps: list):
        self.steps = steps

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        result = df.copy()
        for step in self.steps:
            result = step(result)
        return result


class TransformationBuilder:
    """Builder for creating data transformation pipelines"""

    def __init__(self):
        self._steps = []

    def remove_duplicates(self, subset: Optional[list] = None):
        """Add deduplication step"""
        def step(df):
            return df.drop_duplicates(subset=subset)
        self._steps.append(step)
        return self  # Return self for chaining

    def validate_schema(self, required_columns: list):
        """Add schema validation step"""
        def step(df):
            missing = set(required_columns) - set(df.columns)
            if missing:
                raise ValueError(f"Missing columns: {missing}")
            return df
        self._steps.append(step)
        return self

    def fill_missing(self, strategy: dict):
        """Add missing value imputation step"""
        def step(df):
            return df.fillna(strategy)
        self._steps.append(step)
        return self

    def filter_rows(self, condition: Callable):
        """Add filtering step"""
        def step(df):
            return df[condition(df)]
        self._steps.append(step)
        return self

    def add_column(self, column_name: str, value_func: Callable):
        """Add new column based on function"""
        def step(df):
            df[column_name] = value_func(df)
            return df
        self._steps.append(step)
        return self

    def build(self) -> DataTransformer:
        """Create the final transformer"""
        return DataTransformer(self._steps)


# Clean, readable pipeline construction
transformer = (TransformationBuilder()
    .validate_schema(['user_id', 'event_type', 'timestamp'])
    .remove_duplicates(subset=['user_id', 'timestamp'])
    .filter_rows(lambda df: df['timestamp'] > '2025-01-01')
    .fill_missing({'event_type': 'unknown'})
    .add_column('processed_at', lambda df: pd.Timestamp.now())
    .build())

# Apply to your data
raw_data = pd.DataFrame(...)  # Your source data
clean_data = transformer.transform(raw_data)
Each transformation step is isolated and testable. The pipeline construction is self-documenting. You only include steps you actually need.
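Because every step is just a function appended to a list, you can unit test one step at a time against a tiny DataFrame. A minimal pytest sketch, reusing the TransformationBuilder above:

import pandas as pd
import pytest


def test_validate_schema_rejects_missing_columns():
    df = pd.DataFrame({"user_id": [1, 2]})
    transformer = (TransformationBuilder()
                   .validate_schema(["user_id", "timestamp"])
                   .build())
    with pytest.raises(ValueError, match="timestamp"):
        transformer.transform(df)


def test_remove_duplicates_keeps_one_row_per_key():
    df = pd.DataFrame({"user_id": [1, 1, 2], "timestamp": ["a", "a", "b"]})
    result = (TransformationBuilder()
              .remove_duplicates(subset=["user_id", "timestamp"])
              .build()
              .transform(df))
    assert len(result) == 2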
When to Use It
Use Builder when you’re constructing complex objects with many optional components. Ideal for ETL pipelines, data validation workflows, or report generators.
When NOT to Use It
Skip it for simple transformations with just two or three steps. A regular function is clearer. Builder shines when complexity grows.
3. Decorator: Adding Observability Without Changing Core Logic
The Problem
Your transformation functions work fine, but you need to add logging, execution time tracking, error handling, and data quality metrics. You don’t want to pollute every function with the same boilerplate code.
def process_user_events(df):
    start_time = time.time()
    logger.info("Starting user event processing")
    try:
        # Actual transformation logic
        result = df.groupby('user_id').agg({'events': 'count'})

        # Log metrics
        logger.info(f"Processed {len(result)} users")
        logger.info(f"Execution time: {time.time() - start_time}s")
        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        raise
Every function has the same wrapper code. When you want to add a new metric, you update dozens of functions.
The Solution
Decorators wrap functions with additional behavior without modifying their core logic. Python’s @ syntax makes them easy to apply.
import time
import logging
from functools import wraps
import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def log_execution_time(func):
    """Decorator that logs function execution time"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} took {duration:.2f}s")
        return result
    return wrapper


def log_dataframe_shape(func):
    """Decorator that logs input/output DataFrame shapes"""
    @wraps(func)
    def wrapper(df, *args, **kwargs):
        logger.info(f"{func.__name__} input shape: {df.shape}")
        result = func(df, *args, **kwargs)
        logger.info(f"{func.__name__} output shape: {result.shape}")
        return result
    return wrapper


def validate_not_empty(func):
    """Decorator that ensures DataFrame is not empty"""
    @wraps(func)
    def wrapper(df, *args, **kwargs):
        if df.empty:
            raise ValueError(f"{func.__name__} received empty DataFrame")
        return func(df, *args, **kwargs)
    return wrapper


def retry_on_failure(max_attempts=3, delay=1):
    """Decorator that retries function on failure"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    logger.warning(f"{func.__name__} failed, attempt {attempt + 1}/{max_attempts}")
                    time.sleep(delay)
        return wrapper
    return decorator


# Clean function with stacked decorators
@log_execution_time
@log_dataframe_shape
@validate_not_empty
def process_user_events(df: pd.DataFrame) -> pd.DataFrame:
    """Core business logic stays clean and focused"""
    return df.groupby('user_id').agg({
        'event_count': 'count',
        'last_event': 'max'
    })


@retry_on_failure(max_attempts=3)
@log_execution_time
def fetch_external_data(api_url: str) -> pd.DataFrame:
    """Automatically retries if API call fails"""
    # API call logic here
    return pd.DataFrame()
Your core logic stays clean. Adding new monitoring is one decorator away. Testing is easier because behavior is separated into reusable components.
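As an example of "one decorator away": the problem section mentioned data quality metrics, and those fit the same mold. Here is a minimal sketch of a decorator that logs per-column null counts on whatever DataFrame the wrapped function returns; enrich_events and load_user_profiles below are hypothetical, included only to show stacking.

def log_null_counts(func):
    """Decorator that logs per-column null counts in the returned DataFrame"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        nulls = result.isna().sum()
        for column, count in nulls[nulls > 0].items():
            logger.warning(f"{func.__name__}: column '{column}' has {count} nulls")
        return result
    return wrapper


# Decorators apply bottom-up: log_null_counts wraps the function first,
# then log_execution_time wraps that.
@log_execution_time
@log_null_counts
def enrich_events(df: pd.DataFrame) -> pd.DataFrame:
    # Merge in user attributes; load_user_profiles() is a placeholder
    return df.merge(load_user_profiles(), on="user_id", how="left")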
When to Use It
Use Decorators when you need to add cross-cutting concerns like logging, metrics, caching, validation, or retry logic across multiple functions.
When NOT to Use It
Skip decorators if the additional behavior is specific to just one function. Simple is better than clever.
4. Adapter: Wrapping Legacy Systems and External APIs
The Problem
You’re integrating with a third-party API that returns data in an awkward format. Or you have a legacy system with a clunky interface. You don’t want this messiness spreading through your clean pipeline code.
# Direct usage of messy external API
response = legacy_api.get_customer_data(customer_id)
# Response is nested XML converted to dict with inconsistent keys
customer_name = response['CustomerInfo']['PersonalDetails']['FullName']['FirstName']
customer_email = response['ContactMethods']['EmailAddresses']['Primary']['Address']
# Different endpoint has completely different structure
orders = legacy_api.get_orders(customer_id)['OrderHistory']['Items']['Item']
Your pipeline code becomes tightly coupled to external systems. When the API changes, you update code everywhere.
The Solution
The Adapter pattern wraps an external interface in the clean, consistent interface your code expects. The adapter handles all the messy translation logic.
from typing import List, Dict
from datetime import datetime
import pandas as pd


class Customer:
    """Clean internal model for customer data"""

    def __init__(self, customer_id: str, name: str, email: str, signup_date: datetime):
        self.customer_id = customer_id
        self.name = name
        self.email = email
        self.signup_date = signup_date

    def to_dict(self) -> dict:
        return {
            'customer_id': self.customer_id,
            'name': self.name,
            'email': self.email,
            'signup_date': self.signup_date
        }


class LegacyAPIAdapter:
    """Adapter that wraps messy legacy API with clean interface"""

    def __init__(self, api_client):
        self._api = api_client

    def get_customer(self, customer_id: str) -> Customer:
        """Convert messy API response to clean Customer object"""
        response = self._api.get_customer_data(customer_id)

        # Handle nested structure
        personal = response.get('CustomerInfo', {}).get('PersonalDetails', {})
        contact = response.get('ContactMethods', {}).get('EmailAddresses', {})

        # Extract and clean data
        first_name = personal.get('FullName', {}).get('FirstName', '')
        last_name = personal.get('FullName', {}).get('LastName', '')
        name = f"{first_name} {last_name}".strip()
        email = contact.get('Primary', {}).get('Address', '')

        # Parse inconsistent date format
        date_str = response.get('RegistrationDate', '')
        signup_date = datetime.strptime(date_str, '%Y%m%d') if date_str else None

        return Customer(
            customer_id=customer_id,
            name=name,
            email=email,
            signup_date=signup_date
        )

    def get_customers_batch(self, customer_ids: List[str]) -> pd.DataFrame:
        """Fetch multiple customers and return clean DataFrame"""
        customers = [self.get_customer(cid) for cid in customer_ids]
        return pd.DataFrame([c.to_dict() for c in customers])


class ModernAPIAdapter:
    """Adapter for different API with same clean interface"""

    def __init__(self, api_client):
        self._api = api_client

    def get_customer(self, customer_id: str) -> Customer:
        """Modern API might have cleaner structure but we normalize it"""
        response = self._api.fetch_user(customer_id)
        return Customer(
            customer_id=response['id'],
            name=response['display_name'],
            email=response['primary_email'],
            signup_date=datetime.fromisoformat(response['created_at'])
        )

    def get_customers_batch(self, customer_ids: List[str]) -> pd.DataFrame:
        # Batch endpoint might be different but output is same
        response = self._api.fetch_users_bulk(customer_ids)
        customers = [
            Customer(
                customer_id=u['id'],
                name=u['display_name'],
                email=u['primary_email'],
                signup_date=datetime.fromisoformat(u['created_at'])
            )
            for u in response['users']
        ]
        return pd.DataFrame([c.to_dict() for c in customers])


# Your pipeline code stays clean regardless of which API you use
def process_customer_data(adapter, customer_ids: List[str]):
    """Works with any adapter that implements the interface"""
    customers_df = adapter.get_customers_batch(customer_ids)

    # Clean business logic with consistent data structure
    active_customers = customers_df[
        customers_df['signup_date'] > '2024-01-01'
    ]
    return active_customers


# Easy to switch between different data sources
legacy_adapter = LegacyAPIAdapter(legacy_api_client)
modern_adapter = ModernAPIAdapter(modern_api_client)

# Same pipeline code works with both
result1 = process_customer_data(legacy_adapter, customer_ids)
result2 = process_customer_data(modern_adapter, customer_ids)
The adapter isolates all the messy translation logic. Your pipeline works with clean data models. Swapping data sources means writing a new adapter, not rewriting pipeline code.
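One optional refinement: the two adapters share an interface only by convention. A small sketch using typing.Protocol (Python 3.8+) makes that contract explicit, so mypy or your IDE can flag an adapter that forgets a method, without changing the adapters themselves.

from typing import List, Protocol


class CustomerSource(Protocol):
    """The interface both adapters implicitly share, made explicit for type checkers"""
    def get_customer(self, customer_id: str) -> Customer: ...
    def get_customers_batch(self, customer_ids: List[str]) -> pd.DataFrame: ...


def process_customer_data(adapter: CustomerSource, customer_ids: List[str]) -> pd.DataFrame:
    # Same body as above; the type annotation is the only change
    customers_df = adapter.get_customers_batch(customer_ids)
    return customers_df[customers_df['signup_date'] > '2024-01-01']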
When to Use It
Use Adapter when integrating with external systems that have incompatible interfaces, wrapping legacy code, or creating a stable internal API on top of changing external dependencies.
When NOT to Use It
Skip it if the external interface is already clean and matches your needs. Don’t add abstraction just for the sake of patterns.
5. Abstract Factory: Managing Related Object Families
The Problem
You’re building pipelines that work across different environments: development uses local files and SQLite, staging uses S3 and PostgreSQL, production uses S3 and Redshift. You need matching sets of components for each environment, and mixing them causes subtle bugs.
# Dangerous mixing of components from different environments
reader = S3Reader(prod_bucket) # Production reader
writer = LocalFileWriter('/tmp') # Dev writer
validator = PostgresValidator(staging_db) # Staging validator
# This pipeline is a disaster waiting to happen
Creating components independently means you might accidentally combine incompatible pieces.
The Solution
Abstract Factory provides an interface for creating families of related objects without specifying their concrete classes. Each factory creates a complete set of compatible components.
from abc import ABC, abstractmethod
import pandas as pd


# Abstract interfaces
class DataReader(ABC):
    @abstractmethod
    def read(self, location: str) -> pd.DataFrame:
        pass


class DataWriter(ABC):
    @abstractmethod
    def write(self, df: pd.DataFrame, location: str):
        pass


class DataValidator(ABC):
    @abstractmethod
    def validate(self, df: pd.DataFrame) -> bool:
        pass


# Concrete implementations for local development
class LocalFileReader(DataReader):
    def read(self, location: str) -> pd.DataFrame:
        return pd.read_csv(location)


class LocalFileWriter(DataWriter):
    def write(self, df: pd.DataFrame, location: str):
        df.to_csv(location, index=False)


class LocalValidator(DataValidator):
    def validate(self, df: pd.DataFrame) -> bool:
        # Simple validation for dev
        return not df.empty and len(df.columns) > 0


# Concrete implementations for production
class S3Reader(DataReader):
    def __init__(self, bucket: str):
        self.bucket = bucket

    def read(self, location: str) -> pd.DataFrame:
        return pd.read_parquet(f"s3://{self.bucket}/{location}")


class S3Writer(DataWriter):
    def __init__(self, bucket: str):
        self.bucket = bucket

    def write(self, df: pd.DataFrame, location: str):
        df.to_parquet(f"s3://{self.bucket}/{location}")


class RedshiftValidator(DataValidator):
    def __init__(self, connection_string: str):
        self.conn_string = connection_string

    def validate(self, df: pd.DataFrame) -> bool:
        # Validate against schema in Redshift
        # Check row counts, data types, constraints
        return True  # Simplified


# Abstract Factory interface
class DataPipelineFactory(ABC):
    @abstractmethod
    def create_reader(self) -> DataReader:
        pass

    @abstractmethod
    def create_writer(self) -> DataWriter:
        pass

    @abstractmethod
    def create_validator(self) -> DataValidator:
        pass


# Concrete factory for development environment
class DevelopmentFactory(DataPipelineFactory):
    def create_reader(self) -> DataReader:
        return LocalFileReader()

    def create_writer(self) -> DataWriter:
        return LocalFileWriter()

    def create_validator(self) -> DataValidator:
        return LocalValidator()


# Concrete factory for production environment
class ProductionFactory(DataPipelineFactory):
    def __init__(self, bucket: str, redshift_conn: str):
        self.bucket = bucket
        self.redshift_conn = redshift_conn

    def create_reader(self) -> DataReader:
        return S3Reader(self.bucket)

    def create_writer(self) -> DataWriter:
        return S3Writer(self.bucket)

    def create_validator(self) -> DataValidator:
        return RedshiftValidator(self.redshift_conn)


# Pipeline code works with any factory
class DataPipeline:
    def __init__(self, factory: DataPipelineFactory):
        self.reader = factory.create_reader()
        self.writer = factory.create_writer()
        self.validator = factory.create_validator()

    def run(self, source: str, destination: str):
        # All components guaranteed to be compatible
        data = self.reader.read(source)

        if not self.validator.validate(data):
            raise ValueError("Data validation failed")

        # Transform data
        transformed = data.copy()  # Your transformation logic

        self.writer.write(transformed, destination)


# Switch environments by changing factory
dev_factory = DevelopmentFactory()
prod_factory = ProductionFactory(bucket="prod-data", redshift_conn="...")

# Same pipeline code, different environments
dev_pipeline = DataPipeline(dev_factory)
dev_pipeline.run("data/input.csv", "data/output.csv")

prod_pipeline = DataPipeline(prod_factory)
prod_pipeline.run("input/data.parquet", "output/data.parquet")
You can’t accidentally mix development and production components. The factory ensures all pieces work together. Adding a new environment means creating one new factory class.
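In practice, the factory choice usually comes from configuration rather than being hard-coded. A minimal sketch, assuming environment variables named PIPELINE_ENV, DATA_BUCKET, and REDSHIFT_CONN (those names are illustrative):

import os


def factory_for_environment() -> DataPipelineFactory:
    """Pick a factory based on where the pipeline is running"""
    env = os.getenv("PIPELINE_ENV", "dev")
    if env == "prod":
        return ProductionFactory(
            bucket=os.environ["DATA_BUCKET"],
            redshift_conn=os.environ["REDSHIFT_CONN"],
        )
    return DevelopmentFactory()


# The rest of the pipeline code never needs to know which environment it's in
pipeline = DataPipeline(factory_for_environment())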
When to Use It
Use Abstract Factory when you need to create families of related objects that must be used together, especially for managing different environments or configurations.
When NOT to Use It
Skip it if you only have one or two variations. Regular Factory Method is simpler. Abstract Factory makes sense when you have multiple related components that need to vary together.
Conclusion
These five patterns solve real problems you face building data pipelines:
Factory Method creates flexible connectors without tight coupling. Use it when you need multiple implementations of the same interface.
Builder constructs complex transformations step by step. Use it when you have many optional configuration options.
Decorator adds observability without polluting core logic. Use it for cross-cutting concerns like logging, metrics, and retry logic.
Adapter wraps messy external systems with clean interfaces. Use it when integrating legacy systems or third-party APIs.
Abstract Factory manages families of related components. Use it when you need matching sets of objects that work together.
Start simple. Pick one pattern that solves your most painful problem right now. Refactor one piece of code this week. You’ll immediately see cleaner, more maintainable pipelines.
The goal isn’t to use all patterns everywhere. The goal is to recognize when a pattern makes your code better and when it just adds unnecessary complexity.
Your future self, debugging a pipeline at 2 AM, will thank you for writing code that’s easy to understand and modify.