Exactly-Once Kafka Ingestion with Apache Doris Routine Load: Offsets, Schema Drift, Backpressure, and SLAs
Meta description (156 chars):
Exactly-once Kafka → Apache Doris with Routine Load. Master offsets, schema evolution options, backpressure knobs, monitoring, and SLA design for real-time ETL.
Why this matters
You’ve got clickstream or CDC events piling up in Kafka, BI asking for near-real-time dashboards, and zero patience for duplicates or gaps. You need a pipeline that:
- Keeps up under bursty traffic
- Survives restarts without re-ingesting old data
- Tolerates schema changes without babysitting jobs
- Proves it met the SLA
That’s exactly the slot Apache Doris Routine Load fills: continuous Kafka → SQL table ingestion with exactly-once semantics, low latency, and simple ops. (Apache Doris)
Mental model: how Routine Load achieves “exactly-once”
At a high level:
Kafka topic/partitions
└─> Routine Load job
    └─> N concurrent tasks (micro-batches)
        ├─ consume from assigned partitions
        ├─ transform/validate
        └─ commit as a single Doris transaction
- Doris runs your job continuously, splitting it into micro-batch tasks that each map to a transaction. Batches close on time/row/byte thresholds or when EOF is reached. (Apache Doris)
- Exactly-once is enforced by persisting consumption progress + commit validation. Even with retries or brief double scheduling, duplicate commits are rejected. (Apache Doris)
- Jobs auto-recover after transient issues; a manual PAUSE disables auto-resume. (Apache Doris)
Offsets that you can reason about
Routine Load lets you control where consumption starts and how you adjust it later:
| What you need | How to do it |
|---|---|
| Start from beginning/end or a specific timestamp | Set default offsets when creating the job via the data source property property.kafka_default_offsets (e.g., '2025-11-20 10:00:00', OFFSET_BEGINNING, or OFFSET_END). (Apache Doris) |
| Pin to exact partition offsets | Pause the job, then set kafka_partitions together with kafka_offsets via ALTER ROUTINE LOAD. Great for surgical replays. (Apache Doris) |
| Inspect progress & lag per partition | SHOW ROUTINE LOAD (job-level) exposes consumed offsets and lag; SHOW ROUTINE LOAD TASK lists the currently running subtasks. (Apache Doris) |
| Pause, resume, or stop | PAUSE/RESUME/STOP ROUTINE LOAD with clear semantics (resume picks up from last committed offset; stop is final). (Apache Doris) |
Tip: If a job is paused too long and Kafka retention expires, you’ll see “offset out of range”; fix by resetting offsets to a valid point or recreating the job. (Apache Doris)
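As a rough sketch, recovery from “offset out of range” could look like the sequence below. It assumes the job name used in the hands-on section (app.rt_events_rl), a topic with three partitions, and placeholder offsets; replace them with the earliest offsets Kafka still retains for each partition.

```sql
-- 1) Confirm the job is PAUSED and read the pause reason
--    (State and ReasonOfStateChanged in the output).
SHOW ROUTINE LOAD FOR app.rt_events_rl;

-- 2) Point each partition at an offset Kafka still retains.
--    Partition IDs and offsets are placeholders, not real values.
ALTER ROUTINE LOAD FOR app.rt_events_rl
FROM KAFKA (
    "kafka_partitions" = "0,1,2",
    "kafka_offsets"    = "12345,67890,54321"
);

-- 3) Resume consumption from the new offsets.
RESUME ROUTINE LOAD FOR app.rt_events_rl;
```

Moving offsets forward skips whatever sat between the last committed offset and the new one, so record the gap if downstream consumers need to know about it.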
Handling schema drift (without babysitting)
Real streams change. Doris gives you two layers of control:
- Loader-level tolerance & mapping
  Use Routine Load properties (set at create time or via ALTER) to adapt without code redeploys:
  - JSON extraction: format='json', jsonpaths, json_root, strip_outer_array. (Apache Doris)
  - Type/format tolerance: strict_mode (off, so values that fail type conversion load as NULL instead of being filtered), num_as_string, fuzzy_parse.
  - Partial updates: partial_columns=true for UNIQUE KEY + Merge-on-Write tables when not all columns are present. (Apache Doris)
- Table-level evolution
  Add or modify columns with schema change (lightweight vs heavyweight changes). Use this when the model itself evolves. (Apache Doris)
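For the table-level path, a minimal sketch of adding a column might look like this. The device_type column is hypothetical and exists only for illustration; the table is the rt_events table from the hands-on section, assumed to live in the app database.

```sql
USE app;

-- Add a nullable column so rows that lack the field still load.
-- device_type is a hypothetical column name.
ALTER TABLE rt_events ADD COLUMN device_type VARCHAR(32) NULL;

-- Schema changes run as background jobs; check the latest one
-- for this table until its State reports FINISHED.
SHOW ALTER TABLE COLUMN
WHERE TableName = "rt_events"
ORDER BY CreateTime DESC
LIMIT 1;
```

Whether a running job picks up the new field depends on its column mapping. With an explicit jsonpaths list, as in the hands-on job, the mapping would likely need to be updated as well.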
Backpressure: make the pipeline breathe
Throttle or push throughput by tuning batching and concurrency:
- Batch boundaries (end a micro-batch when any hits): max_batch_interval (sec), max_batch_rows, max_batch_size (bytes). Defaults are sensible; lower interval for latency, raise size/rows for throughput. (Apache Doris)
- Parallelism: desired_concurrent_number ≈ tasks (upper-bounded by topic partitions and cluster caps). Match or modestly trail partition count. (Apache Doris)
- Data quality gates: max_error_number / max_filter_ratio pause the job when bad rows exceed thresholds, which is useful for protecting SLAs. (Apache Doris)
Doris exposes signals to diagnose backlog; if BE logs show batches ending on max_batch_size first, increase it to better fill the interval. (Apache Doris)
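A sketch of acting on that signal, assuming the hands-on job (app.rt_events_rl) and illustrative values rather than recommendations. Check the allowed ranges for your Doris version before applying them.

```sql
-- ALTER only applies to a paused job.
PAUSE ROUTINE LOAD FOR app.rt_events_rl;

ALTER ROUTINE LOAD FOR app.rt_events_rl
PROPERTIES (
    "max_batch_rows"   = "1000000",     -- let batches grow before closing
    "max_batch_size"   = "1073741824",  -- 1 GB
    "max_error_number" = "1000"         -- pause the job once bad rows pass this count
);

RESUME ROUTINE LOAD FOR app.rt_events_rl;
```

Raising the row and byte limits together keeps one of them from closing batches long before max_batch_interval is reached.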
Hands-on: create a resilient, exactly-once Routine Load
1) Table for events
CREATE TABLE rt_events (
    event_time DATETIME,
    user_id    BIGINT,
    action     VARCHAR(32),
    attr_json  JSON
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;
2) Job with sane defaults and tolerant JSON parsing
CREATE ROUTINE LOAD app.rt_events_rl ON rt_events
PROPERTIES (
    "desired_concurrent_number" = "8",
    "max_batch_interval" = "10",
    "max_batch_rows" = "500000",
    "max_batch_size" = "536870912", -- 512MB
    "format" = "json",
    "jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.action\",\"$\"]",
    "strict_mode" = "false" -- tolerate minor type drift
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "events",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING" -- or a timestamp
);
Exactly-once semantics apply automatically for JSON/CSV Routine Load. (Apache Doris)
3) Operational playbook
- Check health/lag
  SHOW ROUTINE LOAD FOR app.rt_events_rl\G
  SHOW ROUTINE LOAD TASK WHERE JobName = "rt_events_rl"\G
  You’ll see consumed offsets and per-partition lag; alerts can scrape these. (Apache Doris)
- Backfill 1 hour from yesterday
  Pause the job first (ALTER only applies to paused jobs), then set partitions and offsets as paired lists:
  ALTER ROUTINE LOAD FOR app.rt_events_rl FROM KAFKA ("kafka_partitions" = "0,1,2", "kafka_offsets" = "12345,67890,54321");
  Or recreate with "property.kafka_default_offsets" = "2025-11-19 10:00:00". (Apache Doris)
- Throttle during hot hours
  ALTER ROUTINE LOAD FOR app.rt_events_rl PROPERTIES ("desired_concurrent_number"="4","max_batch_interval"="20"); (Apache Doris)
- Safe interventions
  PAUSE ROUTINE LOAD FOR app.rt_events_rl; → maintenance
  RESUME ROUTINE LOAD FOR app.rt_events_rl; → continues from last commit
  STOP ROUTINE LOAD FOR app.rt_events_rl; → final; recreate to ingest again. (Apache Doris)
Designing SLAs that mean something
Latency SLA (ingest-to-visible):
- Target P95 task commit latency (batch close + commit). Tune max_batch_interval first; don’t over-optimize row/byte limits. (Apache Doris)
Freshness SLA (topic lag):
- Alert on Lag per partition from SHOW ROUTINE LOAD. Scale desired_concurrent_number until lag stabilizes near zero during peaks. (Apache Doris)
Data quality SLA:
- Set max_error_number / max_filter_ratio. “Fail noisy” early beats silent corruption. (Apache Doris)
Recovery SLA:
- Document the playbook for “offset out of range” (reset offsets or recreate with timestamp). Use Kafka retention comfortably above your MTTD + MTTR. (Apache Doris)
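To complement Kafka-side lag, freshness can also be measured from the table itself. The query below is a sketch that assumes event_time in rt_events is the producer-side event timestamp and is stored in the same time zone as the session running the query.

```sql
-- Seconds between the newest loaded event and now.
SELECT
    MAX(event_time) AS newest_event,
    TIMESTAMPDIFF(SECOND, MAX(event_time), NOW()) AS freshness_seconds
FROM app.rt_events;
```

A quiet topic inflates this number even when the job is healthy, so it works best alongside the Lag alert rather than as a replacement for it.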
Common pitfalls (and how to avoid them)
- Microscopic batches (tiny max_batch_interval) → high commit overhead. Start at 10–60s; confirm throughput before lowering. (Apache Doris)
- Concurrency > partitions doesn’t help; match partitions first. (Apache Doris)
- Manual pause without a plan leads to expired offsets. If pausing, ensure Kafka retention safely covers the downtime. (Apache Doris)
- Assuming schema changes “just work” — for model changes use schema change; for transient drift use loader properties. (Apache Doris)
TL;DR
- Use Routine Load for continuous Kafka → Doris ingestion with exactly-once guarantees. (Apache Doris)
- Control offsets at creation (property.kafka_default_offsets, timestamps) and tweak with kafka_partitions + kafka_offsets. Inspect lag via SHOW ROUTINE LOAD. (Apache Doris)
- Tame backpressure with max_batch_* and desired_concurrent_number. (Apache Doris)
- Handle schema drift via JSON options / tolerance flags, and apply schema change for real model evolution. (Apache Doris)
Call to action: pick one topic, create the job above, and wire alerts to Lag. If you share your partition counts + throughput, I’ll give you exact parameter targets.
Internal link ideas (official docs)
- Routine Load (overview & examples). (Apache Doris)
- CREATE ROUTINE LOAD (all properties, incl. time-based offsets). (Apache Doris)
- ALTER ROUTINE LOAD (change concurrency, JSON settings, explicit offsets). (Apache Doris)
- SHOW ROUTINE LOAD / SHOW ROUTINE LOAD TASK (monitor lag & progress). (Apache Doris)
- Schema Change (table evolution). (Apache Doris)
- Kafka data source notes (compat & getting started). (Apache Doris)
- Internals & best practices (batching/concurrency logic, exactly-once details). (Apache Doris)
Image prompt
“A clean, modern diagram of Kafka → Apache Doris Routine Load: topic partitions feeding multiple concurrent micro-batch tasks, each committing atomically to a Doris table; show offset control, batch thresholds (time/rows/bytes), and a monitoring panel for lag — minimalistic, high-contrast, 3D isometric style.”
Tags
#ApacheDoris #Kafka #ExactlyOnce #RealTimeAnalytics #DataEngineering #ETL #Backpressure #SchemaEvolution #Offsets #SLA




