Iterator (stream data in small chunks)
When to use
- You need to process lots of rows without loading all into memory.
- Your source is paged (DB cursor fetchmany, API pages, S3 list).
- You want lazy processing (work happens as you loop).
Avoid when the dataset is tiny or you truly need a full in-memory list.
Diagram (text)
Client ──> for row in stream_rows(...):
Iterator/generator ──> pulls next chunk → yields rows one by one
Data source (DB/API) ──> fetchmany(page_size) until empty
Step-by-step idea
- Define how to get the next chunk (e.g., cursor.fetchmany(n)).
- The iterator yields items from each chunk.
- Iteration stops when the source returns an empty chunk.
Python example (≤40 lines, type-hinted)
Concrete: stream rows from a DB-like cursor using fetchmany, constant memory.
from __future__ import annotations
from typing import Protocol, Iterator, Sequence, TypedDict

class Row(TypedDict):
    i: int

class Cursor(Protocol):
    def fetchmany(self, size: int) -> Sequence[Row]: ...

def stream_rows(cur: Cursor, batch: int = 1000) -> Iterator[Row]:
    # Calls cur.fetchmany(batch) repeatedly until it returns an empty list.
    for chunk in iter(lambda: cur.fetchmany(batch), []):
        for row in chunk:
            yield row
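Usage sketch (an assumption, not part of the original example): sqlite3's cursor exposes fetchmany and returns a plain list, so it satisfies the Cursor protocol at runtime and the empty-list sentinel terminates cleanly. Note that sqlite3 yields tuples rather than Row dicts, so a static type checker may flag the call even though it works.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (i INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(n,) for n in range(10_000)])
cur = conn.execute("SELECT i FROM t")

# Memory stays flat: only one 500-row chunk is resident at a time.
total = sum(row[0] for row in stream_rows(cur, batch=500))
print(total)  # 49995000
conn.close()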
Tiny pytest (cements it)
def test_stream_rows_paginates_and_is_lazy():
    class FakeCur:
        def __init__(self, rows): self.rows, self.pos, self.calls = rows, 0, 0
        def fetchmany(self, size):
            self.calls += 1
            if self.pos >= len(self.rows): return []
            chunk = self.rows[self.pos:self.pos+size]; self.pos += size; return chunk

    data = [{"i": n} for n in range(5)]
    cur = FakeCur(data)
    it = stream_rows(cur, batch=2)
    assert cur.calls == 0           # not called until iteration starts
    assert next(it) == {"i": 0}     # first batch pulled
    rest = list(it)
    assert rest[-1] == {"i": 4}
    assert cur.calls == 4           # ceil(5/2) = 3 data fetches + 1 empty fetch to stop
Trade-offs & pitfalls
- Pros: Constant memory; backpressure-friendly; easy to compose with for loops.
- Cons: Errors appear mid-iteration; can only iterate once unless you rebuild the iterator.
- Pitfalls:
  - Forgetting to close the cursor/connection: use contextlib.closing or manage the lifetime outside (see the sketch after this list).
  - Wrapping with list(...) kills streaming (loads all rows).
  - Creating custom __iter__/__next__ incorrectly: __next__ must raise StopIteration at the end.
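A minimal sketch of the close-the-cursor fix, assuming a PEP 249-style connection (conn), a table t with column i, and the Row/stream_rows definitions above: contextlib.closing ties the cursor's lifetime to the generator.

from contextlib import closing

def stream_table(conn, batch: int = 1000) -> Iterator[Row]:
    # closing(...) calls cur.close() when the with-block exits: on normal
    # exhaustion, on an exception, or when the generator itself is closed.
    with closing(conn.cursor()) as cur:
        cur.execute("SELECT i FROM t")
        yield from stream_rows(cur, batch)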
Pythonic alternatives
- Generators (we used one) and yield from to delegate to inner iterators (sketched below).
- itertools: islice, chain.from_iterable, takewhile, etc.
- async generators for IO-bound paging (HTTP/DB drivers that are async).
- The iter(callable, sentinel) trick (used above) to stop on an empty chunk.
- Type hints: Iterable[T] for inputs, Iterator[T] for outputs.
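A small sketch of two of these alternatives, reusing the Cursor/Row definitions from the example above; the _ListCur stand-in is hypothetical, mirroring the test's fake. yield from flattens each chunk without an inner loop, and islice caps consumption while preserving streaming.

from itertools import islice

def stream_rows_delegating(cur: Cursor, batch: int = 1000) -> Iterator[Row]:
    # Same behavior as stream_rows, but yield from replaces the inner loop.
    for chunk in iter(lambda: cur.fetchmany(batch), []):
        yield from chunk

class _ListCur:
    # Hypothetical in-memory stand-in satisfying the Cursor protocol.
    def __init__(self, rows: list[Row]) -> None:
        self.rows, self.pos = rows, 0
    def fetchmany(self, size: int) -> list[Row]:
        chunk = self.rows[self.pos:self.pos + size]
        self.pos += size
        return chunk

data = [{"i": n} for n in range(10)]
first_three = list(islice(stream_rows_delegating(_ListCur(data), batch=2), 3))
# first_three == [{"i": 0}, {"i": 1}, {"i": 2}], after only two fetchmany calls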
Mini exercise
Add stream_rows_until(cur, batch, stop) that stops early when stop(row) -> bool is true (e.g., first row with i >= 10). Test that it doesn’t fetch more pages after the stop condition.
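One possible shape for the exercise, not the only one (stream_rows_until below is illustrative and reuses the definitions above): returning from the generator as soon as stop(row) is true means the underlying stream_rows is never advanced again, so no further pages are fetched.

from typing import Callable

def stream_rows_until(cur: Cursor, batch: int, stop: Callable[[Row], bool]) -> Iterator[Row]:
    for row in stream_rows(cur, batch):
        if stop(row):
            return  # generator finishes here; the triggering row is dropped
        yield row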
Checks (quick checklist)
- Yields items lazily from fixed-size chunks.
- Clean termination when source returns empty.
- Resource lifetime (cursor/conn) is handled/closed.
- No accidental list(...) that defeats streaming.
- Tests prove laziness and correct pagination.




