Monitoring & alerting (data, models, services)


The observability “triangle”

  • Metrics – small numbers over time (rates, counts, latencies). Cheap, great for alerts and dashboards.
    Tools: prometheus_client, statsd, datadog.
  • Logs – structured event records for debugging & audits.
    Tools: structlog, loguru, error capture via sentry-sdk.
  • Traces – end-to-end timing across services/jobs.
    Tooling & glue: opentelemetry-sdk (+ opentelemetry-instrumentation-*).

For data/ML, add:

  • Data & model monitoring – schema/quality checks, drift detection, performance.
    Tools: evidently, whylogs, alibi-detect.

Alert destinations:

  • Notifications – ship signals to humans/systems.
    Tools: slack_sdk, apprise (multi-channel), twilio (SMS).

What to instrument (quick checklists)

Batch jobs / pipelines

  • jobs_total{status=ok|error}
  • rows_processed_total, bytes_processed_total
  • job_duration_seconds (Histogram; alert on p95)
  • Data checks: null %, range checks, schema mismatch, feature drift
  • Log every run with trace_id, inputs/outputs, and why it failed

Online services (APIs/serving)

  • RED method: Rate, Errors, Duration
  • Per-route latency histograms, error counts (see the RED sketch after this checklist)
  • Trace external calls (DB, S3, feature store), propagate trace context
  • Capture exceptions (Sentry), keep logs structured
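
A minimal RED sketch for a FastAPI service, assuming prometheus_client with the usual /metrics mount (metric and route names are illustrative; in a real app, prefer the route template over the raw path to keep label cardinality low):

# pip install fastapi uvicorn prometheus-client
import time
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, make_asgi_app

REQS = Counter("http_requests_total", "Requests", ["route", "method", "status"])
LAT = Histogram("http_request_duration_seconds", "Request latency", ["route", "method"])

app = FastAPI()
app.mount("/metrics", make_asgi_app())  # Prometheus scrapes this endpoint

@app.middleware("http")
async def red_metrics(request: Request, call_next):
    t0 = time.time()
    response = await call_next(request)
    route = request.url.path  # illustrative; use the route template in production
    REQS.labels(route, request.method, str(response.status_code)).inc()
    LAT.labels(route, request.method).observe(time.time() - t0)
    return response

@app.get("/healthz")
async def healthz():
    return {"ok": True}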

Libraries in plain words (and when to pick them)

Metrics: prometheus_client vs statsd vs datadog

  • prometheus_client – Expose /metrics (pull). Perfect with Prometheus/Grafana.
  • statsd – Push UDP/TCP counters/gauges to a StatsD/Datadog agent. Low overhead.
  • datadog – Client that talks to the Datadog agent; tight APM/metrics integration.

Rule of thumb: If you own the stack → Prometheus; if you’re on Datadog → datadog/StatsD.
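
For contrast with the pull model, a minimal push-model sketch with the statsd client (the agent address, prefix, and metric names are assumptions):

# pip install statsd
import statsd

stats = statsd.StatsClient("localhost", 8125, prefix="etl")  # StatsD/Datadog agent address
stats.incr("job.runs")                 # counter
stats.timing("job.duration_ms", 320)   # timer, in milliseconds
with stats.timer("transform"):         # context-manager timer
    pass  # ... do work ...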

Tracing: opentelemetry-sdk (+ instrumentations)

  • Vendor-neutral API/SDK. Export to OTLP/Jaeger/Datadog/New Relic.
  • Auto-instrument many libs: opentelemetry-instrumentation-requests, -psycopg2, -fastapi, etc.
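
A short sketch of switching on auto-instrumentation (assumes a tracer provider is already configured, as in snippet 2 below):

# pip install opentelemetry-instrumentation-requests opentelemetry-instrumentation-fastapi
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

RequestsInstrumentor().instrument()        # spans + context propagation for outgoing requests calls
# FastAPIInstrumentor.instrument_app(app)  # incoming requests, given an existing FastAPI `app`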

Logs: structlog / loguru

  • structlog – Structured JSON logs, easy to enrich (e.g., add trace_id).
  • loguru – “Batteries-included” logger with simple API; good for smaller apps.
  • Add sentry-sdk to capture exceptions and correlate with logs/traces.
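
A minimal sentry-sdk setup sketch (the DSN is a placeholder; unhandled exceptions are reported automatically):

# pip install sentry-sdk
import sentry_sdk

sentry_sdk.init(
    dsn="https://<key>@<org>.ingest.sentry.io/<project>",  # placeholder DSN
    traces_sample_rate=0.1,  # optional performance-trace sampling
)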

Data/ML monitoring

  • evidently – Reports/metrics for data & model quality/drift; great for dashboards and CI checks.
  • whylogs – Lightweight profiles (sketches) for columns, suited for streaming + long-term baselines.
  • alibi-detect – Statistical drift & outlier detection (KS/MMD/PSI), both batch and online detectors.

Notifications

  • slack_sdk – First choice for team alerts.
  • apprise – One API, many channels (Slack, email, Discord, etc.).
  • twilio – SMS/voice when you need on-call paging without a full paging system.
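
A minimal twilio SMS sketch (the SID, auth token, and phone numbers are placeholders):

# pip install twilio
from twilio.rest import Client

client = Client("ACxxxxxxxxxxxxxxxx", "your_auth_token")  # account SID + auth token (placeholders)
client.messages.create(
    to="+15550001111",     # on-call phone (placeholder)
    from_="+15552223333",  # your Twilio number (placeholder)
    body="Job daily_ingest failed on step transform",
)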

Tiny, realistic glue snippets

1) Metrics + logs in a batch job (Prometheus + structlog)

# pip install prometheus-client structlog
from prometheus_client import Counter, Histogram, start_http_server
import structlog, time

RUNS = Counter("job_runs_total", "Job runs", ["job", "status"])
DUR = Histogram("job_duration_seconds", "Job duration", ["job"])

log = structlog.get_logger()

def run_job(job: str, rows: int) -> None:
    t0 = time.time()
    try:
        # ... do work ...
        time.sleep(0.1)
        RUNS.labels(job, "ok").inc()
        log.info("job_ok", job=job, rows=rows, duration=time.time()-t0)
    except Exception as e:
        RUNS.labels(job, "error").inc()
        log.error("job_error", job=job, err=str(e))
        raise
    finally:
        DUR.labels(job).observe(time.time() - t0)

if __name__ == "__main__":
    start_http_server(8000)  # Prometheus scrapes http://host:8000/metrics
    run_job("daily_ingest", rows=12345)

2) Add tracing and correlate with logs

# pip install opentelemetry-sdk opentelemetry-exporter-otlp structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
import structlog

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))  # point to your OTLP endpoint
trace.set_tracer_provider(provider)
tr = trace.get_tracer(__name__)

def add_trace_id(logger, method, event_dict):
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:
        event_dict["trace_id"] = f"{ctx.trace_id:032x}"
        event_dict["span_id"]  = f"{ctx.span_id:016x}"
    return event_dict

structlog.configure(processors=[add_trace_id, structlog.processors.JSONRenderer()])

with tr.start_as_current_span("daily_ingest"):
    structlog.get_logger().info("started")
    # ... your work ...
    structlog.get_logger().info("finished")

3) Data drift check → Slack alert (evidently + slack_sdk)

# pip install evidently slack_sdk pandas
import pandas as pd
from slack_sdk import WebClient
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

ref = pd.read_parquet("baseline.parquet")
cur = pd.read_parquet("today.parquet")

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=ref, current_data=cur)
summary = report.as_dict()

# the first preset metric summarizes dataset-level drift; key names vary slightly across evidently versions
drift_share = summary["metrics"][0]["result"]["share_of_drifted_columns"]

if drift_share > 0.3:
    WebClient(token="xoxb-...").chat_postMessage(
        channel="#data-alerts",
        text=f"⚠️ Data drift detected: {drift_share:.0%} features drifted"
    )

4) Streaming-friendly profiles with whylogs

# pip install whylogs pandas
import pandas as pd, whylogs as why
df = pd.read_parquet("batch.parquet")
results = why.log(df)
# Save locally or send to WhyLabs; compare profiles daily for drift/quality trends.
results.view().write(path="profiles/batch.bin")

5) Online drift detector with alibi-detect

# pip install alibi-detect numpy
import numpy as np
from alibi_detect.cd import MMDDrift

x_ref = np.load("x_ref.npy")             # reference features
cd = MMDDrift(x_ref, p_val=0.05)         # two-sample test
pred = cd.predict(np.load("x_cur.npy"))  # {"data": {"is_drift": 0 or 1, "p_val": ...}, ...}
if pred["data"]["is_drift"]:
    print("Drift!")

6) Multi-channel alert with apprise (one call → many targets)

# pip install apprise
import apprise
notify = apprise.Apprise()
notify.add("slack://xoxb-.../#data-alerts")
notify.add("mailto://user:pass@example.com")
notify.notify(title="Pipeline failed", body="Job daily_ingest failed on step transform.")

Choosing between similar tools (quick decisions)

  • prometheus_client vs statsd/datadog
    • Prometheus stack available? → prometheus_client.
    • Datadog in place? → datadog (or StatsD to agent).
  • structlog vs loguru
    • Need JSON logs + enrichment (trace IDs)? → structlog.
    • Want a simpler drop-in logger? → loguru.
  • evidently vs whylogs vs alibi-detect
    • Human-readable reports & dashboards (batch) → evidently.
    • Lightweight, streaming-friendly column profiles (compare over time) → whylogs.
    • Statistical drift detectors (batch/online), including embeddings → alibi-detect.
  • Notifications
    • Team chat? → slack_sdk.
    • Many channels with one API? → apprise.
    • SMS paging? → twilio.

Pitfalls to avoid (these bite in prod)

  • Label/cardinality explosions in metrics (e.g., user_id as a label) → restrict labels to low-cardinality fields (see the sketch after this list).
  • Unbounded logs (PII, megabyte stacktraces) → sample noisy logs, mask PII.
  • Orphan traces (no propagation across hops) → use OTel context and instrument HTTP/DB clients.
  • Silent data drift → schedule evidently/whylogs checks and page when thresholds are exceeded.
  • Alert fatigue → SLOs with burn-rate alerts (fast/slow), deduplicate via your alert manager.
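
A small illustration of the cardinality point, assuming prometheus_client (names are illustrative):

# pip install prometheus-client
from prometheus_client import Counter

# Bad: one time series per user → unbounded cardinality.
# requests = Counter("requests_total", "Requests", ["user_id"])

# Better: labels drawn from small, fixed sets of values.
REQUESTS = Counter("requests_total", "Requests", ["route", "method", "status_class"])
REQUESTS.labels("/predict", "POST", "2xx").inc()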

A minimal “starter pack” you can ship this week

  • Batch jobs: prometheus_client + structlog + evidently + slack_sdk
  • Services: opentelemetry-sdk + prometheus_client (or datadog) + structlog + sentry-sdk