From Kafka to QuestDB: Exactly-Once-ish Ingest with ILP, UPSERT KEYS, Backpressure & E2E Tests
Meta description (158 chars):
Design an “exactly-once-ish” Kafka→QuestDB pipeline using ILP, UPSERT KEYS + dedup, smart backpressure, and Testcontainers-backed end-to-end tests.
Why this matters
You’re wiring a real-time pipeline: events land in Kafka, product teams want live charts, and SREs want zero duplicates after retries or restarts. Exactly-once is a unicorn; what you can deliver—exactly-once-ish—is practical and robust with QuestDB’s ILP/HTTP + dedup and disciplined consumer commits. (QuestDB)
Architecture at a glance
Kafka topic --> Consumer (Connect or custom) --> ILP/HTTP --> QuestDB (WAL table with DEDUP UPSERT KEYS)
     ^                                                                     |
     +----------------- commit offsets after 2xx + flush ------------------+
Key ideas:
- Use ILP/HTTP (port 9000) for clear error codes + automatic retries; use TCP (9009) only when you know why. (QuestDB)
- Create WAL tables with DEDUP UPSERT KEYS (must include the designated timestamp) to make replays and client retries idempotent. (QuestDB)
- Commit Kafka offsets only after a successful (2xx) ILP request, once the write has been accepted; a crash before the commit means a replay, which dedup absorbs. (QuestDB)
1) Create a QuestDB table for idempotent writes
-- A minimal time-series table with deduplication
CREATE TABLE trades (
    ts TIMESTAMP,
    symbol SYMBOL,
    price DOUBLE,
    amount DOUBLE
) TIMESTAMP(ts)
  PARTITION BY DAY
  DEDUP UPSERT KEYS (ts, symbol); -- include the designated timestamp
Notes:
- Dedup works only on WAL tables and replaces existing rows matching the UPSERT key.
- Good upsert keys = {event timestamp} + {entity key(s)}, e.g. (ts, order_id) or (ts, symbol). (QuestDB)
To retrofit an existing table:
ALTER TABLE trades DEDUP ENABLE UPSERT KEYS (ts, symbol);
(QuestDB)
2) Option A — Kafka Connect sink (recommended)
Use the first-party QuestDB Kafka Connector (Kafka Connect sink). It speaks ILP to QuestDB and handles retries, converters, and DLQ. (QuestDB)
Minimal properties:
name=questdb-sink
connector.class=io.questdb.kafka.QuestDBSinkConnector
# ILP over HTTP
client.conf.string=http::addr=localhost:9000;
topics=example-topic
table=example_table
# JSON example (swap for Avro / Protobuf via converters)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
(QuestDB)
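To deploy this configuration against a distributed Connect worker, POST it to the Connect REST API. A minimal Java sketch, assuming the worker's REST endpoint is on localhost:8083 (the subsections below add further keys to the same config map):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterQuestDbSink {
    public static void main(String[] args) throws Exception {
        // Same properties as above, expressed as Connect's JSON payload
        String config = """
            {
              "name": "questdb-sink",
              "config": {
                "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
                "client.conf.string": "http::addr=localhost:9000;",
                "topics": "example-topic",
                "table": "example_table",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": "false"
              }
            }
            """;
        var response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(config))
                        .build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body()); // expect 201 Created
    }
}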
Designated timestamp from payload (for dedup):
timestamp.field.name=event_time
timestamp.string.fields=event_time
timestamp.string.format=yyyy-MM-dd HH:mm:ss.SSSUUU z
(QuestDB)
Latency & backpressure knobs (batching):
# Smaller batches -> lower latency, more requests
client.conf.string=http::addr=localhost:9000;auto_flush_rows=1000;
allowed.lag=250
auto_flush_rows controls the ILP batch size; allowed.lag flushes whatever is buffered when the topic goes quiet. (QuestDB)
Retries & exactly-once-ish:
client.conf.string=http::addr=localhost:9000;retry_timeout=60000;
- Retries can re-send the same batch; enable table dedup to achieve exactly-once-ish delivery.
- Or set retry_timeout=0 for at-most-once delivery (rarely what you want). (QuestDB)
Dead-letter queue (keep the sink alive on bad records):
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-questdb
errors.deadletterqueue.topic.replication.factor=1
(QuestDB)
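To see what actually landed on the DLQ, a plain consumer is enough. A minimal sketch, assuming string keys/values and a broker on localhost:9092 (both are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (var consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(List.of("dlq-questdb"));
            while (true) {
                for (var rec : consumer.poll(Duration.ofSeconds(1))) {
                    // Each record is a message the sink could not ingest; inspect it and fix upstream.
                    System.out.printf("offset=%d key=%s value=%s%n", rec.offset(), rec.key(), rec.value());
                }
            }
        }
    }
}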
3) Option B — Custom consumer with ILP client (Java or Python)
When you need business logic or bespoke routing, roll your own with QuestDB’s ILP clients.
Java ILP/HTTP (recommended for reliability)
try (var sender = io.questdb.client.Sender.fromConfig(
        "http::addr=localhost:9000;auto_flush_rows=2000;retry_timeout=10000;")) {
    // build a row
    sender.table("trades")
          .symbol("symbol", "ETH-USD")
          .doubleColumn("price", 3190.25)
          .doubleColumn("amount", 0.5)
          .at(Instant.now()); // use event time whenever possible
    // sender.flush(); // usually not needed with auto-flush
}
- HTTP gives error feedback + automatic retries; TCP simply disconnects on error (no reason given).
- Avoid ingestion-time timestamps if you want dedup to work. (QuestDB)
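Concretely, pull the event time out of the payload and pass it to at() instead of stamping rows at ingest. A minimal sketch, assuming a JSON payload with an ISO-8601 ts field and Jackson for parsing (field names are illustrative):

import java.time.Instant;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.questdb.client.Sender;

static final ObjectMapper MAPPER = new ObjectMapper();

// Map one JSON event onto an ILP row keyed by its own event time.
static void writeTrade(Sender sender, String jsonPayload) throws Exception {
    var event = MAPPER.readTree(jsonPayload);
    Instant eventTime = Instant.parse(event.get("ts").asText()); // event time, not Instant.now()
    sender.table("trades")
          .symbol("symbol", event.get("symbol").asText())
          .doubleColumn("price", event.get("price").asDouble())
          .doubleColumn("amount", event.get("amount").asDouble())
          .at(eventTime); // (ts, symbol) now match across retries and replays
}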
Python ILP
from questdb.ingress import Sender, TimestampNanos

with Sender.from_conf("http::addr=localhost:9000;auto_flush_rows=2000;retry_timeout=10000;") as s:
    s.row(
        "trades",
        symbols={"symbol": "ETH-USD"},
        columns={"price": 3190.25, "amount": 0.5},
        at=TimestampNanos.now(),  # prefer the event time whenever possible
    )
Backpressure pattern (custom consumer)
- Keep max_buf_size finite; if you approach it or retries escalate, pause the Kafka partitions, flush (or wait out retries), then resume and continue (see the sketch below).
- Commit offsets only after a 2xx response plus flush; if the app dies mid-send, reprocessing is safe thanks to table dedup. (QuestDB)
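A minimal sketch of that loop, assuming enable.auto.commit=false, an already-subscribed consumer, and the writeTrade() mapper from the previous section; error handling is reduced to pause, rewind, back off, resume:

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.questdb.client.Sender;

static void runPipeline(KafkaConsumer<String, String> consumer, Sender sender) throws InterruptedException {
    while (true) {
        var records = consumer.poll(Duration.ofMillis(500));
        try {
            for (var rec : records) {
                writeTrade(sender, rec.value()); // build ILP rows keyed by event time
            }
            sender.flush();      // returns after a successful request (or throws once retries are exhausted)
            consumer.commitSync(); // offsets advance only after a successful write
        } catch (Exception sendFailure) {
            // Backpressure: stop fetching, rewind to the last committed offsets, back off, resume.
            // The uncommitted batch is re-polled and table dedup absorbs the replay.
            consumer.pause(consumer.assignment());
            var committed = consumer.committed(consumer.assignment());
            for (var tp : consumer.assignment()) {
                var offset = committed.get(tp);
                if (offset == null) {
                    consumer.seekToBeginning(List.of(tp));
                } else {
                    consumer.seek(tp, offset.offset());
                }
            }
            Thread.sleep(5_000); // crude backoff; tune for your SLO
            consumer.resume(consumer.assignment());
        }
    }
}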
4) Exactly-once-ish, precisely
What ILP guarantees:
- ILP/HTTP treats each request as a transaction per table; don’t mix tables in one request if you want transactional semantics.
- Retries may duplicate rows; dedup with UPSERT KEYS converts that into exactly-once-ish. (QuestDB)
What dedup guarantees:
- On insert, QuestDB replaces any existing row with the same UPSERT KEYS, which must include the designated timestamp.
- Dedup is WAL-only and adds write overhead (heavy key collisions on the same timestamps are the worst case). (QuestDB)
Transport choice
| Transport | Error feedback | Retries | Throughput | When to use |
|---|---|---|---|---|
| ILP/HTTP (9000) | Rich (status codes) | Built-in | Great | Default for Kafka pipelines |
| ILP/TCP (9009) | Connection drop only | Manual | Slightly lower overhead | Only if you must and understand the trade-offs |
(QuestDB)
5) End-to-end test (repeatable proof)
Use Testcontainers to spin up QuestDB + Kafka and assert dedup behavior end-to-end.
Java + JUnit sketch:
@Container
static QuestDBContainer qdb = new QuestDBContainer("questdb/questdb:latest"); // JDBC + ILP ports exposed

@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

@Test
void ingest_is_idempotent() throws Exception {
    try (var conn = DriverManager.getConnection(qdb.getJdbcUrl(), "admin", "quest");
         var stmt = conn.createStatement()) {

        // 0) Create the dedup'd table up front so the UPSERT KEYS apply to every insert
        stmt.execute("""
            CREATE TABLE IF NOT EXISTS trades (
                ts TIMESTAMP, symbol SYMBOL, price DOUBLE
            ) TIMESTAMP(ts) PARTITION BY DAY DEDUP UPSERT KEYS (ts, symbol);
            """);

        // 1) Produce duplicate events to Kafka (same ts+symbol); props() is sketched just below
        var producer = new KafkaProducer<String, String>(props(kafka.getBootstrapServers()));
        var json = "{\"symbol\":\"ETH-USD\",\"price\":3190.25,\"ts\":\"2025-11-20T10:00:00.000Z\"}";
        producer.send(new ProducerRecord<>("trades", "k1", json));
        producer.send(new ProducerRecord<>("trades", "k1", json)); // duplicate
        producer.flush();

        // 2) Minimal consumer -> ILP/HTTP Sender (retry_timeout>0, auto_flush_rows small);
        //    commit offsets *after* sender.flush() returns without exception.

        // 3) Assert only one row persisted (UPSERT KEYS on ts,symbol); allow a short wait for WAL apply
        var rs = stmt.executeQuery(
            "select count() from trades where ts='2025-11-20T10:00:00.000Z' and symbol='ETH-USD'");
        rs.next();
        assertEquals(1, rs.getLong(1)); // duplicate was de-duped
    }
}
- Why this proves it: duplicates are produced; the consumer may re-send on retry; the table’s UPSERT KEYS ensures a single row.
- Testcontainers provides ready-made QuestDBContainer and KafkaContainer. (Testcontainers)
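The props() helper referenced in the test is ordinary producer configuration; a minimal sketch using string serializers to match the JSON-as-string payload:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

static Properties props(String bootstrapServers) {
    var p = new Properties();
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    p.put(ProducerConfig.ACKS_CONFIG, "all"); // durable produce, so the duplicate really lands twice
    return p;
}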
6) Backpressure & tuning checklist
- Batch sizing: start with auto_flush_rows=1000–5000 for low latency; raise it for throughput. (QuestDB)
- Retry horizon: retry_timeout=10s–60s balances transient blips against producer stalls. (QuestDB)
- One table per request: preserves HTTP transaction semantics. (QuestDB)
- Use event time: required for dedup to work correctly and replay safely. (QuestDB)
- Observe write pressure: watch the ILP client buffer (max_buf_size) and pause/resume Kafka consumption when nearing limits (all of these knobs are combined in the sketch below). (QuestDB)
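Pulling those knobs together into one Sender configuration string (values are illustrative starting points, not recommendations; max_buf_size is in bytes):

try (var sender = io.questdb.client.Sender.fromConfig(
        "http::addr=localhost:9000;"
        + "auto_flush_rows=2000;"        // batch size: latency vs. throughput
        + "retry_timeout=30000;"         // how long (ms) to retry transient failures
        + "max_buf_size=104857600;")) {  // ~100 MiB client buffer cap; pause the consumer before hitting it
    // ... build rows as in section 3 ...
}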
7) Common pitfalls
- Forgetting the timestamp in UPSERT KEYS → dedup won’t engage properly. (QuestDB)
- Mixing two tables in one HTTP request → not fully transactional; split batches per table. (QuestDB)
- Assuming TCP gives the same safety as HTTP → TCP drops the connection on error with no details. (QuestDB)
- Leaving schema auto-create on when you need strict transactions → consider disabling table/column auto-create. (QuestDB)
Summary (TL;DR)
- Pick ILP/HTTP (port 9000) with retries and WAL tables configured for DEDUP UPSERT KEYS (include the timestamp).
- Commit Kafka offsets after successful ILP write.
- Size batches for your latency SLO and use DLQ for poison messages.
- Prove it with Testcontainers so you can refactor with confidence. (QuestDB)
Call to action: Start with the Kafka connector + a single dedup’d table. Measure publish→query latency at different auto_flush_rows, then add custom consumer logic only if you truly need it. (QuestDB)
Internal link ideas (for your site)
- “QuestDB Dedup 101: Choosing UPSERT KEYS for Idempotency”
- “ILP/HTTP vs ILP/TCP: Error Handling, Retries, and Throughput”
- “Testing Data Pipelines with Testcontainers: Patterns & Pitfalls”
- “Designated Timestamps: Parsing, Timezones, and Replays”
- “Backpressure Patterns for Kafka Consumers”
Image prompt (for DALL·E / Midjourney)
“A clean, modern diagram of a Kafka→QuestDB pipeline showing ILP/HTTP, WAL table with ‘DEDUP UPSERT KEYS(ts, key)’, retry loop, and offset commit after 2xx. Minimalist, high-contrast, isometric style.”
Tags
#QuestDB #Kafka #ExactlyOnce #ILP #DataEngineering #Streaming #Idempotency #Deduplication #Backpressure #Testcontainers