MCP for Data Engineers: Build AI Tools That Actually Understand Your Data Stack

Last month I watched a senior data engineer on my team spend 40 minutes debugging a failed Airflow DAG. He tabbed between the Airflow UI, the database console, the dbt docs site, and Slack logs. When he finally found the root cause — a schema change in an upstream table that broke a downstream join — he said, "I wish I could just ask someone who already knows all of this." That is exactly what the Model Context Protocol lets you build. MCP is the protocol that finally lets LLMs like Claude reach into your actual data infrastructure and answer questions with real context, not hallucinated guesses.

If you have been dismissing MCP as "just another AI chatbot thing," you are making the same mistake people made about REST APIs in 2008. This protocol is going to become the standard way AI interacts with every tool in your data stack. Let me show you why, and more importantly, show you how to build it.

What MCP Is and Why Data Engineers Should Care

Model Context Protocol is an open standard created by Anthropic that defines how LLMs communicate with external tools and data sources. Think of it as a USB-C port for AI — a universal interface that lets any LLM connect to any tool through a standardized protocol instead of one-off integrations.

Before MCP, connecting an LLM to your database meant writing custom function calling code, managing authentication yourself, serializing results into prompt-friendly formats, and maintaining all of it per model provider. If you switched from OpenAI to Claude, you rewrote your integrations. If you added a new tool, you wrote another bespoke connector.

MCP fixes this with a clear separation of concerns:

  • MCP Hosts — Applications that embed LLMs and want to access external tools (Claude Desktop, Cursor, your custom app)
  • MCP Clients — Protocol-level connectors that maintain 1:1 sessions with servers (usually embedded in the host)
  • MCP Servers — Lightweight programs that expose specific capabilities: tools, resources, and prompts

The killer insight is that MCP servers are composable. You do not build one giant connector that does everything. You build small, focused servers — one for your database, one for your dbt catalog, one for Airflow — and the host connects to all of them simultaneously. Claude can then seamlessly pull context from any server mid-conversation.

The Three Primitives

Every MCP server exposes some combination of three primitives:

  • Tools — Functions the LLM can invoke (e.g., run_query, get_dag_status). Model-controlled: the LLM decides when to call them.
  • Resources — Data the server exposes for the LLM to read (e.g., table schemas, documentation). Application-controlled: surfaced by the host UI.
  • Prompts — Reusable prompt templates the server provides (e.g., "analyze this table's data quality"). User-controlled: selected explicitly by the user.

For data engineering use cases, tools are the most immediately useful. But resources are where the real leverage is — imagine your LLM automatically having access to every table schema, every dbt model description, every column-level lineage graph, without you pasting anything into the chat.

Architecture: How MCP Actually Works

MCP uses a JSON-RPC 2.0 message format over one of two transport mechanisms:

stdio Transport

The host launches the MCP server as a subprocess and communicates via stdin/stdout. This is the simplest option, perfect for local development and tools like Claude Desktop. Zero networking, zero auth, zero config. The host manages the server's lifecycle.

SSE (Server-Sent Events) Transport

The server runs as a standalone HTTP service. The client sends requests via POST and receives responses via an SSE stream. This is what you want for shared team infrastructure — a central MCP server that multiple engineers connect to from their own Claude Desktop or IDE.

Here is the typical architecture for a data engineering team:

Engineer's Laptop                          Team Infrastructure
┌─────────────────┐                       ┌──────────────────────┐
│  Claude Desktop  │──stdio──▶ Local MCP   │  Shared MCP Servers  │
│  (MCP Host)      │         (dev DB)      │                      │
│                  │                       │  ┌─────────────────┐ │
│                  │──SSE──────────────────▶│  │ DB Query Server │ │
│                  │                       │  └────────┬────────┘ │
│                  │──SSE──────────────────▶│  ┌───────▼────────┐ │
│                  │                       │  │ dbt Catalog     │ │
│                  │                       │  └────────┬────────┘ │
│                  │──SSE──────────────────▶│  ┌───────▼────────┐ │
│                  │                       │  │ Airflow Monitor │ │
│                  │                       │  └─────────────────┘ │
└─────────────────┘                       └──────────────────────┘

The critical detail: each MCP server is independent. The DB query server knows nothing about Airflow. The Airflow server knows nothing about dbt. But Claude can correlate information across all three in a single conversation. "Which tables are affected by the DAG that failed at 3am, and do any of them have data quality alerts?" — that is a question that requires all three servers, and MCP handles the orchestration.

Building Your First MCP Server: Database Query Tool

Let us build a real MCP server that lets Claude query your PostgreSQL database. This is the single highest-value MCP server you can build as a data engineer. Install the dependencies first:

pip install mcp asyncpg

Here is the complete server:

"""MCP server for read-only database queries."""
import asyncio
import json
from contextlib import asynccontextmanager

import asyncpg
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "database": "analytics_prod",
    "user": "mcp_readonly",
    "password": "your-readonly-password",  # Use env vars in production
}

QUERY_TIMEOUT_SECONDS = 30
MAX_ROWS = 500

server = Server("database-query")
pool: asyncpg.Pool | None = None


async def get_pool() -> asyncpg.Pool:
    global pool
    if pool is None:
        pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=2,
            max_size=5,
            command_timeout=QUERY_TIMEOUT_SECONDS,
        )
    return pool


def format_results(columns: list[str], rows: list[asyncpg.Record]) -> str:
    """Format query results as a readable table."""
    if not rows:
        return "Query returned 0 rows."

    col_widths = [len(c) for c in columns]
    for row in rows[:MAX_ROWS]:
        for i, val in enumerate(row):
            col_widths[i] = max(col_widths[i], len(str(val)))

    header = " | ".join(c.ljust(w) for c, w in zip(columns, col_widths))
    separator = "-+-".join("-" * w for w in col_widths)
    data_rows = []
    for row in rows[:MAX_ROWS]:
        data_rows.append(
            " | ".join(str(v).ljust(w) for v, w in zip(row, col_widths))
        )

    result = f"{header}\n{separator}\n" + "\n".join(data_rows)
    if len(rows) > MAX_ROWS:
        result += f"\n\n... truncated ({len(rows)} total rows, showing {MAX_ROWS})"
    return result


@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="run_query",
            description=(
                "Execute a read-only SQL query against the analytics database. "
                "Returns up to 500 rows. Use for exploring data, checking schemas, "
                "and investigating data quality issues."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL SELECT query to execute. Only SELECT is allowed.",
                    }
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="list_tables",
            description="List all tables in the database with row counts and column counts.",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="describe_table",
            description="Get column names, types, and sample values for a specific table.",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "description": "Fully qualified table name (schema.table)",
                    }
                },
                "required": ["table_name"],
            },
        ),
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    db = await get_pool()

    if name == "run_query":
        sql = arguments["sql"].strip()

        # Security: block non-SELECT statements
        first_word = sql.split()[0].upper() if sql else ""
        if first_word not in ("SELECT", "WITH", "EXPLAIN"):
            return [TextContent(
                type="text",
                text="Error: Only SELECT, WITH, and EXPLAIN queries are allowed."
            )]

        try:
            async with db.acquire() as conn:
                # Use a read-only transaction for extra safety
                async with conn.transaction(readonly=True):
                    stmt = await conn.prepare(sql)
                    columns = [col.name for col in stmt.get_attributes()]
                    rows = await stmt.fetch(MAX_ROWS + 1)

            return [TextContent(
                type="text",
                text=format_results(columns, rows),
            )]
        except asyncpg.PostgresError as e:
            return [TextContent(type="text", text=f"Query error: {e}")]

    elif name == "list_tables":
        async with db.acquire() as conn:
            rows = await conn.fetch("""
                SELECT schemaname || '.' || tablename AS table_name,
                       n_live_tup AS approx_rows
                FROM pg_stat_user_tables
                ORDER BY n_live_tup DESC
            """)
        lines = [f"{r['table_name']:50s} ~{r['approx_rows']:>12,} rows" for r in rows]
        return [TextContent(type="text", text="\n".join(lines) or "No tables found.")]

    elif name == "describe_table":
        table = arguments["table_name"]
        parts = table.split(".")
        if len(parts) != 2:
            return [TextContent(type="text", text="Use schema.table format (e.g., public.orders)")]

        schema_name, table_name = parts
        async with db.acquire() as conn:
            columns = await conn.fetch("""
                SELECT column_name, data_type, is_nullable,
                       column_default
                FROM information_schema.columns
                WHERE table_schema = $1 AND table_name = $2
                ORDER BY ordinal_position
            """, schema_name, table_name)

        if not columns:
            return [TextContent(type="text", text=f"Table {table} not found.")]

        lines = [f"{'Column':30s} {'Type':20s} {'Nullable':10s} Default"]
        lines.append("-" * 80)
        for col in columns:
            lines.append(
                f"{col['column_name']:30s} {col['data_type']:20s} "
                f"{col['is_nullable']:10s} {col['column_default'] or ''}"
            )
        return [TextContent(type="text", text="\n".join(lines))]


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())

To register this with Claude Desktop, add it to your claude_desktop_config.json:

{
  "mcpServers": {
    "analytics-db": {
      "command": "python",
      "args": ["/path/to/db_mcp_server.py"],
      "env": {
        "DB_PASSWORD": "your-readonly-password"
      }
    }
  }
}

Restart Claude Desktop and you can immediately ask: "What are the top 10 tables by row count?" or "Show me the schema of the orders table and the last 5 rows." Claude will invoke the tools automatically.

MCP Server for dbt Catalog and Data Lineage

The database query server is useful, but it is like giving Claude a SQL console without any documentation. The next server gives Claude access to your dbt project metadata — model descriptions, column documentation, tests, and lineage. This is where things get genuinely powerful.

"""MCP server for dbt project metadata and lineage."""
import json
import asyncio
from pathlib import Path

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource

DBT_PROJECT_DIR = Path("/opt/dbt/analytics")
MANIFEST_PATH = DBT_PROJECT_DIR / "target" / "manifest.json"
CATALOG_PATH = DBT_PROJECT_DIR / "target" / "catalog.json"

server = Server("dbt-catalog")
_manifest: dict | None = None
_catalog: dict | None = None


def load_manifest() -> dict:
    global _manifest
    if _manifest is None:
        _manifest = json.loads(MANIFEST_PATH.read_text())
    return _manifest


def load_catalog() -> dict:
    global _catalog
    if _catalog is None and CATALOG_PATH.exists():
        _catalog = json.loads(CATALOG_PATH.read_text())
    return _catalog or {}


@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="search_models",
            description="Search dbt models by name or description keyword.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search keyword"}
                },
                "required": ["query"],
            },
        ),
        Tool(
            name="get_model_details",
            description=(
                "Get full details for a dbt model: description, columns, tests, "
                "dependencies, and SQL."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "model_name": {"type": "string", "description": "dbt model name"}
                },
                "required": ["model_name"],
            },
        ),
        Tool(
            name="get_upstream_lineage",
            description="Get all upstream dependencies of a model (recursive).",
            inputSchema={
                "type": "object",
                "properties": {
                    "model_name": {"type": "string"},
                    "depth": {"type": "integer", "description": "Max depth (default 5)"},
                },
                "required": ["model_name"],
            },
        ),
        Tool(
            name="get_downstream_lineage",
            description="Get all models that depend on this model (recursive).",
            inputSchema={
                "type": "object",
                "properties": {
                    "model_name": {"type": "string"},
                    "depth": {"type": "integer", "description": "Max depth (default 5)"},
                },
                "required": ["model_name"],
            },
        ),
        Tool(
            name="get_test_results",
            description="Get the latest test results for a specific model.",
            inputSchema={
                "type": "object",
                "properties": {
                    "model_name": {"type": "string"}
                },
                "required": ["model_name"],
            },
        ),
    ]


def find_node(model_name: str) -> dict | None:
    manifest = load_manifest()
    for key, node in manifest.get("nodes", {}).items():
        if node.get("name") == model_name and node.get("resource_type") == "model":
            return node
    return None


def get_lineage(model_name: str, direction: str, max_depth: int = 5) -> list[str]:
    """Walk the DAG upstream or downstream."""
    manifest = load_manifest()
    child_map = manifest.get("child_map", {})
    parent_map = manifest.get("parent_map", {})

    node = find_node(model_name)
    if not node:
        return []

    unique_id = node["unique_id"]
    graph = child_map if direction == "downstream" else parent_map
    visited = set()
    result = []

    def walk(uid: str, depth: int):
        if depth > max_depth or uid in visited:
            return
        visited.add(uid)
        for dep_id in graph.get(uid, []):
            dep_node = manifest["nodes"].get(dep_id) or manifest.get("sources", {}).get(dep_id)
            if dep_node:
                prefix = "  " * depth
                result.append(f"{prefix}{dep_node.get('name', dep_id)} ({dep_node.get('resource_type', '?')})")
                walk(dep_id, depth + 1)

    walk(unique_id, 0)
    return result


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    manifest = load_manifest()

    if name == "search_models":
        query = arguments["query"].lower()
        matches = []
        for key, node in manifest.get("nodes", {}).items():
            if node.get("resource_type") != "model":
                continue
            name_match = query in node.get("name", "").lower()
            desc_match = query in node.get("description", "").lower()
            if name_match or desc_match:
                matches.append(
                    f"{node['name']:40s} {(node.get('description', '')[:80])}"
                )
        return [TextContent(type="text", text="\n".join(matches[:50]) or "No models found.")]

    elif name == "get_model_details":
        node = find_node(arguments["model_name"])
        if not node:
            return [TextContent(type="text", text=f"Model '{arguments['model_name']}' not found.")]

        columns = node.get("columns", {})
        col_lines = []
        for col_name, col_info in columns.items():
            tests = [t.get("test_metadata", {}).get("name", "?")
                     for t in manifest.get("nodes", {}).values()
                     if t.get("resource_type") == "test"
                     and arguments["model_name"] in t.get("depends_on", {}).get("nodes", [])]
            col_lines.append(
                f"  {col_name}: {col_info.get('data_type', 'unknown')} "
                f"- {col_info.get('description', 'no description')}"
            )

        deps = node.get("depends_on", {}).get("nodes", [])
        dep_names = [d.split(".")[-1] for d in deps]

        detail = f"""Model: {node['name']}
Schema: {node.get('schema', '?')}
Materialization: {node.get('config', {}).get('materialized', '?')}
Description: {node.get('description', 'None')}

Columns:
{chr(10).join(col_lines) or '  (no column docs)'}

Dependencies: {', '.join(dep_names) or 'None'}

SQL:
{node.get('raw_code', node.get('raw_sql', 'N/A'))[:2000]}"""

        return [TextContent(type="text", text=detail)]

    elif name == "get_upstream_lineage":
        depth = arguments.get("depth", 5)
        tree = get_lineage(arguments["model_name"], "upstream", depth)
        return [TextContent(
            type="text",
            text=f"Upstream lineage for {arguments['model_name']}:\n" + "\n".join(tree) or "No upstream deps."
        )]

    elif name == "get_downstream_lineage":
        depth = arguments.get("depth", 5)
        tree = get_lineage(arguments["model_name"], "downstream", depth)
        return [TextContent(
            type="text",
            text=f"Downstream lineage for {arguments['model_name']}:\n" + "\n".join(tree) or "No downstream deps."
        )]

    elif name == "get_test_results":
        run_results_path = DBT_PROJECT_DIR / "target" / "run_results.json"
        if not run_results_path.exists():
            return [TextContent(type="text", text="No run_results.json found. Run dbt test first.")]

        results = json.loads(run_results_path.read_text())
        model_tests = []
        for result in results.get("results", []):
            uid = result.get("unique_id", "")
            if arguments["model_name"] in uid and "test" in uid:
                model_tests.append(
                    f"  {uid.split('.')[-1]:50s} {result['status']:10s} "
                    f"{result.get('execution_time', 0):.2f}s"
                )
        return [TextContent(
            type="text",
            text=f"Test results for {arguments['model_name']}:\n" + "\n".join(model_tests) or "No tests found."
        )]


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())

Now you can ask Claude: "What are the upstream dependencies of the fct_revenue model, and do any of them have failing tests?" Claude will call get_upstream_lineage, then get_test_results for each upstream model, and synthesize the answer. That query used to require opening three different browser tabs.

MCP Server for Pipeline Monitoring

The third server connects Claude to your orchestrator. Here is a simplified version that queries the Airflow REST API:

"""MCP server for Airflow pipeline monitoring."""
import asyncio
from datetime import datetime, timedelta

import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

AIRFLOW_BASE_URL = "http://airflow.internal:8080/api/v1"
AIRFLOW_USER = "mcp_viewer"
AIRFLOW_PASS = "viewer-password"  # Use env vars in production

server = Server("airflow-monitor")


async def airflow_get(path: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{AIRFLOW_BASE_URL}{path}",
            auth=(AIRFLOW_USER, AIRFLOW_PASS),
            timeout=15.0,
        )
        resp.raise_for_status()
        return resp.json()


@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="list_dags",
            description="List all active DAGs with their schedule and last run status.",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="get_dag_runs",
            description="Get recent runs for a specific DAG with status and duration.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dag_id": {"type": "string"},
                    "limit": {"type": "integer", "description": "Number of runs (default 10)"},
                },
                "required": ["dag_id"],
            },
        ),
        Tool(
            name="get_failed_tasks",
            description="Get failed task instances for a specific DAG run.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dag_id": {"type": "string"},
                    "dag_run_id": {"type": "string"},
                },
                "required": ["dag_id", "dag_run_id"],
            },
        ),
        Tool(
            name="get_task_logs",
            description="Get logs for a specific task instance (last 100 lines).",
            inputSchema={
                "type": "object",
                "properties": {
                    "dag_id": {"type": "string"},
                    "dag_run_id": {"type": "string"},
                    "task_id": {"type": "string"},
                },
                "required": ["dag_id", "dag_run_id", "task_id"],
            },
        ),
        Tool(
            name="check_data_freshness",
            description=(
                "Check when key tables were last updated by cross-referencing "
                "DAG run times with table metadata."
            ),
            inputSchema={"type": "object", "properties": {}},
        ),
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "list_dags":
        data = await airflow_get("/dags?only_active=true&limit=100")
        lines = [f"{'DAG ID':45s} {'Schedule':20s} {'Status':10s} Last Run"]
        lines.append("-" * 100)
        for dag in data.get("dags", []):
            last_parsed = dag.get("last_parsed_time", "never")[:19]
            lines.append(
                f"{dag['dag_id']:45s} "
                f"{(dag.get('schedule_interval') or 'none'):20s} "
                f"{'paused' if dag.get('is_paused') else 'active':10s} "
                f"{last_parsed}"
            )
        return [TextContent(type="text", text="\n".join(lines))]

    elif name == "get_dag_runs":
        limit = arguments.get("limit", 10)
        data = await airflow_get(
            f"/dags/{arguments['dag_id']}/dagRuns"
            f"?order_by=-start_date&limit={limit}"
        )
        lines = [f"{'Run ID':40s} {'State':12s} {'Start':20s} Duration"]
        lines.append("-" * 90)
        for run in data.get("dag_runs", []):
            start = (run.get("start_date") or "")[:19]
            end = run.get("end_date")
            duration = ""
            if start and end:
                try:
                    s = datetime.fromisoformat(run["start_date"][:19])
                    e = datetime.fromisoformat(end[:19])
                    duration = str(e - s)
                except ValueError:
                    pass
            lines.append(
                f"{run['dag_run_id']:40s} {run['state']:12s} {start:20s} {duration}"
            )
        return [TextContent(type="text", text="\n".join(lines))]

    elif name == "get_failed_tasks":
        data = await airflow_get(
            f"/dags/{arguments['dag_id']}/dagRuns/{arguments['dag_run_id']}"
            f"/taskInstances?state=failed"
        )
        tasks = data.get("task_instances", [])
        if not tasks:
            return [TextContent(type="text", text="No failed tasks in this run.")]

        lines = []
        for task in tasks:
            lines.append(
                f"Task: {task['task_id']}\n"
                f"  State: {task['state']}\n"
                f"  Start: {(task.get('start_date') or 'N/A')[:19]}\n"
                f"  End: {(task.get('end_date') or 'N/A')[:19]}\n"
                f"  Try: {task.get('try_number', '?')}\n"
            )
        return [TextContent(type="text", text="\n".join(lines))]

    elif name == "get_task_logs":
        data = await airflow_get(
            f"/dags/{arguments['dag_id']}/dagRuns/{arguments['dag_run_id']}"
            f"/taskInstances/{arguments['task_id']}/logs/1"
        )
        # Airflow returns log content as a string
        log_content = str(data)[-5000:]  # Last ~5000 chars
        return [TextContent(type="text", text=f"Task logs (tail):\n{log_content}")]

    elif name == "check_data_freshness":
        # Get all DAGs and their last successful runs
        data = await airflow_get("/dags?only_active=true&limit=100")
        lines = [f"{'DAG ID':45s} {'Last Success':20s} {'Freshness':15s}"]
        lines.append("-" * 85)

        now = datetime.utcnow()
        for dag in data.get("dags", []):
            dag_id = dag["dag_id"]
            runs = await airflow_get(
                f"/dags/{dag_id}/dagRuns?state=success&order_by=-end_date&limit=1"
            )
            dag_runs = runs.get("dag_runs", [])
            if dag_runs:
                end = dag_runs[0].get("end_date", "")[:19]
                try:
                    last = datetime.fromisoformat(end)
                    age = now - last
                    freshness = f"{age.total_seconds() / 3600:.1f}h ago"
                    if age > timedelta(hours=24):
                        freshness += " STALE"
                except ValueError:
                    freshness = "parse error"
            else:
                end = "never"
                freshness = "NEVER RUN"

            lines.append(f"{dag_id:45s} {end:20s} {freshness}")

        return [TextContent(type="text", text="\n".join(lines))]


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())

The check_data_freshness tool is my favorite. I configured a Claude prompt that runs every morning: "Check data freshness for all production DAGs and flag anything more than 24 hours stale." It replaced a custom Slack bot that took two days to build and constantly broke.

MCP vs Function Calling vs REST APIs

This is the question I get most often: "Why not just use OpenAI function calling? Or build a REST API and paste the results into the prompt?" The answer depends on what you are optimizing for.

Aspect MCP Function Calling REST API + Manual
Provider lock-in None (open protocol) High (OpenAI/Anthropic specific) None
Discovery Automatic (server declares tools) Manual (you define schemas) Manual (copy-paste)
Multi-tool orchestration Built-in (host manages N servers) DIY (you write the loop) DIY
Stateful sessions Yes (persistent connection) No (stateless per call) No
Transport stdio + SSE (HTTP coming) HTTP only HTTP
Auth / credential mgmt Server-side (user never sees creds) App-side App-side
IDE integration Native (Cursor, Claude Desktop, VS Code) Limited None
Ecosystem Growing fast (1000+ community servers) Mature Universal
Best for Interactive exploration, multi-step workflows Single-purpose agents, production apps Simple one-shot integrations

My rule of thumb: use MCP when a human is in the loop (engineer asking questions, exploring data, debugging). Use function calling when an agent runs autonomously (automated data quality checks, pipeline triggers). Use REST APIs when you do not need an LLM at all and are just building normal software.

MCP and function calling are not mutually exclusive. Several teams I know wrap their MCP servers in a function-calling layer for production agents, getting the best of both worlds — the MCP server handles the complexity of connecting to infrastructure, and the function calling layer handles the agent orchestration.

Security: The Part Nobody Talks About

Giving an LLM access to your production database should terrify you a little. Here are the non-negotiable security practices I follow:

Read-Only Database Users

Every MCP database server connects with a dedicated read-only user. Not a user with SELECT privileges on everything — a user with SELECT privileges on specific schemas.

-- Create a dedicated MCP user
CREATE USER mcp_readonly WITH PASSWORD 'strong-random-password';

-- Grant access to analytics schema only (not raw PII data)
GRANT USAGE ON SCHEMA analytics TO mcp_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics TO mcp_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA analytics
    GRANT SELECT ON TABLES TO mcp_readonly;

-- Explicitly deny access to sensitive schemas
REVOKE ALL ON SCHEMA pii_data FROM mcp_readonly;
REVOKE ALL ON SCHEMA billing FROM mcp_readonly;

Query Sandboxing

The read-only transaction in our server code is the first layer. Add statement timeouts and row limits as defense in depth:

# In your MCP server, enforce limits at the connection level
pool = await asyncpg.create_pool(
    **DB_CONFIG,
    command_timeout=30,          # Kill queries after 30 seconds
    server_settings={
        "statement_timeout": "30000",      # 30s at PostgreSQL level
        "lock_timeout": "5000",            # 5s lock wait max
        "work_mem": "64MB",                # Limit per-query memory
        "max_parallel_workers_per_gather": "0",  # No parallel scans
    },
)

Credential Management

Never hardcode credentials in MCP server code. Use environment variables passed through the Claude Desktop config or a secrets manager:

{
  "mcpServers": {
    "analytics-db": {
      "command": "python",
      "args": ["/opt/mcp-servers/db_server.py"],
      "env": {
        "DB_HOST": "analytics-replica.internal",
        "DB_USER": "mcp_readonly",
        "DB_PASSWORD_FILE": "/run/secrets/mcp_db_password"
      }
    }
  }
}

Audit Logging

Log every query that goes through your MCP server. When the VP of Engineering asks "what has the AI been querying?", you want an answer:

import logging
import time

logger = logging.getLogger("mcp.audit")

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    start = time.time()
    logger.info(f"MCP tool call: {name} | args: {json.dumps(arguments)}")

    try:
        result = await _execute_tool(name, arguments)
        elapsed = time.time() - start
        logger.info(f"MCP tool success: {name} | {elapsed:.2f}s")
        return result
    except Exception as e:
        logger.error(f"MCP tool error: {name} | {e}")
        raise

Real Use Cases That Justified the Investment

After running MCP servers in our team for three months, here are the workflows that delivered the most value:

"What happened to the revenue dashboard?"

A stakeholder reports that the revenue dashboard shows stale numbers. Before MCP, this investigation involved: check Airflow UI for DAG failures, check the database for table freshness, check dbt run logs for errors, check the data quality alerts channel. With MCP, I open Claude and type: "The revenue dashboard looks stale. Check the freshness of all DAGs related to revenue, then check if any upstream tables have schema changes in the last 24 hours." Claude calls the Airflow server, then the database server, then the dbt catalog, and returns: "The raw_stripe_payments source table added a new column payment_method_v2 at 2am. The stg_payments model still references the old payment_method column, which now returns NULL for new records. The load_stripe_data DAG ran successfully — the issue is in the transformation layer." Total time: 90 seconds instead of 40 minutes.

"Is this table safe to deprecate?"

Before dropping a table, I ask Claude: "What downstream models depend on dim_products_v2, and which dashboards query it?" The dbt catalog server provides the model lineage. The database server shows recent query patterns from pg_stat_statements. Instead of sending a Slack message and waiting three days for responses, I get a comprehensive impact analysis in under a minute.

"Help me debug this data quality issue"

I paste a data quality alert into Claude: "The not_null test on fct_orders.customer_id is failing with 847 violations." Claude queries the actual failing rows, checks when the issue started by comparing recent partitions, looks up the upstream model lineage, and identifies that a LEFT JOIN was changed to an INNER JOIN in a commit three days ago but the dependent models were not backfilled.

When NOT to Use MCP

MCP is not the right tool for everything. Be honest with yourself about these limitations:

  • High-throughput production agents — If you are building an automated system that processes thousands of requests per hour, the MCP protocol overhead (JSON-RPC, session management) adds unnecessary latency. Use function calling with direct API integrations instead.
  • Simple single-tool integrations — If you just need Claude to run a SQL query and nothing else, the mcp SDK is overkill. A simple function calling setup with 20 lines of code will do.
  • Environments without LLM access — If your data infrastructure is in an air-gapped environment where LLMs cannot reach the MCP servers, the entire architecture falls apart. You would need to self-host the LLM first.
  • Write operations — MCP is designed for read-heavy workflows. While you can build MCP tools that write data, the safety implications of an LLM autonomously modifying production data should give you pause. I would limit write operations to staging environments and require explicit human confirmation.

What's Coming: The MCP Roadmap

MCP is evolving fast. Here is what is in progress or actively discussed in the spec:

  • Streamable HTTP transport — Replacing SSE with a more efficient bidirectional HTTP streaming protocol. This will make remote MCP servers easier to deploy behind load balancers and API gateways.
  • OAuth 2.1 authentication — The spec now includes a formal authentication flow, so MCP servers can require proper credentials instead of relying on network-level security. This is critical for enterprise adoption.
  • Elicitation — A mechanism for MCP servers to ask the user for additional input mid-conversation. Instead of failing with "missing parameter," the server can prompt: "Which database environment do you want to query: staging or production?"
  • MCP registries and discovery — Think npm for MCP servers. Teams will publish and share MCP servers through registries, so you can mcp install @company/snowflake-server instead of building from scratch.
  • Structured output negotiation — Servers will be able to declare preferred output formats (tables, charts, markdown), and hosts will render them appropriately.

The authentication spec is the most important near-term development. Right now, most MCP servers assume they are running locally behind a firewall. The OAuth spec will unlock hosted MCP servers that SaaS vendors can offer — imagine Snowflake, Databricks, and Fivetran each publishing official MCP servers that you connect to from Claude Desktop with a single OAuth flow.

Getting Started: A 30-Minute Plan

If you want to start using MCP in your data engineering workflow today, here is the fastest path to value:

  1. Install Claude Desktop and verify MCP support is working (check Settings > Developer > MCP)
  2. Deploy the database query server from this article against a read replica of your analytics database. Use the read-only user setup I described. This takes 15 minutes.
  3. Add the dbt catalog server pointed at your project's target/manifest.json. Run dbt docs generate first to ensure it exists. Another 10 minutes.
  4. Test with a real question — ask Claude something you would normally investigate manually. "What tables in the analytics schema have not been updated in the last 48 hours?" or "Show me the schema of the orders table and explain what each column likely represents based on the dbt docs."
  5. Share with your team — Move the servers to SSE transport on an internal host so everyone can connect.

The total setup time is under 30 minutes for a working prototype. The ROI shows up the first time someone on your team debugs a pipeline issue in 2 minutes instead of 30. MCP is not going to replace your monitoring stack or your data catalog. But it is going to become the fastest interface between your brain and your data infrastructure, and the teams that adopt it early will have a significant productivity advantage.

Leave a Comment