Kafka Ingestion with Apache Doris Routine Load

Exactly-Once Kafka Ingestion with Apache Doris Routine Load: Offsets, Schema Drift, Backpressure, and SLAs


Why this matters

You’ve got clickstream or CDC events piling up in Kafka, BI asking for near-real-time dashboards, and zero patience for duplicates or gaps. You need a pipeline that:

  • Keeps up under bursty traffic
  • Survives restarts without re-ingesting old data
  • Tolerates schema changes without babysitting jobs
  • Proves it met the SLA

That’s exactly the slot Apache Doris Routine Load fills: continuous Kafka → SQL table ingestion with exactly-once semantics, low latency, and simple ops. (Apache Doris)


Mental model: how Routine Load achieves “exactly-once”

At a high level:

Kafka topic/partitions
   └─> Routine Load job
         └─> N concurrent tasks (micro-batches)
               ├─ consume from assigned partitions
               ├─ transform/validate
               └─ commit as a single Doris transaction
  • Doris runs your job continuously, splitting it into micro-batch tasks that each map to a transaction. Batches close on time/row/byte thresholds or when EOF is reached. (Apache Doris)
  • Exactly-once is enforced by persisting consumption progress + commit validation. Even with retries or brief double scheduling, duplicate commits are rejected. (Apache Doris)
  • Jobs auto-recover after transient issues; manual PAUSE disables auto-resume. (Apache Doris)
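
The commit-validation idea above can be illustrated with a toy model. This is not how Doris implements it internally; `ToyTable`, `commit_batch`, and the in-memory progress map are hypothetical names, used only to show why a retried or double-scheduled batch cannot land twice when data and offsets are committed together.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """In-memory stand-in for a table plus its persisted consumption progress."""
    rows: list = field(default_factory=list)
    progress: dict = field(default_factory=dict)  # partition -> next offset to consume

    def commit_batch(self, partition: int, start_offset: int, records: list) -> bool:
        """Append records and advance progress in one step.

        The commit is rejected when the batch does not start exactly at the
        persisted offset, which is what makes retries and duplicate tasks harmless.
        """
        expected = self.progress.get(partition, 0)
        if start_offset != expected:
            return False
        self.rows.extend(records)
        self.progress[partition] = start_offset + len(records)
        return True


table = ToyTable()
batch = ["click-0", "click-1", "click-2"]

first = table.commit_batch(partition=0, start_offset=0, records=batch)
retry = table.commit_batch(partition=0, start_offset=0, records=batch)  # same batch again
following = table.commit_batch(partition=0, start_offset=3, records=["click-3"])

print(first, retry, following, len(table.rows), table.progress)
```

The retried batch is refused because progress has already moved past its start offset, so the table ends up with four rows rather than seven.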

Offsets that you can reason about

Routine Load lets you control where consumption starts and how you adjust it later:

| What you need | How to do it |
| --- | --- |
| Start from the beginning, the end, or a specific timestamp | Set default offsets when creating the job, e.g. kafka_default_offsets = '2025-11-20 10:00:00' or OFFSET_BEGINNING (the CREATE ROUTINE LOAD reference spells this property "property.kafka_default_offsets"). (Apache Doris) |
| Pin to exact partition offsets | Pause the job, then set kafka_partitions and kafka_offsets together via ALTER ROUTINE LOAD; the two lists pair up position by position. Great for surgical replays. (Apache Doris) |
| Inspect progress and lag per partition | SHOW ROUTINE LOAD (job level) exposes consumed offsets (Progress) and Lag; SHOW ROUTINE LOAD TASK lists the currently running subtasks. (Apache Doris) |
| Pause, resume, or stop | PAUSE/RESUME/STOP ROUTINE LOAD with clear semantics (resume picks up from the last committed offset; stop is final). (Apache Doris) |

Tip: If a job is paused too long and Kafka retention expires, you’ll see “offset out of range”; fix by resetting offsets to a valid point or recreating the job. (Apache Doris)
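
For surgical replays it helps to generate the offset-reset statement rather than hand-type it. The sketch below is one way to do that; `build_offset_reset` is a hypothetical helper, and it assumes the ALTER ROUTINE LOAD form in which kafka_partitions and kafka_offsets are two comma-separated lists matched by position.

```python
def build_offset_reset(job: str, offsets: dict[int, int]) -> str:
    """Render an ALTER ROUTINE LOAD statement that pins each partition to an offset.

    Both property lists are generated from the same sorted partition order so
    that the n-th offset always belongs to the n-th partition.
    """
    if not offsets:
        raise ValueError("at least one partition offset is required")
    if any(p < 0 or o < 0 for p, o in offsets.items()):
        raise ValueError("partitions and offsets must be non-negative")
    partitions = sorted(offsets)
    partition_list = ",".join(str(p) for p in partitions)
    offset_list = ",".join(str(offsets[p]) for p in partitions)
    return (
        f"ALTER ROUTINE LOAD FOR {job} FROM KAFKA ("
        f'"kafka_partitions" = "{partition_list}", '
        f'"kafka_offsets" = "{offset_list}");'
    )


stmt = build_offset_reset("app.rt_events_rl", {2: 54321, 0: 12345, 1: 67890})
print(stmt)
```

Run the generated statement only while the job is paused, then resume it.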


Handling schema drift (without babysitting)

Real streams change. Doris gives you two layers of control:

  1. Loader-level tolerance & mapping
    Use Routine Load properties (set at create time or via ALTER) to adapt without code redeploys:
      • JSON extraction: format='json', jsonpaths, json_root, strip_outer_array. (Apache Doris)
      • Type/format tolerance: strict_mode (when off, a value that fails type conversion is loaded as NULL into a nullable column instead of the row being filtered), num_as_string, fuzzy_parse.
      • Partial updates: partial_columns=true for UNIQUE KEY + Merge-on-Write tables when not all columns are present. (Apache Doris)
  2. Table-level evolution
    Add or modify columns with schema change (lightweight vs heavyweight changes). Use this when the model itself evolves. (Apache Doris)
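
To make the loader-level behaviour concrete, here is a small simulation of JSONPath mapping with and without strict mode. It is a sketch of the semantics only, not Doris code: `COLUMNS`, `extract`, and `map_row` are hypothetical, and only simple dotted paths such as `$.user_id` are handled.

```python
import json
from datetime import datetime

# Hypothetical column spec: (jsonpath, converter).
COLUMNS = [
    ("$.event_time", lambda v: datetime.strptime(v, "%Y-%m-%d %H:%M:%S")),
    ("$.user_id", int),
    ("$.action", str),
]


def extract(doc: dict, path: str):
    """Resolve a simple dotted JSONPath; a missing field yields None."""
    if path == "$":
        return doc
    node = doc
    for key in path.removeprefix("$").strip(".").split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def map_row(message: str, strict: bool):
    """Return a tuple of column values, or None when the row is filtered."""
    doc = json.loads(message)
    row = []
    for path, convert in COLUMNS:
        raw = extract(doc, path)
        try:
            value = None if raw is None else convert(raw)
        except (TypeError, ValueError):
            if strict:
                return None  # strict mode: a failed conversion filters the row
            value = None     # non-strict mode: the bad value becomes NULL
        row.append(value)
    return tuple(row)


drifted = ('{"event_time": "2025-11-20 10:00:00", "user_id": "not-a-number", '
           '"action": "view", "new_field": 1}')
print(map_row(drifted, strict=False))
print(map_row(drifted, strict=True))
```

The unknown `new_field` is ignored because no path selects it, and the bad `user_id` either becomes NULL or drops the row depending on the mode.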

Backpressure: make the pipeline breathe

Throttle or push throughput by tuning batching and concurrency:

  • Batch boundaries (end a micro-batch when any hits):
    max_batch_interval (sec), max_batch_rows, max_batch_size (bytes). Defaults are sensible; lower interval for latency, raise size/rows for throughput. (Apache Doris)
  • Parallelism: desired_concurrent_number ≈ tasks (upper-bounded by topic partitions and cluster caps). Match or modestly trail partition count. (Apache Doris)
  • Data quality gates: max_error_number / max_filter_ratio pause the job when bad rows exceed thresholds—useful for protecting SLAs. (Apache Doris)
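
A quick way to reason about the parallelism bullet is to compute how many tasks you would actually get and which partitions each would read. The sketch below assumes the effective task count is the minimum of the requested concurrency, the partition count, and a cluster-side cap; `cluster_cap` is a single hypothetical number standing in for whatever limits your deployment applies, and the round-robin assignment is an illustration rather than Doris's scheduler.

```python
def plan_tasks(partitions: int, desired: int, cluster_cap: int) -> list[list[int]]:
    """Spread partition ids round-robin over the effective number of tasks."""
    tasks = max(1, min(partitions, desired, cluster_cap))
    return [list(range(t, partitions, tasks)) for t in range(tasks)]


plan = plan_tasks(partitions=12, desired=8, cluster_cap=5)
for task_id, assigned in enumerate(plan):
    print(task_id, assigned)
```

Here asking for 8 tasks yields only 5, and two of them carry three partitions each, which is where skewed lag tends to show up first.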

Doris exposes signals to diagnose backlog; if BE logs show batches ending on max_batch_size first, increase it to better fill the interval. (Apache Doris)
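
Before touching the knobs, it is worth estimating which batch boundary trips first at your traffic level. The following back-of-the-envelope sketch assumes a steady arrival rate and a fixed average row size; `first_threshold` is a hypothetical helper, and the rates in the example are made up.

```python
def first_threshold(rows_per_sec: float, bytes_per_row: int,
                    max_batch_interval: int, max_batch_rows: int,
                    max_batch_size: int) -> tuple[str, float]:
    """Return (threshold_name, seconds_until_close) for a steady arrival rate."""
    if rows_per_sec <= 0 or bytes_per_row <= 0:
        raise ValueError("rate and row size must be positive")
    candidates = {
        "max_batch_interval": float(max_batch_interval),
        "max_batch_rows": max_batch_rows / rows_per_sec,
        "max_batch_size": max_batch_size / (rows_per_sec * bytes_per_row),
    }
    name = min(candidates, key=candidates.get)
    return name, candidates[name]


# Thresholds taken from the job definition used later in this article.
limits = dict(max_batch_interval=10, max_batch_rows=500_000, max_batch_size=536_870_912)

quiet = first_threshold(rows_per_sec=20_000, bytes_per_row=1024, **limits)
busy = first_threshold(rows_per_sec=100_000, bytes_per_row=1024, **limits)
wide = first_threshold(rows_per_sec=30_000, bytes_per_row=4096, **limits)
print(quiet, busy, wide)
```

If the row or byte limit wins well before the interval, batches are closing early and commit overhead goes up; raising that limit lets each batch fill the interval.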


Hands-on: create a resilient, exactly-once Routine Load

1) Table for events

CREATE TABLE rt_events (
  event_time   DATETIME,
  user_id      BIGINT,
  action       VARCHAR(32),
  attr_json    JSON
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;

2) Job with sane defaults and tolerant JSON parsing

CREATE ROUTINE LOAD app.rt_events_rl ON rt_events
PROPERTIES (
  "desired_concurrent_number" = "8",
  "max_batch_interval" = "10",
  "max_batch_rows"     = "500000",
  "max_batch_size"     = "536870912",   -- 512MB
  "format"             = "json",
  "jsonpaths"          = "[\"$.event_time\",\"$.user_id\",\"$.action\",\"$\"]",
  "strict_mode"        = "false"        -- tolerate minor type drift
)
FROM KAFKA (
  "kafka_broker_list"              = "broker1:9092,broker2:9092",
  "kafka_topic"                    = "events",
  "property.kafka_default_offsets" = "OFFSET_BEGINNING"  -- or a timestamp such as "2025-11-20 10:00:00"
);

Exactly-once semantics apply automatically for JSON/CSV Routine Load. (Apache Doris)
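
The hand-escaped jsonpaths value in the job definition is easy to get wrong. One option is to generate it; the sketch below uses a hypothetical `property_value` helper that serializes the paths as a JSON array and escapes it for a double-quoted SQL string.

```python
import json


def property_value(paths: list[str]) -> str:
    """Serialize JSONPaths as a compact JSON array, escaped for a double-quoted SQL string."""
    encoded = json.dumps(paths, separators=(",", ":"))
    # Escape backslashes first so the quote escaping is not doubled afterwards.
    return encoded.replace("\\", "\\\\").replace('"', '\\"')


paths = ["$.event_time", "$.user_id", "$.action", "$"]
value = property_value(paths)
print(f'"jsonpaths" = "{value}"')
```

The number of paths should match the number of target columns (four in the rt_events table), so a length check against the column list is a cheap extra guard.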

3) Operational playbook

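  • Check health/lag
    SHOW ROUTINE LOAD FOR app.rt_events_rl\G
    SHOW ROUTINE LOAD TASK WHERE JobName = "rt_events_rl";   (run inside the app database)
    The job-level output includes consumed offsets (Progress) and per-partition Lag; alerts can scrape these. (Apache Doris)
  • Replay from known offsets (for example, a backfill window from yesterday)
    PAUSE ROUTINE LOAD FOR app.rt_events_rl;
    ALTER ROUTINE LOAD FOR app.rt_events_rl FROM KAFKA ("kafka_partitions" = "0,1,2", "kafka_offsets" = "12345,67890,54321");
    RESUME ROUTINE LOAD FOR app.rt_events_rl;
    The job must be paused before it can be altered. Rewinding offsets re-ingests those messages, so on a DUPLICATE KEY table delete the affected range first or expect duplicate rows. Alternatively, recreate the job with kafka_default_offsets='2025-11-19 10:00:00'. (Apache Doris)
  • Throttle during hot hours (again with the job paused)
    ALTER ROUTINE LOAD FOR app.rt_events_rl PROPERTIES ("desired_concurrent_number"="4","max_batch_interval"="20"); (Apache Doris)
  • Safe interventions
    PAUSE ROUTINE LOAD FOR app.rt_events_rl; → maintenance
    RESUME ROUTINE LOAD FOR app.rt_events_rl; → continues from last commit
    STOP ROUTINE LOAD FOR app.rt_events_rl; → final; recreate to ingest again. (Apache Doris)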
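
For alerting, the lag value has to be turned into something a monitor can compare against a threshold. The sketch below assumes the Lag column arrives as a JSON object keyed by partition id; the sample string and the `lag_alerts` helper are illustrative, and fetching the row from Doris is left out.

```python
import json


def lag_alerts(lag_json: str, threshold: int) -> dict[int, int]:
    """Return {partition: lag} for partitions whose lag exceeds the threshold."""
    lag = {int(p): int(v) for p, v in json.loads(lag_json).items()}
    return {p: v for p, v in sorted(lag.items()) if v > threshold}


sample_lag = '{"0": 120, "1": 98000, "2": 15}'  # made-up sample in the assumed shape
alerts = lag_alerts(sample_lag, threshold=50_000)
print(alerts)
```

Alerting per partition rather than on the sum makes a single stuck partition visible even when the overall backlog looks small.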

Designing SLAs that mean something

Latency SLA (ingest-to-visible):

  • Target P95 task commit latency (batch close + commit). Tune max_batch_interval first; don’t over-optimize row/byte limits. (Apache Doris)
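
If you collect per-batch commit latencies yourself, P95 is simple to compute. The sketch below uses the nearest-rank definition; `percentile` is a hypothetical helper and the latency samples are invented for illustration.

```python
import math


def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


# Seconds from batch open to commit, one value per task (made-up numbers).
latencies = [8.2, 9.1, 10.4, 9.8, 11.0, 9.5, 30.2, 10.1, 9.9, 10.7]
p95 = percentile(latencies, 95)
sla_seconds = 15.0
print(p95, p95 <= sla_seconds)
```

With only ten samples a single slow batch decides the P95, so evaluate the SLA over a window long enough to hold at least a few hundred batches.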

Freshness SLA (topic lag):

  • Alert on Lag per partition from SHOW ROUTINE LOAD. Scale desired_concurrent_number until lag stabilizes near zero during peaks. (Apache Doris)

Data quality SLA:

  • Set max_error_number/max_filter_ratio. “Fail noisy” early beats silent corruption. (Apache Doris)
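
The two gates can be pictured as a simple check over a window of rows. This is a simplified model rather than Doris's exact accounting: `should_pause` is hypothetical, and it treats the window as a plain pair of counters.

```python
def should_pause(total_rows: int, error_rows: int,
                 max_error_number: int, max_filter_ratio: float) -> bool:
    """True when bad rows in the window breach either the count or the ratio gate."""
    if total_rows <= 0:
        return False
    return error_rows > max_error_number or error_rows / total_rows > max_filter_ratio


print(should_pause(1_000_000, 500, max_error_number=1000, max_filter_ratio=0.001))
print(should_pause(10_000, 200, max_error_number=1000, max_filter_ratio=0.01))
```

The second case shows why a ratio gate matters: 200 bad rows stay under the absolute limit but are 2% of a small window.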

Recovery SLA:

  • Document the playbook for “offset out of range” (reset offsets or recreate with timestamp). Use Kafka retention comfortably above your MTTD + MTTR. (Apache Doris)
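
The retention rule of thumb is easy to encode as a check in a runbook or CI job. In the sketch below, `retention_is_safe` and the default safety factor of 2 are assumptions chosen for illustration; pick a factor that matches your own risk tolerance.

```python
from datetime import timedelta


def retention_is_safe(retention: timedelta, mttd: timedelta, mttr: timedelta,
                      safety_factor: float = 2.0) -> bool:
    """True when Kafka retention covers detection plus repair time with headroom."""
    return retention >= (mttd + mttr) * safety_factor


mttd = timedelta(minutes=30)  # mean time to detect a stalled job
mttr = timedelta(hours=4)     # mean time to repair and resume it

print(retention_is_safe(timedelta(hours=72), mttd, mttr))
print(retention_is_safe(timedelta(hours=6), mttd, mttr))
```

With these numbers a 6-hour retention fails the check because detection plus repair already takes 4.5 hours before any headroom is applied.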

Common pitfalls (and how to avoid them)

  • Microscopic batches (tiny max_batch_interval) → high commit overhead. Start at 10–60s; confirm throughput before lowering. (Apache Doris)
  • Concurrency > partitions doesn’t help; match partitions first. (Apache Doris)
  • Manual pause without a plan leads to expired offsets. If pausing, ensure Kafka retention safely covers the downtime. (Apache Doris)
  • Assuming schema changes “just work” — for model changes use schema change; for transient drift use loader properties. (Apache Doris)

TL;DR

  • Use Routine Load for continuous Kafka → Doris ingestion with exactly-once guarantees. (Apache Doris)
  • Control offsets at creation (kafka_default_offsets, timestamps) and tweak with kafka_offsets. Inspect lag via SHOW ROUTINE LOAD. (Apache Doris)
  • Tame backpressure with max_batch_* and desired_concurrent_number. (Apache Doris)
  • Handle schema drift via JSON options / tolerance flags, and apply schema change for real model evolution. (Apache Doris)

Call to action: pick one topic, create the job above, and wire alerts to Lag.


Internal link ideas (official docs)

  • Routine Load (overview & examples). (Apache Doris)
  • CREATE ROUTINE LOAD (all properties, incl. time-based offsets). (Apache Doris)
  • ALTER ROUTINE LOAD (change concurrency, JSON settings, explicit offsets). (Apache Doris)
  • SHOW ROUTINE LOAD / SHOW ROUTINE LOAD TASK (monitor lag & progress). (Apache Doris)
  • Schema Change (table evolution). (Apache Doris)
  • Kafka data source notes (compat & getting started). (Apache Doris)
  • Internals & best practices (batching/concurrency logic, exactly-once details). (Apache Doris)



Tags

#ApacheDoris #Kafka #ExactlyOnce #RealTimeAnalytics #DataEngineering #ETL #Backpressure #SchemaEvolution #Offsets #SLA
