From Kafka to QuestDB: Exactly-Once-ish Ingest with ILP, UPSERT KEYS, Backpressure & E2E Tests

Meta description:
Design an “exactly-once-ish” Kafka→QuestDB pipeline using ILP, UPSERT KEYS + dedup, smart backpressure, and Testcontainers-backed end-to-end tests.


Why this matters

You’re wiring a real-time pipeline: events land in Kafka, product teams want live charts, and SREs want zero duplicates after retries or restarts. Exactly-once is a unicorn; what you can deliver—exactly-once-ish—is practical and robust with QuestDB’s ILP/HTTP + dedup and disciplined consumer commits. (QuestDB)


Architecture at a glance

Kafka topic  -->  Consumer (Connect or custom)  -->  ILP/HTTP  -->  QuestDB (WAL table with DEDUP UPSERT KEYS)
                           ^                                          |
                           +---- commit offsets after 2xx + flush ----+

Key ideas:

  • Use ILP/HTTP (port 9000) for clear error codes + automatic retries; use TCP (9009) only when you know why. (QuestDB)
  • Create WAL tables with DEDUP UPSERT KEYS (must include the designated timestamp) to make replays and client retries idempotent. (QuestDB)
  • Commit Kafka offsets only after the ILP/HTTP request returns 2xx and the client flush succeeds; a crash before that point simply replays the batch. (QuestDB)

1) Create a QuestDB table for idempotent writes

-- A minimal time-series table with deduplication
CREATE TABLE trades (
  ts      TIMESTAMP,
  symbol  SYMBOL,
  price   DOUBLE,
  amount  DOUBLE
) TIMESTAMP(ts)
PARTITION BY DAY
DEDUP UPSERT KEYS (ts, symbol);   -- include the designated timestamp

Notes:

  • Dedup works only on WAL tables and replaces existing rows matching the UPSERT key.
  • Good upsert keys = {event timestamp} + {entity key(s)} (e.g., ts, order_id or ts, symbol). (QuestDB)

To retrofit an existing table:

ALTER TABLE trades DEDUP ENABLE UPSERT KEYS (ts, symbol);

(QuestDB)
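
If you prefer to run DDL from application code instead of the web console, here is a minimal JDBC sketch over QuestDB's PostgreSQL wire protocol, assuming the default port 8812 and admin/quest credentials:

try (var conn = java.sql.DriverManager.getConnection(
         "jdbc:postgresql://localhost:8812/qdb", "admin", "quest");
     var stmt = conn.createStatement()) {
  // enable deduplication on an existing WAL table
  stmt.execute("ALTER TABLE trades DEDUP ENABLE UPSERT KEYS(ts, symbol)");
}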


2) Option A — Kafka Connect sink (recommended)

Use the first-party QuestDB Kafka Connector (Kafka Connect sink). It speaks ILP to QuestDB and handles retries, converters, and DLQ. (QuestDB)

Minimal properties:

name=questdb-sink
connector.class=io.questdb.kafka.QuestDBSinkConnector
# ILP over HTTP (inline # comments are not valid in properties values)
client.conf.string=http::addr=localhost:9000;
topics=example-topic
table=example_table

# JSON example (swap for Avro / Protobuf via converters)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

(QuestDB)

Designated timestamp from payload (for dedup):

timestamp.field.name=event_time
timestamp.string.fields=event_time
timestamp.string.format=yyyy-MM-dd HH:mm:ss.SSSUUU z

(QuestDB)
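
With that format, an event_time value in the payload would look like 2025-11-20 10:00:00.000000 UTC (millisecond plus microsecond digits, then a zone name). The connector parses it into the designated timestamp, which is exactly what the dedup keys from section 1 rely on.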

Latency & backpressure knobs (batching):

# Smaller batches -> lower latency, more requests
client.conf.string=http::addr=localhost:9000;auto_flush_rows=1000;
allowed.lag=250
  • auto_flush_rows controls ILP batch size.
  • allowed.lag flushes when the topic goes quiet. (QuestDB)

Retries & exactly-once-ish:

client.conf.string=http::addr=localhost:9000;retry_timeout=60000;
  • Retries can re-send the same batch; enable table dedup to achieve exactly-once-ish delivery.
  • Or set retry_timeout=0 for at-most-once (rarely what you want). (QuestDB)

Dead-letter queue (keep the sink alive on bad records):

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-questdb
errors.deadletterqueue.topic.replication.factor=1

(QuestDB)
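
Note that client.conf.string is a single property, so the batching and retry settings shown above must be merged into one string rather than repeated. Pulling the pieces of this section together, an illustrative (not prescriptive) sink configuration:

name=questdb-sink
connector.class=io.questdb.kafka.QuestDBSinkConnector
client.conf.string=http::addr=localhost:9000;auto_flush_rows=1000;retry_timeout=60000;
topics=example-topic
table=example_table
timestamp.field.name=event_time
timestamp.string.fields=event_time
timestamp.string.format=yyyy-MM-dd HH:mm:ss.SSSUUU z
allowed.lag=250
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-questdb
errors.deadletterqueue.topic.replication.factor=1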


3) Option B — Custom consumer with ILP client (Java or Python)

When you need business logic or bespoke routing, roll your own with QuestDB’s ILP clients.

Java ILP/HTTP (recommended for reliability)

try (var sender = io.questdb.client.Sender.fromConfig(
        "http::addr=localhost:9000;auto_flush_rows=2000;retry_timeout=10000;")) {
  // event time, normally taken from the Kafka record; ingestion time breaks dedup
  Instant eventTime = Instant.parse("2025-11-20T10:00:00Z");
  sender.table("trades")
        .symbol("symbol", "ETH-USD")
        .doubleColumn("price", 3190.25)
        .doubleColumn("amount", 0.5)
        .at(eventTime);
  // sender.flush(); // usually not needed with auto-flush
}
  • HTTP gives error feedback + automatic retries; TCP simply disconnects on error (no reason given).
  • Avoid ingestion-time timestamps if you want dedup to work. (QuestDB)

Python ILP

from questdb.ingress import Sender, TimestampNanos

with Sender.from_conf("http::addr=localhost:9000;auto_flush_rows=2000;retry_timeout=10000;") as sender:
    # rows are passed as symbols/columns dicts; prefer event time over now() so dedup can engage
    sender.row("trades",
               symbols={"symbol": "ETH-USD"},
               columns={"price": 3190.25, "amount": 0.5},
               at=TimestampNanos.now())

(QuestDB Client Library)

Backpressure pattern (custom consumer)

  • Keep max_buf_size bounded; when the buffer approaches it or retries escalate, pause the Kafka partitions, flush and wait, then resume (see the sketch below).
  • Commit offsets only after 2xx + a successful flush: if the app dies mid-send, reprocessing is safe because the table dedups the replayed rows. (QuestDB)
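
A minimal sketch of that loop, assuming a JSON payload carrying ts, symbol, price and amount; consumerProps() (with enable.auto.commit=false), parse() and the Trade record are illustrative helpers, not library APIs:

record Trade(java.time.Instant ts, String symbol, double price, double amount) {}

try (var consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(consumerProps());
     var sender = io.questdb.client.Sender.fromConfig(
         "http::addr=localhost:9000;auto_flush_rows=2000;retry_timeout=10000;")) {
  consumer.subscribe(java.util.List.of("trades"));
  while (true) {
    var records = consumer.poll(java.time.Duration.ofMillis(500));
    if (records.isEmpty()) continue;
    consumer.pause(consumer.assignment());       // backpressure: stop fetching while we drain this batch
    try {
      for (var rec : records) {
        Trade t = parse(rec.value());            // map the JSON value onto the record
        sender.table("trades")
              .symbol("symbol", t.symbol())
              .doubleColumn("price", t.price())
              .doubleColumn("amount", t.amount())
              .at(t.ts());                       // event time, so replays stay dedup-safe
      }
      sender.flush();                            // throws if the batch was not accepted after retries
      consumer.commitSync();                     // commit offsets only after a successful flush
    } finally {
      consumer.resume(consumer.assignment());    // fetch the next batch
    }
  }
}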

4) Exactly-once-ish, precisely

What ILP guarantees:

  • ILP/HTTP is transactional only when a request carries rows for a single table; don't mix tables in one request if you want transactional semantics (see the sketch below).
  • Retries may duplicate rows; dedup with UPSERT KEYS converts that into exactly-once-ish. (QuestDB)
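
With the Java client, that means flushing between tables so each HTTP request contains rows for one table only. A minimal sketch (table names and the auto_flush=off setting are illustrative):

try (var sender = io.questdb.client.Sender.fromConfig("http::addr=localhost:9000;auto_flush=off;")) {
  var eventTime = java.time.Instant.parse("2025-11-20T10:00:00Z");

  // all rows for the first table, then flush: one request, one table, transactional
  sender.table("trades").symbol("symbol", "ETH-USD").doubleColumn("price", 3190.25).at(eventTime);
  sender.flush();

  // rows for a second table go into their own request
  sender.table("quotes").symbol("symbol", "ETH-USD").doubleColumn("bid", 3190.00).at(eventTime);
  sender.flush();
}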

What dedup guarantees:

  • On insert, QuestDB replaces any existing row with the same UPSERT KEYS, which must include the designated timestamp.
  • Dedup works only on WAL tables and adds write overhead; many UPSERT key columns combined with heavily colliding timestamps are the worst case. (QuestDB)

Transport choice

Transport        Error feedback         Retries    Throughput                When to use
ILP/HTTP (9000)  Rich (status codes)    Built-in   Great                     Default for Kafka pipelines
ILP/TCP (9009)   Connection drop only   Manual     Slightly lower overhead   Only if you must and understand the trade-offs
(QuestDB)

5) End-to-end test (repeatable proof)

Use Testcontainers to spin up QuestDB + Kafka and assert dedup behavior end-to-end.

Java + JUnit sketch:

@Container
static QuestDBContainer qdb = new QuestDBContainer("questdb/questdb:latest"); // JDBC + ILP open
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

@Test
void ingest_is_idempotent() throws Exception {
  try (var conn = DriverManager.getConnection(qdb.getJdbcUrl(), "admin", "quest");
       var stmt = conn.createStatement()) {
    // 0) Create the dedup table BEFORE any ingest, otherwise ILP auto-creates it without UPSERT KEYS
    stmt.execute("""
      CREATE TABLE IF NOT EXISTS trades (
        ts TIMESTAMP, symbol SYMBOL, price DOUBLE
      ) TIMESTAMP(ts) PARTITION BY DAY DEDUP UPSERT KEYS(ts, symbol);
    """);

    // 1) Produce duplicate events to Kafka (same ts + symbol)
    var producer = new KafkaProducer<String, String>(props(kafka.getBootstrapServers()));
    var json = "{\"symbol\":\"ETH-USD\",\"price\":3190.25,\"ts\":\"2025-11-20T10:00:00.000Z\"}";
    producer.send(new ProducerRecord<>("trades", "k1", json));
    producer.send(new ProducerRecord<>("trades", "k1", json)); // duplicate
    producer.flush();

    // 2) Minimal consumer -> ILP/HTTP Sender (retry_timeout > 0, auto_flush_rows small)
    //    Commit offsets *after* sender.flush() returns without exception.

    // 3) Assert only one row persisted (UPSERT KEYS on ts, symbol); poll until visible (helper below)
    var rs = stmt.executeQuery(
        "select count() from trades where ts = '2025-11-20T10:00:00.000Z' and symbol = 'ETH-USD'");
    rs.next();
    assertEquals(1, rs.getLong(1)); // the duplicate was de-duplicated
  }
}
  • Why this proves it: duplicates are produced; the consumer may re-send on retry; the table’s UPSERT KEYS ensures a single row.
  • Testcontainers provides ready-made QuestDBContainer and KafkaContainer. (Testcontainers)
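
The "poll until visible" step can be a small helper that retries the count query instead of sleeping for a fixed interval, since WAL apply is asynchronous. A sketch (the countRows name and 30 s deadline are illustrative):

// Retry the query until a row shows up or the deadline passes, so the test never relies on an arbitrary sleep.
static long countRows(java.sql.Connection conn, String sql) throws Exception {
  long deadline = System.nanoTime() + java.time.Duration.ofSeconds(30).toNanos();
  while (true) {
    try (var stmt = conn.createStatement(); var rs = stmt.executeQuery(sql)) {
      rs.next();
      long count = rs.getLong(1);
      if (count > 0 || System.nanoTime() > deadline) {
        return count;
      }
    }
    Thread.sleep(250);
  }
}

Call it with the same count() query and assert the result is exactly 1: zero means the row never arrived, more than one means dedup did not engage.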

6) Backpressure & tuning checklist

  • Batch sizing: start with auto_flush_rows=1000–5000 for low latency; raise for throughput. (QuestDB)
  • Retry horizon: retry_timeout between 10000 and 60000 (10–60 s) balances riding out transient blips against stalling the pipeline. (QuestDB)
  • One table per request: preserve HTTP transaction semantics. (QuestDB)
  • Use event time: required for dedup to work correctly and replay safely. (QuestDB)
  • Observe write pressure: watch ILP client buffers (max_buf_size) and pause/resume Kafka consumption when nearing limits. (QuestDB)
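
For a custom consumer, those knobs all live in the client configuration string; a combined sketch (values are illustrative starting points to tune against your latency SLO, not recommendations):

// batching, retry horizon and buffer cap in one place
var sender = io.questdb.client.Sender.fromConfig(
    "http::addr=localhost:9000;"
  + "auto_flush_rows=2000;"      // batch size: lower for latency, higher for throughput
  + "retry_timeout=30000;"       // keep retrying transient errors for up to 30 s (milliseconds)
  + "max_buf_size=33554432;");   // 32 MiB cap; pause the Kafka consumer before you approach it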

7) Common pitfalls

  • Forgetting the timestamp in UPSERT KEYS → dedup won’t engage properly. (QuestDB)
  • Mixing two tables in one HTTP request → not fully transactional; split batches per table. (QuestDB)
  • Assuming TCP gives the same safety as HTTP → TCP drops the connection on error with no details. (QuestDB)
  • Leaving schema auto-create on when you need strict transactions → consider disabling table/column auto-create. (QuestDB)

Summary (TL;DR)

  • Pick ILP/HTTP (port 9000) with retries and WAL tables configured for DEDUP UPSERT KEYS (include the timestamp).
  • Commit Kafka offsets after successful ILP write.
  • Size batches for your latency SLO and use DLQ for poison messages.
  • Prove it with Testcontainers so you can refactor with confidence. (QuestDB)

Call to action: Start with the Kafka connector + a single dedup’d table. Measure publish→query latency at different auto_flush_rows, then add custom consumer logic only if you truly need it. (QuestDB)


Internal link ideas (for your site)

  • “QuestDB Dedup 101: Choosing UPSERT KEYS for Idempotency”
  • “ILP/HTTP vs ILP/TCP: Error Handling, Retries, and Throughput”
  • “Testing Data Pipelines with Testcontainers: Patterns & Pitfalls”
  • “Designated Timestamps: Parsing, Timezones, and Replays”
  • “Backpressure Patterns for Kafka Consumers”

Image prompt (for DALL·E / Midjourney)

“A clean, modern diagram of a Kafka→QuestDB pipeline showing ILP/HTTP, WAL table with ‘DEDUP UPSERT KEYS(ts, key)’, retry loop, and offset commit after 2xx. Minimalist, high-contrast, isometric style.”


Tags

#QuestDB #Kafka #ExactlyOnce #ILP #DataEngineering #Streaming #Idempotency #Deduplication #Backpressure #Testcontainers
