Iterator (stream data in small chunks)

When to use

  • You need to process many rows without loading them all into memory.
  • Your source is paged (DB cursor fetchmany, API pages, S3 list).
  • You want lazy processing (work happens as you loop).

Avoid when the dataset is tiny or you truly need a full in-memory list.

Diagram (text)

Client ──> for row in stream_rows(...):
Iterator/generator ──> pulls next chunk → yields rows one by one
Data source (DB/API) ──> fetchmany(page_size) until empty

Step-by-step idea

  1. Define how to get the next chunk (e.g., cursor.fetchmany(n)).
  2. The iterator yields items from each chunk.
  3. Stops when the source returns empty.

Python example (≤40 lines, type-hinted)

Concrete goal: stream rows from a DB-like cursor via fetchmany, in constant memory.

from __future__ import annotations
from typing import Protocol, Iterator, Sequence, TypedDict

class Row(TypedDict):
    i: int

class Cursor(Protocol):
    def fetchmany(self, size: int) -> Sequence[Row]: ...

def stream_rows(cur: Cursor, batch: int = 1000) -> Iterator[Row]:
    # Calls cur.fetchmany(batch) repeatedly until it returns an empty list.
    # iter(callable, sentinel) stops on equality, so the source must return a
    # list: an empty tuple would not match the [] sentinel.
    for chunk in iter(lambda: cur.fetchmany(batch), []):
        for row in chunk:
            yield row
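
Usage sketch (hedged): the data.db file and the events table below are made-up examples, and sqlite3 rows are plain tuples rather than the Row TypedDict above, but sqlite3's fetchmany returns a list, so the empty-list sentinel in stream_rows works unchanged.

import sqlite3
from contextlib import closing

with closing(sqlite3.connect("data.db")) as conn, closing(conn.cursor()) as cur:
    cur.execute("SELECT id FROM events")      # hypothetical table
    for row in stream_rows(cur, batch=500):   # rows arrive 500 per fetch
        print(row)                            # stand-in for real per-row work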

Tiny pytest (cements it)

def test_stream_rows_paginates_and_is_lazy():
    class FakeCur:
        def __init__(self, rows): self.rows, self.pos, self.calls = rows, 0, 0
        def fetchmany(self, size):
            self.calls += 1
            if self.pos >= len(self.rows): return []
            chunk = self.rows[self.pos:self.pos+size]; self.pos += size; return chunk

    data = [{"i": n} for n in range(5)]
    cur = FakeCur(data)
    it = stream_rows(cur, batch=2)
    assert cur.calls == 0                # not called until iteration
    assert next(it) == {"i": 0}          # first batch pulled
    rest = list(it)
    assert rest[-1] == {"i": 4}
    assert cur.calls == 4                # 3 data fetches + 1 final empty fetch

Trade-offs & pitfalls

  • Pros: Constant memory; backpressure-friendly; easy to compose with for loops.
  • Cons: Errors appear mid-iteration; can only iterate once unless you rebuild the iterator.
  • Pitfalls:
    • Forgetting to close the cursor/connection: use contextlib.closing or manage the lifetime outside the generator (a closing-based sketch follows this list).
    • Wrapping with list(...) kills streaming (loads all rows).
    • Writing a custom __iter__/__next__ incorrectly: __next__ must raise StopIteration at the end.
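
A minimal sketch of the lifetime pitfall above, assuming a generic DB-API-style connection (conn.cursor(), execute, fetchmany are the standard DB-API names; stream_query and conn are illustrative, not from the example above): the generator owns the cursor, so it is closed on exhaustion, on error, and even if the caller stops iterating early.

from contextlib import closing
from typing import Any, Iterator

def stream_query(conn: Any, sql: str, batch: int = 1000) -> Iterator[Any]:
    # closing(...) guarantees cur.close() runs: on normal exhaustion, on an
    # exception, or when the caller abandons the generator (GeneratorExit
    # propagates through the with block).
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        for chunk in iter(lambda: cur.fetchmany(batch), []):
            yield from chunk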

Pythonic alternatives

  • Generators (we used one) and yield from to delegate to inner iterators.
  • itertools: islice, chain.from_iterable, takewhile, etc. (a short sketch follows this list).
  • async generators for IO-bound paging (HTTP/DB drivers that are async).
  • iter(callable, sentinel) trick (used above) to stop on an empty chunk.
  • Type hints: Iterable[T] for inputs, Iterator[T] for outputs.
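
A short sketch of two of these helpers; flatten and preview are names made up for this note, not library functions. chain.from_iterable does the same flattening job as the inner loop in stream_rows, and islice bounds how much of a stream you consume, so only the pages needed are ever fetched.

from itertools import chain, islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def flatten(pages: Iterable[Iterable[T]]) -> Iterator[T]:
    # Same as: for page in pages: yield from page
    return chain.from_iterable(pages)

def preview(rows: Iterable[T], n: int = 10) -> Iterator[T]:
    # Lazily yields at most n items from any iterable.
    return islice(rows, n)

For example, list(preview(stream_rows(cur, batch=100), 5)) pulls only the first fetchmany page.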

Mini exercise

Add stream_rows_until(cur, batch, stop) that stops early when stop(row) -> bool is true (e.g., first row with i >= 10). Test that it doesn’t fetch more pages after the stop condition.
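
One possible shape for the exercise, as a hedged sketch (itertools.takewhile is an equally valid route); it reuses Cursor, Row, and stream_rows from the example above and stops yielding as soon as stop(row) is true, so no later pages are requested:

from typing import Callable, Iterator

def stream_rows_until(cur: Cursor, batch: int, stop: Callable[[Row], bool]) -> Iterator[Row]:
    for row in stream_rows(cur, batch):
        if stop(row):
            return          # ends the generator; no further fetchmany calls
        yield row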

Checks (quick checklist)

  • Yields items lazily from fixed-size chunks.
  • Clean termination when source returns empty.
  • Resource lifetime (cursor/conn) is handled/closed.
  • No accidental list(...) that defeats streaming.
  • Tests prove laziness and correct pagination.