AI Agents Are Replacing ETL Scripts: How We Automated 80% of Our Data Pipeline Maintenance

Last Tuesday at 2:47 AM, our Airflow DAG etl_customer_events failed because an upstream Salesforce API silently added three new fields and changed account_type from a string to an integer. Six months ago, that would have meant a page to the on-call engineer, 45 minutes of groggy debugging, a manual schema migration, a backfill, and a Slack thread with 23 messages. Last Tuesday, our pipeline repair agent detected the failure, read the error logs, identified the schema drift, generated the migration SQL, posted it to Slack for approval, and had the pipeline running again in 3 minutes and 12 seconds. The on-call engineer approved the fix from bed without opening a laptop. That's not a demo. That's production. And it changed how I think about data engineering entirely.

The Maintenance Tax: Why 60% of Data Engineering Is Janitorial Work

I've been a data engineer for eight years, and here's the dirty secret we don't talk about at conferences: the majority of our time isn't spent building elegant pipelines or designing clever architectures. It's maintenance. A 2025 survey by dbt Labs found that data engineers spend 62% of their time on pipeline maintenance -- fixing broken DAGs, handling schema changes, investigating data quality issues, and running backfills. The Fivetran "State of Data Engineering" report put it even higher at 67%.

The breakdown at our company (a Series C fintech, 400 employees, 12-person data team) looked like this before we started using AI agents:

| Task Category | Hours/Week (Team Total) | % of Total |
|---|---|---|
| Schema change handling | 18 | 22% |
| Failed job investigation & repair | 14 | 17% |
| Data quality incident response | 12 | 15% |
| Backfills and reruns | 6 | 7% |
| New pipeline development | 16 | 20% |
| Architecture & design | 8 | 10% |
| Documentation & communication | 7 | 9% |

Over 60% of our time was reactive. We were a team of highly paid firefighters. That's the problem AI agents solve -- not by replacing data engineers, but by handling the predictable, repetitive repair work so we can focus on building things that actually matter.

What "AI Agents for Data" Actually Means (It's Not ChatGPT)

Let me be precise about terminology because the industry has mangled the word "agent" beyond recognition. When I say AI agent for data pipelines, I don't mean a chatbot you ask questions. I don't mean a Copilot that autocompletes your SQL. I mean an autonomous system that:

  1. Observes -- monitors pipeline state, logs, metadata, and metrics continuously
  2. Reasons -- uses an LLM to interpret what went wrong and why
  3. Plans -- determines the correct remediation steps from a set of available tools
  4. Acts -- executes those steps (with appropriate guardrails)
  5. Learns -- logs outcomes so it can improve over time

The key distinction is tool calling. An LLM prompt says "here's an error, what should I do?" and gives you text. An agent says "here's an error" and actually fixes it. The LLM is the brain, but the tools -- database connections, Airflow APIs, Git operations, Jira APIs, Slack webhooks -- are the hands.

This became viable in 2025 when Claude, GPT-4, and Gemini all shipped reliable tool-calling APIs with structured outputs. Before that, getting an LLM to reliably invoke the right tool with the right parameters was a coin flip. Now it works well enough for production.
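Concretely, "reliable tool calling" means you hand the model a JSON Schema per tool and get back a structured invocation instead of free text. A minimal definition in Anthropic's Messages API format (the echo tool itself is a made-up example, not one of ours):

```python
# A tool definition is just a name, a description, and a JSON Schema
# for the parameters the model is allowed to pass.
echo_tool = {
    "name": "echo_diagnostics",
    "description": "Echo a diagnostic message back to the agent.",
    "input_schema": {
        "type": "object",
        "properties": {
            "message": {"type": "string", "description": "Text to echo back"},
        },
        "required": ["message"],
    },
}

# Instead of prose, the model responds with a structured tool_use block, e.g.:
# {"type": "tool_use", "name": "echo_diagnostics", "input": {"message": "..."}}
```

The structured output is what makes the "hands" possible: you can dispatch on `name` and pass `input` straight to a Python function.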

Architecture: The Agent Framework We Built

We didn't use LangChain. We didn't use CrewAI. We built a minimal framework because we needed to understand every line of code that touches our production databases. Here's the architecture:

"""
Pipeline Agent Framework - Core Architecture

Components:
1. Agent Loop    - Observe → Reason → Plan → Act → Report
2. Tool Registry - Registered functions the agent can call
3. Safety Layer  - Read-only default, approval for writes, sandboxing
4. Audit Logger  - Every action logged to ku_agent_audit table
"""

import json
import time
import logging
from dataclasses import dataclass, field
from typing import Callable, Any
from anthropic import Anthropic

logger = logging.getLogger("pipeline_agent")

@dataclass
class Tool:
    name: str
    description: str
    parameters: dict          # JSON Schema for tool parameters
    function: Callable        # Actual Python function to execute
    requires_approval: bool = False
    read_only: bool = True

@dataclass
class AgentConfig:
    model: str = "claude-sonnet-4-20250514"
    max_iterations: int = 10
    approval_timeout_seconds: int = 300
    sandbox_mode: bool = True
    allowed_databases: list = field(default_factory=lambda: ["analytics_prod"])
    slack_channel: str = "#data-pipeline-agents"

class PipelineAgent:
    def __init__(self, config: AgentConfig, tools: list[Tool]):
        self.client = Anthropic()
        self.config = config
        self.tools = {t.name: t for t in tools}
        self.audit_log = []

    def _build_tool_schemas(self) -> list[dict]:
        """Convert our Tool objects to Anthropic tool-calling format."""
        return [
            {
                "name": t.name,
                "description": t.description,
                "input_schema": t.parameters,
            }
            for t in self.tools.values()
        ]

    def _request_human_approval(self, tool_name: str, params: dict) -> bool:
        """Post to Slack and wait for thumbs-up or timeout."""
        import requests
        payload = {
            "channel": self.config.slack_channel,
            "text": (
                f":robot_face: *Pipeline Agent requests approval*\n"
                f"Tool: `{tool_name}`\n"
                f"Parameters:\n```{json.dumps(params, indent=2)}```\n"
                f"React :white_check_mark: to approve or :x: to deny. "
                f"Auto-denied in {self.config.approval_timeout_seconds}s."
            ),
        }
        resp = requests.post(
            "https://slack.com/api/chat.postMessage",
            headers={"Authorization": f"Bearer {SLACK_BOT_TOKEN}"},
            json=payload,
        )
        message_ts = resp.json()["ts"]

        # Poll for reactions (simplified -- production version uses the Slack Events API)
        deadline = time.time() + self.config.approval_timeout_seconds
        while time.time() < deadline:
            time.sleep(5)
            reactions = requests.get(
                "https://slack.com/api/reactions.get",
                headers={"Authorization": f"Bearer {SLACK_BOT_TOKEN}"},
                params={"channel": self.config.slack_channel, "timestamp": message_ts},
            ).json()
            for r in reactions.get("message", {}).get("reactions", []):
                if r["name"] == "white_check_mark":
                    return True
                if r["name"] == "x":
                    return False
        return False  # Timeout = deny

    def run(self, trigger_event: dict) -> dict:
        """Main agent loop: observe, reason, plan, act, report."""
        system_prompt = (
            "You are a data pipeline repair agent. Your job is to diagnose "
            "pipeline failures and fix them using the available tools. "
            "Always start by reading logs and metadata before taking action. "
            "Never modify production data without explicit approval. "
            "Prefer the least disruptive fix. Explain your reasoning."
        )

        messages = [
            {
                "role": "user",
                "content": f"Pipeline event detected:\n{json.dumps(trigger_event, indent=2)}\n\n"
                           f"Investigate this issue and resolve it.",
            }
        ]

        for iteration in range(self.config.max_iterations):
            response = self.client.messages.create(
                model=self.config.model,
                max_tokens=4096,
                system=system_prompt,
                tools=self._build_tool_schemas(),
                messages=messages,
            )

            # If the model wants to use a tool
            if response.stop_reason == "tool_use":
                tool_results = []
                for block in response.content:
                    if block.type == "tool_use":
                        tool_name = block.name
                        tool_input = block.input
                        tool = self.tools[tool_name]

                        # Safety check: require approval for write operations
                        if tool.requires_approval:
                            approved = self._request_human_approval(tool_name, tool_input)
                            if not approved:
                                result = "DENIED: Human did not approve this action."
                                self._log_action(tool_name, tool_input, result, "denied")
                                tool_results.append({
                                    "type": "tool_result",
                                    "tool_use_id": block.id,
                                    "content": result,
                                })
                                continue

                        # Execute the tool
                        try:
                            result = tool.function(**tool_input)
                            self._log_action(tool_name, tool_input, result, "success")
                        except Exception as e:
                            result = f"ERROR: {str(e)}"
                            self._log_action(tool_name, tool_input, result, "error")

                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": str(result),
                        })

                messages.append({"role": "assistant", "content": response.content})
                messages.append({"role": "user", "content": tool_results})

            else:
                # Model is done -- extract final summary
                summary = "".join(
                    block.text for block in response.content if hasattr(block, "text")
                )
                return {
                    "status": "resolved",
                    "summary": summary,
                    "actions_taken": self.audit_log,
                    "iterations": iteration + 1,
                }

        return {"status": "max_iterations_reached", "actions_taken": self.audit_log}

    def _log_action(self, tool: str, params: dict, result: Any, status: str):
        entry = {
            "timestamp": time.time(),
            "tool": tool,
            "params": params,
            "result": str(result)[:1000],
            "status": status,
        }
        self.audit_log.append(entry)
        logger.info(f"Agent action: {json.dumps(entry)}")

That's the core -- about 150 lines. The real work is in the tools you register.

Agent #1: Pipeline Repair Agent

This is the agent that handled our 2:47 AM Salesforce incident. It monitors Airflow for failed DAG runs and attempts automated repair. Here are the tools we registered:

import json

# --- Tool Functions ---

def get_failed_dag_runs(hours_lookback: int = 6) -> str:
    """Query the Airflow REST API for recent failed DAG runs."""
    import requests
    from datetime import datetime, timedelta, timezone
    # The API expects an ISO-8601 timestamp, not a relative offset
    since = (datetime.now(timezone.utc) - timedelta(hours=hours_lookback)).isoformat()
    resp = requests.get(
        "http://airflow.internal:8080/api/v1/dags/~/dagRuns",
        params={"state": "failed", "limit": 20,
                "execution_date_gte": since},
        auth=("agent_readonly", AIRFLOW_AGENT_PASSWORD),
    )
    runs = resp.json().get("dag_runs", [])
    return json.dumps([
        {"dag_id": r["dag_id"], "run_id": r["run_id"],
         "execution_date": r["execution_date"],
         "end_date": r["end_date"]}
        for r in runs
    ], indent=2)


def get_task_logs(dag_id: str, run_id: str, task_id: str) -> str:
    """Fetch Airflow task logs for a specific failed task."""
    import requests
    resp = requests.get(
        f"http://airflow.internal:8080/api/v1/dags/{dag_id}"
        f"/dagRuns/{run_id}/taskInstances/{task_id}/logs/1",
        auth=("agent_readonly", AIRFLOW_AGENT_PASSWORD),
    )
    # Return last 200 lines to stay within context window
    lines = resp.text.strip().split("\n")
    return "\n".join(lines[-200:])


def query_database_readonly(sql: str, database: str = "analytics_prod") -> str:
    """Execute a read-only SQL query against the analytics database."""
    if not sql.strip().upper().startswith("SELECT"):
        return "ERROR: Only SELECT queries allowed in read-only mode."
    if database not in ["analytics_prod", "analytics_staging"]:
        return "ERROR: Database not in allowed list."
    import pymysql
    conn = pymysql.connect(
        host="db.internal", user="agent_readonly",
        password=DB_AGENT_PASSWORD, database=database,
    )
    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return json.dumps(rows[:100], indent=2, default=str)
    finally:
        conn.close()


def execute_sql_migration(sql: str, database: str = "analytics_prod") -> str:
    """Execute a DDL/DML statement. REQUIRES HUMAN APPROVAL."""
    import pymysql
    conn = pymysql.connect(
        host="db.internal", user="agent_writer",
        password=DB_AGENT_WRITE_PASSWORD, database=database,
    )
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            conn.commit()
            return f"OK: {cur.rowcount} rows affected."
    finally:
        conn.close()


def trigger_dag_backfill(dag_id: str, start_date: str, end_date: str) -> str:
    """Trigger an Airflow backfill for a date range. REQUIRES APPROVAL."""
    import requests
    resp = requests.post(
        f"http://airflow.internal:8080/api/v1/dags/{dag_id}/dagRuns",
        json={"execution_date": start_date,
              "conf": {"backfill_end": end_date, "triggered_by": "agent"}},
        auth=("agent_trigger", AIRFLOW_AGENT_PASSWORD),
    )
    return f"Backfill triggered: {resp.json().get('run_id', 'unknown')}"


def clear_failed_task(dag_id: str, run_id: str, task_id: str) -> str:
    """Clear a failed task instance to allow retry. REQUIRES APPROVAL."""
    import requests
    resp = requests.post(
        f"http://airflow.internal:8080/api/v1/dags/{dag_id}/clearTaskInstances",
        json={"dry_run": False, "task_ids": [task_id],
              "dag_run_id": run_id, "only_failed": True},
        auth=("agent_trigger", AIRFLOW_AGENT_PASSWORD),
    )
    return f"Task cleared: {resp.status_code}"


# --- Register Tools ---

repair_tools = [
    Tool(
        name="get_failed_dag_runs",
        description="List all failed Airflow DAG runs in the last N hours.",
        parameters={
            "type": "object",
            "properties": {
                "hours_lookback": {"type": "integer", "default": 6,
                                   "description": "How many hours back to search"}
            },
        },
        function=get_failed_dag_runs,
        read_only=True,
    ),
    Tool(
        name="get_task_logs",
        description="Fetch the execution logs for a specific Airflow task instance.",
        parameters={
            "type": "object",
            "properties": {
                "dag_id": {"type": "string"},
                "run_id": {"type": "string"},
                "task_id": {"type": "string"},
            },
            "required": ["dag_id", "run_id", "task_id"],
        },
        function=get_task_logs,
        read_only=True,
    ),
    Tool(
        name="query_database_readonly",
        description="Run a read-only SELECT query against the analytics database.",
        parameters={
            "type": "object",
            "properties": {
                "sql": {"type": "string", "description": "SELECT query to execute"},
                "database": {"type": "string", "default": "analytics_prod"},
            },
            "required": ["sql"],
        },
        function=query_database_readonly,
        read_only=True,
    ),
    Tool(
        name="execute_sql_migration",
        description="Execute a DDL or DML SQL statement (ALTER TABLE, INSERT, etc). Requires human approval.",
        parameters={
            "type": "object",
            "properties": {
                "sql": {"type": "string"},
                "database": {"type": "string", "default": "analytics_prod"},
            },
            "required": ["sql"],
        },
        function=execute_sql_migration,
        requires_approval=True,
        read_only=False,
    ),
    Tool(
        name="trigger_dag_backfill",
        description="Trigger an Airflow DAG backfill for a date range. Requires human approval.",
        parameters={
            "type": "object",
            "properties": {
                "dag_id": {"type": "string"},
                "start_date": {"type": "string", "description": "ISO date"},
                "end_date": {"type": "string", "description": "ISO date"},
            },
            "required": ["dag_id", "start_date", "end_date"],
        },
        function=trigger_dag_backfill,
        requires_approval=True,
        read_only=False,
    ),
    Tool(
        name="clear_failed_task",
        description="Clear a failed Airflow task to allow automatic retry. Requires human approval.",
        parameters={
            "type": "object",
            "properties": {
                "dag_id": {"type": "string"},
                "run_id": {"type": "string"},
                "task_id": {"type": "string"},
            },
            "required": ["dag_id", "run_id", "task_id"],
        },
        function=clear_failed_task,
        requires_approval=True,
        read_only=False,
    ),
]

When a DAG fails, an Airflow callback fires our agent. The agent reads logs, queries the database to understand schema state, figures out the root cause, and proposes a fix. Write operations always go through Slack approval. In three months of production usage, the agent correctly diagnosed 84% of failures on the first attempt.
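The wiring from Airflow to the agent is a failure callback. A sketch under simplifying assumptions -- in real Airflow the callback context carries a TaskInstance object rather than a plain dict, and the agent-service endpoint here is hypothetical:

```python
def build_trigger_event(context: dict) -> dict:
    """Shape a failure-callback context into the agent's trigger event."""
    ti = context["task_instance"]
    return {
        "event_type": "dag_run_failed",
        "dag_id": ti["dag_id"],
        "run_id": ti["run_id"],
        "task_id": ti["task_id"],
        "try_number": ti.get("try_number", 1),
    }

def notify_repair_agent(context: dict) -> None:
    """Registered as on_failure_callback on each DAG; hands the event off."""
    import requests
    requests.post(
        "http://agents.internal/api/v1/repair",  # hypothetical agent endpoint
        json=build_trigger_event(context),
        timeout=10,
    )
```

The callback stays thin on purpose: it only ships identifiers, and the agent pulls logs and metadata itself through its read-only tools.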

Agent #2: Schema Migration Agent

Schema drift is the number one cause of pipeline failures in our stack. Upstream sources -- Salesforce, Stripe, a microservice PostgreSQL database, three vendor APIs -- change their schemas without warning. Our schema migration agent watches for these changes and auto-generates dbt model updates.

"""
Schema Migration Agent

Monitors source schemas for drift and auto-generates dbt model updates.
Runs every 15 minutes via Airflow sensor DAG.
"""

def detect_schema_changes(source_name: str) -> str:
    """Compare current source schema to the last known snapshot.
    Returns a JSON diff of added, removed, and modified columns."""
    import pymysql

    conn = pymysql.connect(
        host="db.internal", user="agent_readonly",
        password=DB_AGENT_PASSWORD, database="analytics_prod",
    )
    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            # Get current schema from information_schema
            cur.execute("""
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
            """, (source_name.split(".")[0], source_name.split(".")[1]))
            current = {r["column_name"]: r for r in cur.fetchall()}

            # Get last snapshot from our schema registry
            cur.execute("""
                SELECT column_name, data_type, is_nullable, column_default
                FROM schema_registry.column_snapshots
                WHERE source_name = %s AND snapshot_date = (
                    SELECT MAX(snapshot_date) FROM schema_registry.column_snapshots
                    WHERE source_name = %s
                )
            """, (source_name, source_name))
            previous = {r["column_name"]: r for r in cur.fetchall()}

        added = [c for c in current if c not in previous]
        removed = [c for c in previous if c not in current]
        type_changed = [
            c for c in current
            if c in previous and current[c]["data_type"] != previous[c]["data_type"]
        ]

        return json.dumps({
            "source": source_name,
            "added_columns": added,
            "removed_columns": removed,
            "type_changes": [
                {"column": c,
                 "old_type": previous[c]["data_type"],
                 "new_type": current[c]["data_type"]}
                for c in type_changed
            ],
            "has_breaking_changes": bool(removed or type_changed),
        }, indent=2)
    finally:
        conn.close()


def read_dbt_model(model_path: str) -> str:
    """Read a dbt model SQL file from the repo."""
    import os
    base = "/opt/dbt/analytics"
    full_path = os.path.normpath(os.path.join(base, model_path))
    if not full_path.startswith(base):
        return "ERROR: Path traversal detected."
    with open(full_path) as f:
        return f.read()


def read_dbt_schema_yaml(model_path: str) -> str:
    """Read the schema.yml file for a dbt model directory."""
    import os
    base = "/opt/dbt/analytics"
    dir_path = os.path.dirname(os.path.normpath(os.path.join(base, model_path)))
    yaml_path = os.path.join(dir_path, "schema.yml")
    if not yaml_path.startswith(base):
        return "ERROR: Path traversal detected."
    with open(yaml_path) as f:
        return f.read()


def create_git_branch_and_pr(
    branch_name: str,
    file_changes: list[dict],
    pr_title: str,
    pr_body: str,
) -> str:
    """Create a Git branch with file changes and open a PR. REQUIRES APPROVAL."""
    import subprocess, os

    repo_dir = "/opt/dbt/analytics"
    subprocess.run(["git", "checkout", "main"], cwd=repo_dir, check=True)
    subprocess.run(["git", "pull", "origin", "main"], cwd=repo_dir, check=True)
    subprocess.run(["git", "checkout", "-b", branch_name], cwd=repo_dir, check=True)

    for change in file_changes:
        file_path = os.path.normpath(os.path.join(repo_dir, change["path"]))
        if not file_path.startswith(repo_dir):
            return "ERROR: Path traversal detected."
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "w") as f:
            f.write(change["content"])
        subprocess.run(["git", "add", file_path], cwd=repo_dir, check=True)

    subprocess.run(
        ["git", "commit", "-m", f"[agent] {pr_title}"], cwd=repo_dir, check=True
    )
    subprocess.run(
        ["git", "push", "origin", branch_name], cwd=repo_dir, check=True
    )

    result = subprocess.run(
        ["gh", "pr", "create", "--title", pr_title, "--body", pr_body,
         "--base", "main", "--head", branch_name],
        cwd=repo_dir, capture_output=True, text=True, check=True,
    )
    return f"PR created: {result.stdout.strip()}"

The agent detects new columns and adds them to the staging model. It detects type changes and generates CAST() expressions with appropriate coercion. It detects removed columns and comments them out with a deprecation notice. Every change goes through a Pull Request that a human reviews. The agent even runs dbt build --select on the modified model in our staging environment and includes the test results in the PR description.
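The type-change handling boils down to templating over the schema diff. A simplified sketch -- the function name and diff shape are illustrative, not our exact dbt codegen:

```python
def render_type_change_casts(type_changes: list[dict]) -> list[str]:
    """Emit one staging-model column expression per type change in the diff."""
    lines = []
    for change in type_changes:
        col, new_type = change["column"], change["new_type"]
        # Keep the downstream column name stable; coerce to the new source type
        lines.append(f"    CAST({col} AS {new_type.upper()}) AS {col},")
    return lines
```

Fed the Salesforce diff from the intro (account_type going from varchar to int), this renders `CAST(account_type AS INT) AS account_type,` -- the downstream contract stays intact while the source type changes underneath.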

In the first 90 days, the schema migration agent handled 47 schema changes. Of those, 41 were auto-merged after review (87% acceptance rate). The remaining 6 required manual tweaks -- usually because the business logic around a renamed column was ambiguous.

Agent #3: Data Quality Agent

Our third agent monitors data quality metrics and creates Jira tickets with full context when anomalies are detected. This one doesn't fix problems -- it investigates and documents them, which is honestly where most engineering time goes.

"""
Data Quality Agent

Monitors Great Expectations / Soda results, investigates anomalies,
and creates context-rich Jira tickets.
"""

def get_quality_check_failures(hours_lookback: int = 24) -> str:
    """Fetch recent data quality check failures from Soda Cloud API."""
    import requests
    resp = requests.get(
        "https://cloud.soda.io/api/v1/checks",
        headers={"Authorization": f"Bearer {SODA_API_TOKEN}"},
        params={"status": "fail", "since": f"-{hours_lookback}h"},
    )
    checks = resp.json().get("checks", [])
    return json.dumps([
        {"check_id": c["id"], "dataset": c["dataset"],
         "check_type": c["type"], "value": c["diagnostics"]["value"],
         "threshold": c["diagnostics"]["threshold"],
         "failed_at": c["evaluated_at"]}
        for c in checks
    ], indent=2)


def get_column_statistics(table: str, column: str, days: int = 30) -> str:
    """Fetch historical column-level statistics for anomaly context."""
    import re
    import pymysql
    # Identifiers can't be bound as query parameters, so validate before interpolating
    if not re.fullmatch(r"\w+", table) or not re.fullmatch(r"\w+", column):
        return "ERROR: Invalid table or column identifier."
    conn = pymysql.connect(
        host="db.internal", user="agent_readonly",
        password=DB_AGENT_PASSWORD, database="analytics_prod",
    )
    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(f"""
                SELECT
                    DATE(created_at) AS dt,
                    COUNT(*) AS row_count,
                    COUNT(DISTINCT `{column}`) AS distinct_count,
                    SUM(CASE WHEN `{column}` IS NULL THEN 1 ELSE 0 END) AS null_count
                FROM `{table}`
                WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)
                GROUP BY DATE(created_at)
                ORDER BY dt DESC
            """, (days,))
            return json.dumps(cur.fetchall(), indent=2, default=str)
    finally:
        conn.close()


def get_upstream_pipeline_status(dataset: str) -> str:
    """Check the status of the Airflow DAG that populates this dataset."""
    import requests
    # Lookup DAG from our lineage metadata
    resp = requests.get(
        f"http://lineage.internal/api/v1/datasets/{dataset}/upstream",
    )
    lineage = resp.json()
    dag_id = lineage.get("producing_dag_id")
    if not dag_id:
        return json.dumps({"error": "No upstream DAG found in lineage metadata"})

    resp = requests.get(
        f"http://airflow.internal:8080/api/v1/dags/{dag_id}/dagRuns",
        params={"limit": 5, "order_by": "-execution_date"},
        auth=("agent_readonly", AIRFLOW_AGENT_PASSWORD),
    )
    return json.dumps(resp.json().get("dag_runs", [])[:5], indent=2, default=str)


def create_jira_ticket(
    project: str,
    title: str,
    description: str,
    priority: str = "Medium",
    labels: list[str] = None,
) -> str:
    """Create a Jira ticket with investigation context. REQUIRES APPROVAL."""
    import requests
    resp = requests.post(
        "https://company.atlassian.net/rest/api/3/issue",
        headers={
            "Authorization": f"Basic {JIRA_API_TOKEN}",  # base64 of "email:api_token"
            "Content-Type": "application/json",
        },
        json={
            "fields": {
                "project": {"key": project},
                "summary": title,
                "description": {
                    "type": "doc", "version": 1,
                    "content": [{"type": "paragraph",
                                 "content": [{"type": "text", "text": description}]}],
                },
                "priority": {"name": priority},
                "labels": labels or ["data-quality", "agent-created"],
                "issuetype": {"name": "Bug"},
            }
        },
    )
    issue = resp.json()
    return f"Created: {issue['key']} - {issue['self']}"

Here's what makes this agent valuable: it doesn't just say "null rate spiked." It investigates. It pulls 30 days of historical stats to show the trend. It checks upstream pipeline status to see if the issue is ingestion-related. It queries the actual data to find example rows. Then it creates a Jira ticket that looks like a senior engineer wrote it:

[DATA-QUALITY] customer_events.account_type null rate spiked to 23% (normal: 0.2%)

Observed: Soda check null_pct(account_type) < 1% failed at 2026-03-12 03:15 UTC. Current null rate: 23.4% (12,847 / 54,892 rows).

History: Null rate was stable at 0.1-0.3% for the past 30 days. Spike began on 2026-03-12 batch at 02:00 UTC.

Root cause hypothesis: Upstream Salesforce sync DAG sync_salesforce_accounts completed successfully at 01:45 UTC, but the source schema changed -- account_type was renamed to account_category. The old column now returns NULL.

Suggested fix: Update staging model stg_salesforce__accounts.sql to map account_category to account_type. Backfill from 2026-03-12.

Impact: 3 downstream models affected: dim_customers, fct_revenue, rpt_churn_analysis.

That Jira ticket used to take 30-45 minutes to create manually. The agent does it in under 60 seconds.

Safety: The Part Nobody Wants to Talk About

An autonomous system with database access is terrifying if you don't get safety right. Here's our framework:

Principle 1: Read-Only by Default

Every tool is read-only unless explicitly marked otherwise. The agent's database credential is a read-only user. Write operations use a separate credential and always require human approval via Slack.

Principle 2: Sandboxing

In sandbox mode (always on for the first 30 days of any new agent), write tools are replaced with mock versions that log what would have happened without executing. We review these mock logs daily to validate agent judgment before enabling real execution.
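Mechanically, sandboxing is a wrapper applied at tool-registration time. A sketch with illustrative names, not the production code:

```python
import logging

logger = logging.getLogger("pipeline_agent.sandbox")

def sandbox_wrap(tool_name: str, real_fn):
    """Return a mock that logs what the write tool *would* have done."""
    def mocked(**kwargs):
        logger.info("SANDBOX: would call %s with %s", tool_name, kwargs)
        return f"SANDBOX: {tool_name} not executed (sandbox_mode=True)."
    return mocked

# At registration, assuming Tool objects like those defined earlier:
# for t in tools:
#     if config.sandbox_mode and not t.read_only:
#         t.function = sandbox_wrap(t.name, t.function)
```

Because the mock returns a normal tool result, the agent keeps reasoning as if the write succeeded, which is exactly what makes the mock logs useful for judging its decisions.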

Principle 3: Blast Radius Limits

SAFETY_LIMITS = {
    "max_rows_affected": 100_000,
    "max_tables_modified": 3,
    "blocked_operations": ["DROP", "TRUNCATE"],
    "allowed_databases": ["analytics_prod", "analytics_staging"],
    "blocked_tables": ["billing_transactions", "user_pii", "audit_logs"],
    "max_backfill_days": 7,
    "require_approval_for": [
        "execute_sql_migration",
        "trigger_dag_backfill",
        "clear_failed_task",
        "create_git_branch_and_pr",
        "create_jira_ticket",
    ],
}

def validate_sql_safety(sql: str) -> tuple[bool, str]:
    """Check SQL against safety constraints before execution."""
    sql_upper = sql.upper().strip()

    for blocked in SAFETY_LIMITS["blocked_operations"]:
        if blocked in sql_upper:
            return False, f"Blocked operation: {blocked}"

    # A substring check can't express "DELETE without WHERE", so handle it explicitly
    if "DELETE FROM" in sql_upper and "WHERE" not in sql_upper:
        return False, "Blocked operation: DELETE without WHERE clause"

    for table in SAFETY_LIMITS["blocked_tables"]:
        if table.upper() in sql_upper:
            return False, f"Blocked table: {table}"

    return True, "OK"

Principle 4: Full Audit Trail

Every single agent action -- every tool call, every parameter, every result -- is logged to a persistent audit table. We review agent activity in our weekly team meeting. In three months, we've caught two cases where the agent proposed a fix that was technically correct but would have caused a subtle downstream issue. The approval step caught both.

Principle 5: Kill Switch

A single Slack command (/agent-stop) immediately halts all agent execution. We've used it twice -- both times out of caution, not because anything went wrong.
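Under the hood the kill switch is just a flag the agent loop checks before every iteration. A minimal sketch -- the flag path is an assumption:

```python
import os

def agents_enabled(flag_path: str = "/var/run/pipeline-agent/killswitch") -> bool:
    """The /agent-stop Slack command creates the flag file; its presence halts agents."""
    return not os.path.exists(flag_path)

# Checked at the top of each agent loop iteration, e.g.:
# if not agents_enabled():
#     return {"status": "halted_by_kill_switch", "actions_taken": self.audit_log}
```

A file on shared storage is deliberately boring: it works even when Slack, the LLM API, or the agent's own queue is misbehaving.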

Manual vs Scripted vs Agent-Based Maintenance: The Comparison

| Dimension | Manual (Engineer) | Scripted (Runbooks) | Agent-Based (LLM + Tools) |
|---|---|---|---|
| Response time | 15-60 min (depends on availability) | 1-5 min (if script exists) | 30s - 3 min (autonomous) |
| Novel problem handling | Excellent | None (fails on unknowns) | Good for common variants |
| Coverage | Depends on team size | Only pre-scripted scenarios | Broad (reasons over any error) |
| Consistency | Variable (fatigue, context) | Perfect (deterministic) | High (same reasoning each time) |
| Cost per incident | $75-$250 (engineer time) | $2-$5 (compute) | $0.50-$3.00 (API + compute) |
| Setup cost | Training + documentation | High (script per scenario) | Medium (framework + tools, once) |
| Maintenance | Ongoing tribal knowledge | Script rot is real | Low (LLM adapts to changes) |
| Trust/auditability | High (human judgment) | High (deterministic) | Medium (requires approval layer) |
| 3 AM availability | Poor (humans sleep) | Good (if failure matches script) | Excellent (always on) |

The key insight: agents don't replace scripts OR engineers. They fill the massive gap between "we wrote a runbook for this" and "someone needs to think about this." Most pipeline failures fall in that gap -- they're not novel enough to need a senior engineer's creativity, but they're too varied for a static script.

Cost Analysis: Is It Worth It?

Here are our real numbers after 90 days of running all three agents in production.

Agent API Costs

| Agent | Avg Invocations/Month | Avg Tokens/Invocation | Monthly API Cost |
|---|---|---|---|
| Pipeline Repair | 47 | ~18,000 | $38 |
| Schema Migration | 23 | ~25,000 | $26 |
| Data Quality | 156 | ~12,000 | $84 |
| **Total** | **226** | | **$148/month** |

Engineer Time Saved

| Metric | Before Agents | After Agents | Change |
|---|---|---|---|
| Mean incident response time | 45 min | 3 min (agent) + 2 min (approval) | -89% |
| Weekly maintenance hours (team) | 50 hrs | 11 hrs | -78% |
| On-call pages/month | 34 | 6 | -82% |
| Mean time to resolution (MTTR) | 2.3 hrs | 18 min | -87% |
| Schema migration PRs created | 0 (manual) | 15/month (auto) | N/A |

At an average fully-loaded cost of $95/hour for a data engineer, saving 39 hours per week is worth $3,705/week, or roughly $16,000/month. We're spending $148/month on API costs. That's a 108x return. Even if you add the 160 hours we spent building the framework (one-time cost of ~$15,200), the payback period was under one month.
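The arithmetic above, spelled out:

```python
hours_saved_per_week = 50 - 11      # weekly maintenance hours, before vs. after
hourly_cost = 95                    # fully-loaded engineer cost, $/hr
weekly_savings = hours_saved_per_week * hourly_cost        # $3,705/week
monthly_savings = weekly_savings * 52 / 12                 # ~$16,055/month
api_cost_monthly = 148
roi_multiple = monthly_savings / api_cost_monthly          # ~108x
build_cost = 160 * hourly_cost                             # $15,200 one-time
payback_months = build_cost / (monthly_savings - api_cost_monthly)

print(f"${weekly_savings}/week, ~${monthly_savings:,.0f}/month, "
      f"{roi_multiple:.0f}x ROI, payback in {payback_months:.2f} months")
```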

What Agents Can't Do Yet

I'd be dishonest if I painted this as a silver bullet. Here's where our agents fail or where we don't trust them:

  • Complex business logic decisions. When a schema change is ambiguous -- like a field being renamed versus replaced -- the agent guesses wrong about 30% of the time. It handles account_type becoming an integer fine, but status being split into billing_status and account_status requires human judgment about which downstream models need which field.
  • Cross-team coordination. If a pipeline failure is caused by an upstream team deploying a breaking change, the agent can diagnose it but can't have the conversation with that team about fixing their contract. That's still a human job.
  • Truly novel failures. When we migrated from Redshift to Snowflake, the failure modes were completely new. The agent had no useful context for Snowflake-specific issues until we added Snowflake-specific tools and gave it documentation in the system prompt.
  • Performance optimization. The agent can fix a broken query, but it can't look at a slow query and redesign the data model. Optimization requires understanding usage patterns, cost constraints, and future roadmap -- context that doesn't fit in a tool call.
  • Security-sensitive operations. We deliberately exclude PII tables, billing data, and audit logs from agent access. Some things should never be automated, no matter how good the safety layer is.

Getting Started: A Pragmatic Roadmap

If you want to build this at your company, here's the order I'd recommend:

Week 1-2: Start with Read-Only Investigation

Build an agent that can only read logs and query databases. Point it at your last 10 incidents and see if it correctly diagnoses the root cause. Don't give it any write tools yet. This validates the LLM's reasoning ability on your specific infrastructure.

Week 3-4: Add Sandbox Write Tools

Add write tools in sandbox mode. The agent proposes actions but doesn't execute them. Compare its proposals to what your engineers actually did. Track accuracy.
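Sandbox mode can be as simple as swapping the write tools' implementations for recorders. One way to sketch it (the tool names and recorder are illustrative, not a specific framework's API):

```python
proposed_actions: list[dict] = []

def sandboxed(tool_name: str):
    """Return a tool implementation that records the call instead of executing it."""
    def propose(**kwargs):
        proposed_actions.append({"tool": tool_name, "params": kwargs})
        return f"[SANDBOX] would run {tool_name} with {kwargs}"
    return propose

# During weeks 3-4, the agent gets these in place of the real write tools.
clear_failed_task = sandboxed("clear_failed_task")
trigger_backfill = sandboxed("trigger_backfill")

clear_failed_task(dag_id="etl_customer_events", task_id="load_events")
trigger_backfill(dag_id="etl_customer_events", start="2026-03-01", end="2026-03-02")

# Later: diff proposed_actions against what the on-call engineer actually did.
```

The return string matters: the agent sees a plausible result and continues its reasoning, so you observe its full intended action sequence, not just the first step.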

Month 2: Enable Approved Writes

Turn on real execution with human approval for every write. Start with the lowest-risk operations: clearing failed tasks for retry, triggering backfills for known-good date ranges.
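The approval gate itself is just a synchronous checkpoint between "agent proposes" and "tool executes". A minimal shape (in a real system the `approve` callback would post to Slack and block on a button press; everything here is illustrative):

```python
from typing import Callable

def execute_with_approval(tool_fn: Callable, params: dict,
                          approve: Callable[[str], bool]):
    """Describe the proposed action, ask a human, and only then execute."""
    description = f"{tool_fn.__name__}({params})"
    if not approve(description):
        return f"REJECTED: {description}"
    return tool_fn(**params)

# Demo with a stand-in approver that okays task retries and nothing else.
def cautious_approver(description: str) -> bool:
    return description.startswith("retry_task")

def retry_task(dag_id: str, task_id: str) -> str:
    return f"retried {dag_id}.{task_id}"

print(execute_with_approval(retry_task,
                            {"dag_id": "etl_main", "task_id": "load"},
                            cautious_approver))  # prints: retried etl_main.load
```

Rejections are returned to the agent as tool results rather than raised, so it can explain to the human why it wanted the action instead of crashing.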

Month 3: Expand Scope

Add schema migration and data quality agents. By now you'll have enough audit data to understand the agent's strengths and weaknesses in your environment.

"""
Quick Start: Minimal pipeline investigation agent.
This is the simplest possible agent you can run today.
"""

from anthropic import Anthropic
import subprocess

client = Anthropic()

def check_airflow_status():
    """Minimal tool: check Airflow DAG status via CLI."""
    result = subprocess.run(
        ["airflow", "dags", "list-runs", "-d", "etl_main", "-S", "2026-03-01",
         "--state", "failed", "-o", "json"],
        capture_output=True, text=True,
    )
    return result.stdout

def read_log_file(path: str):
    """Minimal tool: read a log file."""
    with open(path) as f:
        lines = f.readlines()
    return "".join(lines[-100:])  # Last 100 lines

tools = [
    {
        "name": "check_airflow_status",
        "description": "List recent failed Airflow DAG runs",
        "input_schema": {"type": "object", "properties": {}},
    },
    {
        "name": "read_log_file",
        "description": "Read the last 100 lines of a log file",
        "input_schema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"],
        },
    },
]

tool_functions = {
    "check_airflow_status": check_airflow_status,
    "read_log_file": read_log_file,
}

messages = [{"role": "user", "content": "Check for failed pipelines and diagnose issues."}]

# Simple agent loop
for _ in range(5):
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system="You are a pipeline investigation agent. Read logs and diagnose failures.",
        tools=tools,
        messages=messages,
    )
    if response.stop_reason == "tool_use":
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                # Unpacking an empty input dict is a no-arg call, so one path suffices
                result = tool_functions[block.name](**block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(result),
                })
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
    else:
        # Agent is done
        print("".join(b.text for b in response.content if hasattr(b, "text")))
        break

That's about 60 lines. You can have a working investigation agent running against your infrastructure this afternoon. Start there, build confidence, then add write capabilities gradually.

What Changed for Our Team

The numbers are compelling, but the cultural shift matters more. Before agents, our on-call rotation was dreaded. People burned out. Senior engineers spent their mornings debugging the same categories of failures they'd seen hundreds of times. Now, on-call is genuinely boring most weeks. The agents handle the repetitive stuff, and the pages that do come through are the interesting problems -- the ones where a senior engineer's judgment actually matters.

We reallocated 39 hours per week of maintenance time into building new data products. In the three months since launching agents, we shipped a real-time fraud detection pipeline, a customer health score model, and a self-serve analytics platform. None of that was on the roadmap before because we "didn't have time."

We had time. We were just spending it restarting failed DAGs at 3 AM.

Conclusion

AI agents for data pipeline maintenance aren't experimental anymore. The tool-calling APIs are reliable. The cost is negligible compared to engineer time. The safety patterns are well-understood. If your data team spends more than 30% of its time on maintenance -- and statistically, it does -- you're leaving a 100x ROI on the table. Start with a read-only investigation agent this week. You'll wonder why you waited.
