5 Python Design Patterns Every Data Engineer Should Master

Introduction

You’ve written the same database connection code three times this month. Your ETL script is 800 lines of nested if statements. Adding a new data source means copying and pasting code from five different files.

This isn’t a skill problem. It’s a pattern problem.

Most data engineers learn design patterns in computer science courses but never see how they apply to real data work. You’re building pipelines, not web apps. Your challenges are different: inconsistent data sources, complex transformations, fragile connections, and pipelines that need to run reliably at 3 AM.

This article shows you five design patterns that solve actual data engineering problems. No theoretical examples. No toy code. Just practical patterns you can use Monday morning to make your pipelines cleaner, more maintainable, and less likely to wake you up with a PagerDuty alert.

You’ll learn how to build flexible data connectors, construct complex transformations step by step, add observability without touching core logic, wrap legacy systems cleanly, and keep environment-specific components consistent.

These patterns have saved me countless debugging hours and made my code easier for teammates to understand. They’ll do the same for you.


1. Factory Method: Building Flexible Data Source Connectors

The Problem

Your pipeline needs to read from different sources. PostgreSQL today, MongoDB tomorrow, S3 next week. You write a new class each time, and your orchestration code becomes a mess of if-else statements checking source types.

# The messy approach
if source_type == "postgres":
    conn = PostgresConnector(host, port, db)
elif source_type == "mongodb":
    conn = MongoConnector(uri, db)
elif source_type == "s3":
    conn = S3Connector(bucket, key)
# ... and it keeps growing

Adding a new source means changing code in multiple places. Testing becomes harder. Your main pipeline logic is tangled with connection details.

The Solution

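Factory Method separates object creation from usage. You define a common interface for connectors, then let a factory decide which concrete class to instantiate. The version below is the simplified, single-factory form of the pattern, which is often enough in Python.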

from abc import ABC, abstractmethod
import pandas as pd

class DataConnector(ABC):
    """Base interface all connectors must implement"""
    
    @abstractmethod
    def read_data(self, query: str) -> pd.DataFrame:
        pass
    
    @abstractmethod
    def close(self):
        pass

class PostgresConnector(DataConnector):
    def __init__(self, host: str, port: int, database: str):
        self.host = host
        self.port = port
        self.database = database
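        self._connection = None  # opened lazily on first read
    
    def read_data(self, query: str) -> pd.DataFrame:
        if self._connection is None:
            # Assumes SQLAlchemy with the psycopg2 driver; credentials omitted for brevity
            from sqlalchemy import create_engine
            url = f"postgresql+psycopg2://{self.host}:{self.port}/{self.database}"
            self._connection = create_engine(url).connect()
        return pd.read_sql(query, self._connection)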
    
    def close(self):
        if self._connection:
            self._connection.close()

class S3Connector(DataConnector):
    def __init__(self, bucket: str, region: str):
        self.bucket = bucket
        self.region = region
    
    def read_data(self, key: str) -> pd.DataFrame:
        # pandas reads s3:// URLs through fsspec, which needs the s3fs package installed
        return pd.read_parquet(f"s3://{self.bucket}/{key}")
    
    def close(self):
        pass  # S3 doesn't need explicit cleanup

class ConnectorFactory:
    """Factory that creates appropriate connector based on config"""
    
    @staticmethod
    def create_connector(config: dict) -> DataConnector:
        source_type = config.get("type")
        
        if source_type == "postgres":
            return PostgresConnector(
                host=config["host"],
                port=config["port"],
                database=config["database"]
            )
        elif source_type == "s3":
            return S3Connector(
                bucket=config["bucket"],
                region=config["region"]
            )
        else:
            raise ValueError(f"Unknown source type: {source_type}")

# Clean usage in your pipeline
config = {"type": "postgres", "host": "localhost", "port": 5432, "database": "analytics"}
connector = ConnectorFactory.create_connector(config)
data = connector.read_data("SELECT * FROM events WHERE date = '2025-01-01'")
connector.close()

Your pipeline code stays clean. Adding a new connector means creating one new class and adding one branch to the factory. The pipeline logic itself doesn’t change.
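If the if/elif chain inside the factory starts to grow, one common variation is a registry that connector classes add themselves to. The sketch below is only an illustration: the names `_REGISTRY`, `register_connector`, and `InMemoryConnector` are hypothetical, and the connector returns plain dict rows instead of a DataFrame so the example runs without any dependencies.

```python
from abc import ABC, abstractmethod
from contextlib import closing
from typing import Callable, Dict, List, Type


class DataConnector(ABC):
    """Same role as the base class above; returns plain rows to stay dependency-free."""

    @abstractmethod
    def read_data(self, query: str) -> List[dict]:
        ...

    @abstractmethod
    def close(self) -> None:
        ...


# Hypothetical registry: maps a config "type" string to a connector class.
_REGISTRY: Dict[str, Type[DataConnector]] = {}


def register_connector(source_type: str) -> Callable[[Type[DataConnector]], Type[DataConnector]]:
    """Class decorator that records a connector class under `source_type`."""
    def decorator(cls):
        _REGISTRY[source_type] = cls
        return cls
    return decorator


def create_connector(config: dict) -> DataConnector:
    """Look up the class for config['type'] and pass the remaining keys as kwargs."""
    params = dict(config)  # copy so the caller's dict is not mutated
    source_type = params.pop("type", None)
    if source_type not in _REGISTRY:
        raise ValueError(f"Unknown source type: {source_type}")
    return _REGISTRY[source_type](**params)


@register_connector("memory")
class InMemoryConnector(DataConnector):
    """Illustrative stand-in for a real source such as PostgreSQL or S3."""

    def __init__(self, rows: List[dict]):
        self.rows = rows
        self.closed = False

    def read_data(self, query: str) -> List[dict]:
        # `query` is treated as a required column name; a real connector would run SQL.
        return [row for row in self.rows if query in row]

    def close(self) -> None:
        self.closed = True


config = {"type": "memory", "rows": [{"user_id": 1}, {"event": "click"}]}
# closing() calls connector.close() even if read_data raises.
with closing(create_connector(config)) as connector:
    data = connector.read_data("user_id")
```

With this layout, a new source is one decorated class and the factory function never changes. The trade-off is that the module defining each connector has to be imported somewhere before `create_connector` runs, otherwise the class never registers.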

When to Use It

Use Factory Method when you need to create objects of different types that share a common interface. Perfect for data sources, file format readers, or API clients.

When NOT to Use It

Skip it if you only have one or two data sources and no plans to add more. The extra abstraction isn’t worth it for simple cases.


2. Builder: Constructing Complex Data Transformations

The Problem

Your transformation pipeline has dozens of optional steps. Filtering, deduplication, validation, enrichment, aggregation. Some pipelines need all steps, others need a subset. Your function signatures become nightmares of optional parameters.

def transform_data(
    df, 
    remove_duplicates=True, 
    validate_schema=False, 
    fill_missing=True,
    add_timestamps=False,
    aggregate=None,
    filter_condition=None,
    # ... 15 more parameters
):
    # 200 lines of nested if statements
    ...

Calling this function is error-prone. You forget which parameters you need. The order of the steps matters but isn’t obvious from the signature. Testing individual steps is painful.

The Solution

The Builder pattern lets you construct complex objects step by step. Each step returns the builder itself, allowing method chaining. The final build() call creates the actual transformer.

from typing import Optional, Callable
import pandas as pd

class DataTransformer:
    """The actual transformer that does the work"""
    
    def __init__(self, steps: list):
        self.steps = steps
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        result = df.copy()
        for step in self.steps:
            result = step(result)
        return result

class TransformationBuilder:
    """Builder for creating data transformation pipelines"""
    
    def __init__(self):
        self._steps = []
    
    def remove_duplicates(self, subset: Optional[list] = None):
        """Add deduplication step"""
        def step(df):
            return df.drop_duplicates(subset=subset)
        self._steps.append(step)
        return self  # Return self for chaining
    
    def validate_schema(self, required_columns: list):
        """Add schema validation step"""
        def step(df):
            missing = set(required_columns) - set(df.columns)
            if missing:
                raise ValueError(f"Missing columns: {missing}")
            return df
        self._steps.append(step)
        return self
    
    def fill_missing(self, strategy: dict):
        """Add missing value imputation step"""
        def step(df):
            return df.fillna(strategy)
        self._steps.append(step)
        return self
    
    def filter_rows(self, condition: Callable):
        """Add filtering step"""
        def step(df):
            return df[condition(df)]
        self._steps.append(step)
        return self
    
    def add_column(self, column_name: str, value_func: Callable):
        """Add new column based on function"""
        def step(df):
            # assign() returns a new DataFrame instead of mutating the input
            return df.assign(**{column_name: value_func(df)})
        self._steps.append(step)
        return self
    
    def build(self) -> DataTransformer:
        """Create the final transformer"""
        # Pass a copy so later builder calls don't change an already-built transformer
        return DataTransformer(list(self._steps))

# Clean, readable pipeline construction
transformer = (TransformationBuilder()
    .validate_schema(['user_id', 'event_type', 'timestamp'])
    .remove_duplicates(subset=['user_id', 'timestamp'])
    .filter_rows(lambda df: df['timestamp'] > '2025-01-01')
    .fill_missing({'event_type': 'unknown'})
    .add_column('processed_at', lambda df: pd.Timestamp.now())
    .build())

# Apply to your data
raw_data = pd.DataFrame(...)  # Your source data
clean_data = transformer.transform(raw_data)

Each transformation step is isolated and testable. The pipeline construction is self-documenting. You only include steps you actually need.
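If you want the pipeline to describe itself at runtime as well, one option is to store a name alongside each step. The following is a minimal sketch under a few assumptions: `RowTransformer`, `RowTransformationBuilder`, and `describe()` are hypothetical names, and plain dict rows stand in for a DataFrame so the example has no dependencies.

```python
from typing import Callable, List, Tuple

Rows = List[dict]
Step = Tuple[str, Callable[[Rows], Rows]]


class RowTransformer:
    """Runs named steps in order; plain dict rows stand in for a DataFrame."""

    def __init__(self, steps: List[Step]):
        self.steps = steps

    def transform(self, rows: Rows) -> Rows:
        result = [dict(row) for row in rows]  # work on copies, leave input untouched
        for _name, step in self.steps:
            result = step(result)
        return result

    def describe(self) -> List[str]:
        """Return the step names in execution order."""
        return [name for name, _step in self.steps]


class RowTransformationBuilder:
    def __init__(self):
        self._steps: List[Step] = []

    def remove_duplicates(self, key: str):
        def step(rows: Rows) -> Rows:
            seen, unique = set(), []
            for row in rows:
                if row[key] not in seen:
                    seen.add(row[key])
                    unique.append(row)
            return unique
        self._steps.append((f"remove_duplicates({key})", step))
        return self

    def filter_rows(self, label: str, condition: Callable[[dict], bool]):
        def step(rows: Rows) -> Rows:
            return [row for row in rows if condition(row)]
        self._steps.append((f"filter_rows({label})", step))
        return self

    def build(self) -> RowTransformer:
        # Copy the list so later builder calls do not change this transformer.
        return RowTransformer(list(self._steps))


builder = RowTransformationBuilder().remove_duplicates("user_id")
dedupe_only = builder.build()
full = builder.filter_rows("clicks only", lambda r: r["event"] == "click").build()

rows = [
    {"user_id": 1, "event": "click"},
    {"user_id": 1, "event": "click"},
    {"user_id": 2, "event": "view"},
]
deduped = dedupe_only.transform(rows)
clicks = full.transform(rows)
```

Logging `describe()` at the start of a run gives you a record of exactly which steps were applied, which helps when two pipelines differ by one optional step.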

When to Use It

Use Builder when you’re constructing complex objects with many optional components. Ideal for ETL pipelines, data validation workflows, or report generators.

When NOT to Use It

Skip it for simple transformations with just two or three steps. A regular function is clearer. Builder shines when complexity grows.


3. Decorator: Adding Observability Without Changing Core Logic

The Problem

Your transformation functions work fine, but you need to add logging, execution time tracking, error handling, and data quality metrics. You don’t want to pollute every function with the same boilerplate code.

def process_user_events(df):
    start_time = time.time()
    logger.info("Starting user event processing")
    
    try:
        # Actual transformation logic
        result = df.groupby('user_id').agg({'events': 'count'})
        
        # Log metrics
        logger.info(f"Processed {len(result)} users")
        logger.info(f"Execution time: {time.time() - start_time}s")
        
        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        raise

Every function has the same wrapper code. When you want to add a new metric, you update dozens of functions.

The Solution

Decorators wrap functions with additional behavior without modifying their core logic. Python’s @ syntax makes them easy to apply.

import time
import logging
from functools import wraps
import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_execution_time(func):
    """Decorator that logs function execution time"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} took {duration:.2f}s")
        return result
    return wrapper

def log_dataframe_shape(func):
    """Decorator that logs input/output DataFrame shapes"""
    @wraps(func)
    def wrapper(df, *args, **kwargs):
        logger.info(f"{func.__name__} input shape: {df.shape}")
        result = func(df, *args, **kwargs)
        logger.info(f"{func.__name__} output shape: {result.shape}")
        return result
    return wrapper

def validate_not_empty(func):
    """Decorator that ensures DataFrame is not empty"""
    @wraps(func)
    def wrapper(df, *args, **kwargs):
        if df.empty:
            raise ValueError(f"{func.__name__} received empty DataFrame")
        return func(df, *args, **kwargs)
    return wrapper

def retry_on_failure(max_attempts=3, delay=1):
    """Decorator that retries function on failure"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    logger.warning(
                        f"{func.__name__} failed on attempt {attempt + 1}/{max_attempts}: {e}"
                    )
                    time.sleep(delay)
        return wrapper
    return decorator

# Clean function with stacked decorators
@log_execution_time
@log_dataframe_shape
@validate_not_empty
def process_user_events(df: pd.DataFrame) -> pd.DataFrame:
    """Core business logic stays clean and focused"""
    return df.groupby('user_id').agg({
        'event_count': 'count',
        'last_event': 'max'
    })

@retry_on_failure(max_attempts=3)
@log_execution_time
def fetch_external_data(api_url: str) -> pd.DataFrame:
    """Automatically retries if API call fails"""
    # API call logic here
    return pd.DataFrame()

Your core logic stays clean. Adding new monitoring is one decorator away. Testing is easier because behavior is separated into reusable components.
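One detail that is easy to get wrong is the order in which stacked decorators run. The small sketch below uses a hypothetical `tag` decorator that only records when each wrapper starts and finishes, so you can see the nesting directly.

```python
from functools import wraps

calls = []  # records the order in which wrappers run


def tag(label: str):
    """Hypothetical decorator factory that notes when its wrapper starts and ends."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            calls.append(f"{label}:before")
            try:
                return func(*args, **kwargs)
            finally:
                calls.append(f"{label}:after")
        return wrapper
    return decorator


@tag("timing")       # outermost: applied last, runs first
@tag("shape")
@tag("validation")   # innermost: applied first, runs closest to the function
def process(values):
    calls.append("process")
    return sum(values)


total = process([1, 2, 3])
# calls is now:
# ['timing:before', 'shape:before', 'validation:before', 'process',
#  'validation:after', 'shape:after', 'timing:after']
```

Applied to process_user_events above, this means log_execution_time also measures the time spent in shape logging and validation, and the input shape is logged before validate_not_empty gets a chance to reject an empty DataFrame. Reorder the stack if you want different behavior.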

When to Use It

Use Decorators when you need to add cross-cutting concerns like logging, metrics, caching, validation, or retry logic across multiple functions.
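For caching specifically, you often don’t need to write the decorator yourself, because the standard library ships functools.lru_cache. The sketch below assumes a hypothetical reference-data lookup, `country_for_currency`, whose body stands in for a slow API or database call.

```python
from functools import lru_cache

lookup_calls = 0  # counts how often the slow path actually runs


@lru_cache(maxsize=128)
def country_for_currency(currency_code: str) -> str:
    """Hypothetical reference-data lookup; imagine a slow API or database call here."""
    global lookup_calls
    lookup_calls += 1
    return {"USD": "United States", "JPY": "Japan"}.get(currency_code, "unknown")


names = [country_for_currency(code) for code in ["USD", "JPY", "USD", "USD"]]
# The function body ran only twice; the two repeated "USD" lookups came from the cache.
```

Keep in mind that lru_cache needs hashable arguments, so it suits lookups keyed by strings or numbers. A DataFrame can’t be passed to a cached function directly.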

When NOT to Use It

Skip decorators if the additional behavior is specific to just one function. Simple is better than clever.


4. Adapter: Wrapping Legacy Systems and External APIs

The Problem

You’re integrating with a third-party API that returns data in an awkward format. Or you have a legacy system with a clunky interface. You don’t want this messiness spreading through your clean pipeline code.

# Direct usage of messy external API
response = legacy_api.get_customer_data(customer_id)
# Response is nested XML converted to dict with inconsistent keys
customer_name = response['CustomerInfo']['PersonalDetails']['FullName']['FirstName']
customer_email = response['ContactMethods']['EmailAddresses']['Primary']['Address']
# Different endpoint has completely different structure
orders = legacy_api.get_orders(customer_id)['OrderHistory']['Items']['Item']

Your pipeline code becomes tightly coupled to external systems. When the API changes, you update code everywhere.

The Solution

The Adapter pattern wraps an external interface in the clean, consistent interface your code expects. The adapter handles all the messy translation logic.

from typing import List, Optional
from datetime import datetime
import pandas as pd

class Customer:
    """Clean internal model for customer data"""
    # signup_date is Optional because the legacy API may omit the registration date
    def __init__(self, customer_id: str, name: str, email: str, signup_date: Optional[datetime]):
        self.customer_id = customer_id
        self.name = name
        self.email = email
        self.signup_date = signup_date
    
    def to_dict(self) -> dict:
        return {
            'customer_id': self.customer_id,
            'name': self.name,
            'email': self.email,
            'signup_date': self.signup_date
        }

class LegacyAPIAdapter:
    """Adapter that wraps messy legacy API with clean interface"""
    
    def __init__(self, api_client):
        self._api = api_client
    
    def get_customer(self, customer_id: str) -> Customer:
        """Convert messy API response to clean Customer object"""
        response = self._api.get_customer_data(customer_id)
        
        # Handle nested structure
        personal = response.get('CustomerInfo', {}).get('PersonalDetails', {})
        contact = response.get('ContactMethods', {}).get('EmailAddresses', {})
        
        # Extract and clean data
        first_name = personal.get('FullName', {}).get('FirstName', '')
        last_name = personal.get('FullName', {}).get('LastName', '')
        name = f"{first_name} {last_name}".strip()
        
        email = contact.get('Primary', {}).get('Address', '')
        
        # Parse inconsistent date format
        date_str = response.get('RegistrationDate', '')
        signup_date = datetime.strptime(date_str, '%Y%m%d') if date_str else None
        
        return Customer(
            customer_id=customer_id,
            name=name,
            email=email,
            signup_date=signup_date
        )
    
    def get_customers_batch(self, customer_ids: List[str]) -> pd.DataFrame:
        """Fetch multiple customers and return clean DataFrame"""
        customers = [self.get_customer(cid) for cid in customer_ids]
        return pd.DataFrame([c.to_dict() for c in customers])

class ModernAPIAdapter:
    """Adapter for different API with same clean interface"""
    
    def __init__(self, api_client):
        self._api = api_client
    
    def get_customer(self, customer_id: str) -> Customer:
        """Modern API might have cleaner structure but we normalize it"""
        response = self._api.fetch_user(customer_id)
        
        return Customer(
            customer_id=response['id'],
            name=response['display_name'],
            email=response['primary_email'],
            signup_date=datetime.fromisoformat(response['created_at'])
        )
    
    def get_customers_batch(self, customer_ids: List[str]) -> pd.DataFrame:
        # Batch endpoint might be different but output is same
        response = self._api.fetch_users_bulk(customer_ids)
        customers = [
            Customer(
                customer_id=u['id'],
                name=u['display_name'],
                email=u['primary_email'],
                signup_date=datetime.fromisoformat(u['created_at'])
            )
            for u in response['users']
        ]
        return pd.DataFrame([c.to_dict() for c in customers])

# Your pipeline code stays clean regardless of which API you use
def process_customer_data(adapter, customer_ids: List[str]):
    """Works with any adapter that implements the interface"""
    customers_df = adapter.get_customers_batch(customer_ids)
    
    # Clean business logic with consistent data structure
    active_customers = customers_df[
        customers_df['signup_date'] > '2024-01-01'
    ]
    
    return active_customers

# Easy to switch between different data sources
legacy_adapter = LegacyAPIAdapter(legacy_api_client)
modern_adapter = ModernAPIAdapter(modern_api_client)

# Same pipeline code works with both
result1 = process_customer_data(legacy_adapter, customer_ids)
result2 = process_customer_data(modern_adapter, customer_ids)

The adapter isolates all the messy translation logic. Your pipeline works with clean data models. Swapping data sources means writing a new adapter, not rewriting pipeline code.
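The code above relies on both adapters happening to expose the same methods. If you’d like that contract written down, typing.Protocol is one way to do it without forcing adapters to inherit from a base class. This is a sketch under assumptions: `CustomerSource`, `FakeLegacyClient`, `LegacyAdapter`, and `customer_emails` are hypothetical names, and the Customer model is trimmed to three fields to keep the example short.

```python
from dataclasses import dataclass
from typing import List, Protocol


@dataclass
class Customer:
    """Trimmed version of the Customer model."""
    customer_id: str
    name: str
    email: str


class CustomerSource(Protocol):
    """Hypothetical name for the interface every adapter is expected to satisfy."""

    def get_customer(self, customer_id: str) -> Customer:
        ...


class FakeLegacyClient:
    """Test double that mimics the nested legacy response shape."""

    def get_customer_data(self, customer_id: str) -> dict:
        return {
            "CustomerInfo": {
                "PersonalDetails": {"FullName": {"FirstName": "Ada", "LastName": "Lovelace"}}
            },
            "ContactMethods": {"EmailAddresses": {"Primary": {"Address": "ada@example.com"}}},
        }


class LegacyAdapter:
    """Satisfies CustomerSource structurally; no inheritance needed."""

    def __init__(self, api_client):
        self._api = api_client

    def get_customer(self, customer_id: str) -> Customer:
        response = self._api.get_customer_data(customer_id)
        full_name = (
            response.get("CustomerInfo", {}).get("PersonalDetails", {}).get("FullName", {})
        )
        name = f"{full_name.get('FirstName', '')} {full_name.get('LastName', '')}".strip()
        email = (
            response.get("ContactMethods", {})
            .get("EmailAddresses", {})
            .get("Primary", {})
            .get("Address", "")
        )
        return Customer(customer_id=customer_id, name=name, email=email)


def customer_emails(source: CustomerSource, customer_ids: List[str]) -> List[str]:
    """Pipeline code depends only on the protocol, not on a concrete adapter."""
    return [source.get_customer(cid).email for cid in customer_ids]


emails = customer_emails(LegacyAdapter(FakeLegacyClient()), ["c-1", "c-2"])
```

A static type checker such as mypy can then flag an adapter that is missing a method, and the fake client shows how the adapter can be tested without calling the real API.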

When to Use It

Use Adapter when integrating with external systems that have incompatible interfaces, wrapping legacy code, or creating a stable internal API on top of changing external dependencies.

When NOT to Use It

Skip it if the external interface is already clean and matches your needs. Don’t add abstraction just for the sake of patterns.


5. Abstract Factory: Managing Related Object Families

The Problem

You’re building pipelines that work across different environments: development uses local files and SQLite, staging uses S3 and PostgreSQL, production uses S3 and Redshift. You need matching sets of components for each environment, and mixing them causes subtle bugs.

# Dangerous mixing of components from different environments
reader = S3Reader(prod_bucket)  # Production reader
writer = LocalFileWriter('/tmp')  # Dev writer
validator = PostgresValidator(staging_db)  # Staging validator
# This pipeline is a disaster waiting to happen

Creating components independently means you might accidentally combine incompatible pieces.

The Solution

Abstract Factory provides an interface for creating families of related objects without specifying their concrete classes. Each factory creates a complete set of compatible components.

from abc import ABC, abstractmethod
import pandas as pd

# Abstract interfaces
class DataReader(ABC):
    @abstractmethod
    def read(self, location: str) -> pd.DataFrame:
        pass

class DataWriter(ABC):
    @abstractmethod
    def write(self, df: pd.DataFrame, location: str):
        pass

class DataValidator(ABC):
    @abstractmethod
    def validate(self, df: pd.DataFrame) -> bool:
        pass

# Concrete implementations for local development
class LocalFileReader(DataReader):
    def read(self, location: str) -> pd.DataFrame:
        return pd.read_csv(location)

class LocalFileWriter(DataWriter):
    def write(self, df: pd.DataFrame, location: str):
        df.to_csv(location, index=False)

class LocalValidator(DataValidator):
    def validate(self, df: pd.DataFrame) -> bool:
        # Simple validation for dev
        return not df.empty and len(df.columns) > 0

# Concrete implementations for production
class S3Reader(DataReader):
    def __init__(self, bucket: str):
        self.bucket = bucket
    
    def read(self, location: str) -> pd.DataFrame:
        return pd.read_parquet(f"s3://{self.bucket}/{location}")

class S3Writer(DataWriter):
    def __init__(self, bucket: str):
        self.bucket = bucket
    
    def write(self, df: pd.DataFrame, location: str):
        df.to_parquet(f"s3://{self.bucket}/{location}")

class RedshiftValidator(DataValidator):
    def __init__(self, connection_string: str):
        self.conn_string = connection_string
    
    def validate(self, df: pd.DataFrame) -> bool:
        # Validate against schema in Redshift
        # Check row counts, data types, constraints
        return True  # Simplified

# Abstract Factory interface
class DataPipelineFactory(ABC):
    @abstractmethod
    def create_reader(self) -> DataReader:
        pass
    
    @abstractmethod
    def create_writer(self) -> DataWriter:
        pass
    
    @abstractmethod
    def create_validator(self) -> DataValidator:
        pass

# Concrete factory for development environment
class DevelopmentFactory(DataPipelineFactory):
    def create_reader(self) -> DataReader:
        return LocalFileReader()
    
    def create_writer(self) -> DataWriter:
        return LocalFileWriter()
    
    def create_validator(self) -> DataValidator:
        return LocalValidator()

# Concrete factory for production environment
class ProductionFactory(DataPipelineFactory):
    def __init__(self, bucket: str, redshift_conn: str):
        self.bucket = bucket
        self.redshift_conn = redshift_conn
    
    def create_reader(self) -> DataReader:
        return S3Reader(self.bucket)
    
    def create_writer(self) -> DataWriter:
        return S3Writer(self.bucket)
    
    def create_validator(self) -> DataValidator:
        return RedshiftValidator(self.redshift_conn)

# Pipeline code works with any factory
class DataPipeline:
    def __init__(self, factory: DataPipelineFactory):
        self.reader = factory.create_reader()
        self.writer = factory.create_writer()
        self.validator = factory.create_validator()
    
    def run(self, source: str, destination: str):
        # All components guaranteed to be compatible
        data = self.reader.read(source)
        
        if not self.validator.validate(data):
            raise ValueError("Data validation failed")
        
        # Transform data
        transformed = data.copy()  # Your transformation logic
        
        self.writer.write(transformed, destination)

# Switch environments by changing factory
dev_factory = DevelopmentFactory()
prod_factory = ProductionFactory(bucket="prod-data", redshift_conn="...")

# Same pipeline code, different environments
dev_pipeline = DataPipeline(dev_factory)
dev_pipeline.run("data/input.csv", "data/output.csv")

prod_pipeline = DataPipeline(prod_factory)
prod_pipeline.run("input/data.parquet", "output/data.parquet")

Because the pipeline gets every component from a single factory, you can’t accidentally mix development and production components. The factory ensures all pieces work together. Adding a new environment means creating one new factory class.
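In practice you usually want the environment, not the code, to decide which factory is used. Here is a minimal sketch of that selection step. The variable names `PIPELINE_ENV` and `PIPELINE_BUCKET` and the function `factory_from_env` are assumptions for illustration, and the factories are reduced to a single product that returns a descriptive string.

```python
import os
from abc import ABC, abstractmethod
from typing import Callable, Dict, Mapping


class DataPipelineFactory(ABC):
    """Reduced to one product so the selection logic stays in focus."""

    @abstractmethod
    def create_reader(self) -> str:
        ...


class DevelopmentFactory(DataPipelineFactory):
    def create_reader(self) -> str:
        return "local-file-reader"  # string stand-in for a real reader object


class ProductionFactory(DataPipelineFactory):
    def __init__(self, bucket: str):
        self.bucket = bucket

    def create_reader(self) -> str:
        return f"s3-reader:{self.bucket}"  # string stand-in for a real reader object


def factory_from_env(env: Mapping[str, str] = os.environ) -> DataPipelineFactory:
    """Pick a factory from PIPELINE_ENV (a hypothetical variable name)."""
    builders: Dict[str, Callable[[], DataPipelineFactory]] = {
        "development": DevelopmentFactory,
        # PIPELINE_BUCKET is also an assumed name; a missing value raises KeyError.
        "production": lambda: ProductionFactory(bucket=env["PIPELINE_BUCKET"]),
    }
    name = env.get("PIPELINE_ENV", "development")
    if name not in builders:
        raise ValueError(f"Unknown environment: {name}")
    return builders[name]()


dev_reader = factory_from_env({}).create_reader()
prod_reader = factory_from_env(
    {"PIPELINE_ENV": "production", "PIPELINE_BUCKET": "prod-data"}
).create_reader()
```

Passing the mapping in as a parameter, instead of reading os.environ inside the function, keeps the selection logic easy to test. Failing loudly on an unknown environment name is a deliberate choice here, since silently falling back to development in production would be hard to notice.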

When to Use It

Use Abstract Factory when you need to create families of related objects that must be used together, especially for managing different environments or configurations.

When NOT to Use It

Skip it if you only have one or two variations. Regular Factory Method is simpler. Abstract Factory makes sense when you have multiple related components that need to vary together.


Conclusion

These five patterns solve real problems you face building data pipelines:

Factory Method creates flexible connectors without tight coupling. Use it when you need multiple implementations of the same interface.

Builder constructs complex transformations step by step. Use it when you have many optional configuration options.

Decorator adds observability without polluting core logic. Use it for cross-cutting concerns like logging, metrics, and retry logic.

Adapter wraps messy external systems with clean interfaces. Use it when integrating legacy systems or third-party APIs.

Abstract Factory manages families of related components. Use it when you need matching sets of objects that work together.

Start simple. Pick one pattern that solves your most painful problem right now. Refactor one piece of code this week. You’ll immediately see cleaner, more maintainable pipelines.

The goal isn’t to use all patterns everywhere. The goal is to recognize when a pattern makes your code better and when it just adds unnecessary complexity.

Your future self, debugging a pipeline at 2 AM, will thank you for writing code that’s easy to understand and modify.

