Pagefy

Stream Processing

Designing Data Intensive Applications by Martin Kleppmann

Chapter 11: Stream Processing

Introduction

In batch processing (Chapter 10), the input is bounded — of a known, finite size. In reality, a lot of data is unbounded — it arrives gradually over time and the dataset is never "complete." Batch processors must artificially divide data into chunks of fixed duration (e.g., processing a day's worth of data at the end of every day).

Stream processing abandons fixed time slices entirely, processing every event as it happens. A "stream" refers to data that is incrementally made available over time.


Section 1: Transmitting Event Streams

In batch processing, inputs and outputs are files. In stream processing, a record is called an event — a small, self-contained, immutable object containing details of something that happened at some point in time (with a timestamp). Related events are grouped into a topic or stream.

1.1 Messaging Systems

Messaging systems are a common approach to notifying consumers about new events. Two key questions arise:

  1. What happens if producers send faster than consumers can process? Three options: drop messages, buffer in a queue, or apply backpressure (flow control — block the sender until the recipient catches up).
  2. What happens if nodes crash? Durability may require writing to disk and/or replication (at a cost to throughput and latency).
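Option 3 (backpressure) can be sketched with nothing more than a bounded queue: when the buffer is full, the producer's `put()` blocks until the consumer catches up, so no messages are dropped. A minimal illustration using Python's standard library:

```python
import queue
import threading
import time

# A bounded queue: put() blocks the producer when the buffer is full —
# this blocking is exactly backpressure (flow control).
buf = queue.Queue(maxsize=2)
processed = []

def consumer():
    while True:
        msg = buf.get()
        if msg is None:          # sentinel: shut down
            break
        time.sleep(0.01)         # deliberately slow consumer
        processed.append(msg)

t = threading.Thread(target=consumer)
t.start()
for i in range(5):
    buf.put(i)                   # blocks whenever 2 messages are buffered
buf.put(None)
t.join()
print(processed)                 # [0, 1, 2, 3, 4] — nothing was dropped
```

The producer runs ahead only as far as the buffer allows; beyond that, it is paced to the consumer's speed.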

Direct Messaging (Producer to Consumer)

  • UDP multicast — Used in the financial industry for stock market feeds, where low latency is critical. Application-level protocols recover lost packets.
  • Brokerless messaging — ZeroMQ, nanomsg (publish/subscribe over TCP or IP multicast).
  • StatsD, Brubeck — Unreliable UDP messaging for metrics collection.
  • Webhooks — A callback URL registered with another service; HTTP/RPC request on event.

Limitation: these approaches generally assume that producers and consumers are constantly online. If a consumer is offline, it may miss messages sent while it is unreachable.

Message Brokers

A message broker (message queue) is a kind of database optimized for handling message streams. Producers write to the broker; consumers read from it.

  • Tolerates clients that come and go (connect, disconnect, crash).
  • Consumers are generally asynchronous — producer doesn't wait for message to be processed.

Message Brokers vs Databases

  • Data retention — Brokers automatically delete a message after successful delivery; databases keep data until it is explicitly deleted.
  • Working set — Brokers assume queues stay short; databases are built to handle large datasets.
  • Querying — Brokers let clients subscribe to topics matching a pattern; databases offer secondary indexes and arbitrary queries.
  • Notification — Brokers notify clients when data changes; database query results are point-in-time snapshots, with no notification of subsequent changes.

Traditional brokers: RabbitMQ, ActiveMQ, HornetQ, TIBCO, IBM MQ, Azure Service Bus, Google Cloud Pub/Sub (JMS and AMQP standards).

1.2 Multiple Consumers

  • Load balancing — Each message delivered to one consumer (share the work). AMQP: multiple clients on same queue. JMS: shared subscription.
  • Fan-out — Each message delivered to all consumers (independent broadcast). JMS: topic subscriptions. AMQP: exchange bindings.
  • The two patterns can be combined: two consumer groups each receive all messages, but within each group only one node receives each message.
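The combination of the two patterns can be simulated in a few lines: every group sees every message (fan-out), but within a group the messages are spread round-robin across members (load balancing). A toy sketch (the function and group names are invented for illustration):

```python
from itertools import cycle

def deliver(messages, groups):
    """Fan-out across groups, load-balance (round-robin) within each group.

    `groups` maps a group name to its list of consumer names.
    Returns {consumer: [messages received]}.
    """
    received = {c: [] for members in groups.values() for c in members}
    pickers = {g: cycle(members) for g, members in groups.items()}
    for msg in messages:
        for g in groups:                                # every group sees every message
            received[next(pickers[g])].append(msg)      # but only one member per group
    return received

out = deliver(["m1", "m2", "m3", "m4"],
              {"analytics": ["a1", "a2"], "billing": ["b1"]})
# a1 gets m1 and m3, a2 gets m2 and m4 (work is shared),
# while b1 — the only member of its group — gets all four messages.
print(out)
```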

1.3 Acknowledgments and Redelivery

  • Consumers must explicitly acknowledge when they've finished processing a message. If no acknowledgment (connection closed/timeout), the broker redelivers to another consumer.
  • Ordering problem: With load balancing, redelivery can cause messages to be processed out of order (e.g., m4 before m3). To avoid: use a separate queue per consumer (no load balancing). Not a problem if messages are independent of each other.
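The reordering scenario can be traced step by step: one consumer crashes before acknowledging, and by the time the broker redelivers the message, a later message has already been processed elsewhere. A small simulation of that interleaving:

```python
# Broker delivers m3 to consumer c2 and m4 to consumer c1.  c2 crashes
# before acknowledging, so the broker redelivers m3 — which is now
# processed *after* m4, even though m3 was sent first.
delivered = []

def process(consumer, msg, crash=False):
    if crash:
        return False              # no acknowledgment → broker will redeliver
    delivered.append((consumer, msg))
    return True

process("c2", "m3", crash=True)   # c2 dies mid-processing
process("c1", "m4")               # meanwhile c1 finishes m4
process("c1", "m3")               # redelivery: m3 lands after m4
print([m for _, m in delivered])  # ['m4', 'm3'] — out of order
```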

1.4 Partitioned Logs

Traditional message brokers delete messages after delivery — destructive consumption. You can't rerun the same consumer and get the same result. A new consumer only receives messages sent after registration.

Log-based message brokers combine durable storage of databases with low-latency notification of messaging.

  • A log is an append-only sequence of records on disk. A producer appends to the end; a consumer reads sequentially.
  • The log is partitioned for throughput. A topic = a group of partitions. Within each partition, the broker assigns a monotonically increasing sequence number (offset) to each message.
  • Implementations: Apache Kafka, Amazon Kinesis Streams, Twitter's DistributedLog. Achieve millions of messages per second via partitioning and replication.

Log-Based vs Traditional Messaging

  • Fan-out: trivial — multiple consumers independently read the log (reading doesn't delete).
  • Load balancing: assign entire partitions to consumer nodes (not individual messages). Each client consumes all messages in its assigned partitions sequentially.
  • Downside: max parallelism = number of partitions. A slow message holds up the entire partition (head-of-line blocking).
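The core mechanics — append-only partitions, per-partition offsets, non-destructive reads — fit in a short sketch. This is a hypothetical toy class, not any real broker's API; note that hashing the key routes all messages for that key to one partition, preserving their relative order:

```python
class PartitionedLog:
    """Minimal log-based broker sketch: append-only partitions with offsets."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value):
        # Same key → same partition, so a key's messages stay ordered.
        p = hash(key) % len(self.partitions)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1   # (partition, offset)

    def read(self, partition, offset):
        """Read-only: consuming never deletes anything from the log."""
        return self.partitions[partition][offset:]

log = PartitionedLog(2)
for i in range(4):
    log.append("user-42", f"event-{i}")
p, _ = log.append("user-42", "event-4")
print(log.read(p, 0))    # all five events, in append order
print(log.read(p, 3))    # replay from offset 3: ['event-3', 'event-4']
```

Because `read` is a pure lookup, any number of consumers can read the same partition independently, and rewinding is just passing a smaller offset.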

Consumer Offsets

  • All messages with offset < current offset have been processed. The broker only needs to periodically record the consumer offset (not track individual acknowledgments).
  • Very similar to the log sequence number in database replication — the message broker is like a leader database, the consumer like a follower.

Disk Space

  • Log divided into segments; old segments deleted or archived → bounded-size buffer (circular buffer / ring buffer).
  • A typical 6 TB hard drive at 150 MB/s sequential write throughput → ~11 hours of buffer. In practice, deployments retain days or weeks of messages.
  • Throughput remains constant regardless of retention (every message is written to disk anyway) — unlike in-memory brokers that slow down when spilling to disk.

Replaying Old Messages

  • Consuming is a read-only operation (doesn't change the log). The consumer offset is under the consumer's control.
  • You can start a copy of a consumer with yesterday's offsets, write output to a different location, and reprocess. Repeatable any number of times.
  • This makes log-based messaging more like batch processing — derived data is clearly separated from input data through a repeatable transformation.

Section 2: Databases and Streams

A replication log is a stream of database write events. The state machine replication principle: if every replica processes the same events in the same order, they all end up in the same final state.

2.1 Keeping Systems in Sync

Most applications combine several technologies (OLTP database, cache, full-text index, data warehouse). They need to be kept in sync.

Dual writes — The application writes to each system directly. Problems:

  • Race conditions — Two clients writing concurrently may arrive in different orders at different systems → permanent inconsistency.
  • Partial failure — One write succeeds, another fails → systems out of sync (atomic commit problem).

Solution: make one database the leader (single source of truth) and make other systems followers via its change stream.

2.2 Change Data Capture (CDC)

The process of observing all data changes written to a database and extracting them as a stream that can be replicated to other systems.

  • Makes one database the leader; turns others into followers. A log-based message broker preserves ordering.
  • Implementations: LinkedIn's Databus, Facebook's Wormhole, Yahoo!'s Sherpa, Bottled Water (PostgreSQL), Maxwell and Debezium (MySQL), Mongoriver (MongoDB), GoldenGate (Oracle).
  • Usually asynchronous — adding a slow consumer doesn't affect the source database, but replication lag issues apply.

Initial Snapshot

If you don't have the entire log history, you need a consistent snapshot (corresponding to a known offset in the change log) before applying changes.

Log Compaction

The storage engine periodically looks for log records with the same key, throws away duplicates, keeps only the most recent update for each key. A tombstone (null value) indicates deletion.

With log compaction, you can rebuild a derived data system by starting a new consumer from offset 0 — the log contains the most recent value for every key in the database (a full copy without needing a snapshot). Supported by Apache Kafka.
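The compaction rule itself is simple: keep only the latest record per key, and drop keys whose latest record is a tombstone. A sketch of the idea (not Kafka's actual on-disk algorithm, which compacts segment by segment in the background):

```python
def compact(log):
    """Keep the most recent record per key; drop tombstoned keys (value None)."""
    latest = {}
    for key, value in log:            # later records overwrite earlier ones
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]

log = [("user:1", "alice"), ("user:2", "bob"),
       ("user:1", "alicia"),          # update: supersedes "alice"
       ("user:2", None)]              # tombstone: user:2 was deleted
print(compact(log))                   # [('user:1', 'alicia')]
```

After compaction the log is exactly a full copy of the current key-value state, which is why a new consumer starting at offset 0 can rebuild a derived system without a separate snapshot.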

API Support for Change Streams

Databases increasingly support change streams as a first-class interface: RethinkDB (query notifications), Firebase and CouchDB (data sync), Meteor (MongoDB oplog), VoltDB (stream of committed tuples), Kafka Connect (integrates CDC tools with Kafka).

2.3 Event Sourcing

Similar to CDC but at a different level of abstraction:

  • CDC: Application uses the database mutably. Changes extracted at a low level (parsing replication log). Application doesn't need to be aware of CDC.
  • Event sourcing: Application logic explicitly built on immutable events written to an event log. Events reflect user actions at the application level, not low-level state changes. Updates/deletes are discouraged.

Event sourcing makes it easier to evolve applications, helps with debugging, and guards against application bugs.

Commands vs Events

  • A command is a user request that may still fail (e.g., validation may reject it).
  • Once validation succeeds and the command is accepted, it becomes an event — durable and immutable.
  • A consumer of the event stream cannot reject an event. Validation must happen synchronously before the event is written.
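The command/event boundary can be made concrete with a small sketch: validation happens synchronously in the command handler, and only an accepted command is appended to the log as an immutable event. The seat-reservation scenario here is a hypothetical example:

```python
events = []   # the immutable, append-only event log

def handle_command(user, seat, taken):
    """Validate a command synchronously; only on success append an event."""
    if seat in taken:
        raise ValueError(f"seat {seat} already taken")   # command rejected — no event
    events.append({"type": "SeatReserved", "user": user, "seat": seat})
    taken.add(seat)

taken = set()
handle_command("alice", "14C", taken)
try:
    handle_command("bob", "14C", taken)   # fails validation, never reaches the log
except ValueError:
    pass
print(len(events))   # 1 — only accepted commands become events
```

Downstream consumers never see the rejected command, so they never need to handle failure of an already-recorded fact.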

2.4 State, Streams, and Immutability

Mutable state and an append-only log of immutable events are two sides of the same coin:

  • Application state = integral of the event stream over time.
  • Change stream = derivative of the state by time.
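"State as the integral of events" is literally a fold: start from empty state and apply each event in order. A sketch using the shopping-cart example (event shapes are invented for illustration):

```python
from functools import reduce

def apply(state, event):
    """Fold one immutable event into the current application state."""
    cart = dict(state)                     # never mutate previous state
    item = event["item"]
    if event["type"] == "added":
        cart[item] = cart.get(item, 0) + 1
    elif event["type"] == "removed":
        cart[item] -= 1
        if cart[item] == 0:
            del cart[item]
    return cart

log = [{"type": "added", "item": "book"},
       {"type": "added", "item": "pen"},
       {"type": "removed", "item": "pen"}]   # net effect on the pen: zero

state = reduce(apply, log, {})
print(state)   # {'book': 1} — yet the log still records both pen events
```

The state shows only the net result, while the log retains the add-then-remove history — the extra information the "Advantages of Immutable Events" point below relies on.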

Pat Helland: "The truth is the log. The database is a cache of a subset of the log."

Advantages of Immutable Events

  • Like financial bookkeeping — mistakes are corrected by adding compensating transactions, not erasing the original.
  • If buggy code writes bad data, recovery is much easier with an append-only log than with destructive overwrites.
  • Captures more information (e.g., a customer adding then removing an item from their cart — useful for analytics even though the net effect is zero).

Deriving Several Views from the Same Event Log (CQRS)

  • Separate the form in which data is written (event log) from the form in which it is read (application state).
  • Command Query Responsibility Segregation (CQRS) — Multiple read-optimized views derived from the same write-optimized event log.
  • Debates about normalization vs denormalization become largely irrelevant — you can denormalize in read views while keeping the event log as the source of truth.

Concurrency Control

  • Consumers of the event log are usually asynchronous → a user may write to the log and then read from a derived view that hasn't been updated yet (read-your-writes problem).
  • If the event log and application state are partitioned the same way, a single-threaded log consumer needs no concurrency control for writes.

Limitations of Immutability

  • Workloads with high update/delete rates → immutable history may grow prohibitively large.
  • Privacy regulations may require actually deleting data (not just appending a "deleted" event). This is genuinely hard with immutable logs.

Section 3: Processing Streams

Three options for what to do with a stream:

  1. Write to a storage system (database, cache, search index) — streaming equivalent of batch workflow output.
  2. Push to users (email alerts, push notifications, real-time dashboard).
  3. Process one or more input streams to produce output streams — a pipeline of processing stages.

A piece of code that processes streams is called an operator or job. The crucial difference from batch: a stream never ends.

3.1 Uses of Stream Processing

Complex Event Processing (CEP)

  • Specify rules to search for certain patterns of events in a stream (like regex for events).
  • Uses a high-level declarative query language (like SQL) or graphical UI.
  • The relationship between queries and data is reversed compared to databases: queries are stored long-term, data flows through them.
  • Implementations: Esper, IBM InfoSphere Streams, Apama, TIBCO StreamBase, SQLstream.

Stream Analytics

  • Oriented toward aggregations and statistical metrics over large numbers of events (not specific event sequences).
  • Examples: rate of events per time interval, rolling averages, comparing current statistics to previous time intervals.
  • Computed over fixed time intervals called windows.
  • May use probabilistic algorithms: Bloom filters (set membership), HyperLogLog (cardinality estimation), percentile estimation — approximate results with less memory.
  • Frameworks: Apache Storm, Spark Streaming, Flink, Concord, Samza, Kafka Streams. Hosted: Google Cloud Dataflow, Azure Stream Analytics.

Maintaining Materialized Views

  • Keep derived data systems (caches, search indexes, data warehouses) up to date by consuming the change stream.
  • Unlike stream analytics, may require all events over an arbitrary time period (not just a time window) — a window stretching back to the beginning of time.
  • In principle, any stream processor could be used for materialized view maintenance, but the need to maintain events forever runs counter to the assumptions of some analytics-oriented frameworks that mostly operate on windows of limited duration.
  • Samza and Kafka Streams support this kind of usage, building on Kafka's log compaction. Samza is particularly well-suited because it integrates tightly with Kafka: it uses Kafka topics for both input streams and for replicating local state changes, enabling fault-tolerant local state that can be rebuilt from the log-compacted changelog.

Search on Streams

  • Store queries long-term, run documents past them (the reverse of conventional search).
  • Elasticsearch's percolator feature implements this.

Message Passing and RPC

  • Actor frameworks are primarily for concurrency management; stream processing is primarily for data management.
  • Actors: ephemeral, one-to-one communication, arbitrary patterns (including cyclic). Streams: durable, multi-subscriber, acyclic pipelines.

3.2 Reasoning About Time

Event Time vs Processing Time

  • Event time — When the event actually occurred (timestamp embedded in the event).
  • Processing time — When the stream processor processes the event (local system clock).
  • Processing may be delayed (queueing, network faults, restarts, reprocessing). Messages may arrive out of order.
  • Confusing event time and processing time leads to bad data (e.g., a spike in event rate appears when reprocessing a backlog, even though the original events were spread over time).

Straggler Events

  • You can never be sure when you've received all events for a particular window.
  • Two options: ignore stragglers (track dropped events as a metric), or publish a correction (updated value with stragglers included).

Whose Clock Are You Using?

  • Mobile apps may buffer events offline and send them hours or days later.
  • Device clocks can't be trusted (may be set incorrectly).
  • Three timestamps approach:
    1. Time the event occurred (device clock)
    2. Time the event was sent to the server (device clock)
    3. Time the event was received by the server (server clock)
  • Subtract timestamp 2 from timestamp 3 to estimate the device clock offset, then apply to timestamp 1.
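The arithmetic of the three-timestamp correction is worth writing out. Assuming network delay between sending and receiving is negligible, the offset of the device clock is (receive time − send time), and adding it to the device-stamped event time recovers the true event time:

```python
def true_event_time(t_occurred, t_sent, t_received):
    """Estimate real event time from the three timestamps.

    t_occurred, t_sent: device clock.  t_received: server clock.
    Assumes negligible network delay between sending and receiving.
    """
    offset = t_received - t_sent     # how far off the device clock is
    return t_occurred + offset

# Device clock runs 120 s fast: an event at real time 1000 is stamped
# 1120, sent at (device) time 1150, received at (server) time 1030.
print(true_event_time(1120, 1150, 1030))   # 1000
```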

3.3 Types of Windows

  • Tumbling — Fixed length, non-overlapping. Every event belongs to exactly one window. E.g., 1-minute windows: 10:03:00–10:03:59, 10:04:00–10:04:59.
  • Hopping — Fixed length, overlapping (provides smoothing). E.g., a 5-minute window with a 1-minute hop: 10:03:00–10:07:59, 10:04:00–10:08:59. Can be implemented as tumbling windows plus aggregation over adjacent windows.
  • Sliding — Contains all events within some interval of each other. E.g., a 5-minute sliding window includes events at 10:03:39 and 10:08:12 (less than 5 minutes apart). Implemented with a sorted buffer, removing expired events.
  • Session — No fixed duration. Groups all events for the same user that occur closely together in time; ends when the user is inactive for some period (e.g., 30 minutes). Common for website analytics.
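Tumbling and hopping window assignment reduce to modular arithmetic on the event timestamp. A minimal sketch (timestamps as seconds since midnight; the function names are illustrative):

```python
def tumbling_window(ts, size):
    """Start of the single tumbling window containing timestamp `ts`."""
    return ts - ts % size

def hopping_windows(ts, size, hop):
    """Start times of all hopping windows (length `size`, advancing by
    `hop`) that contain `ts` — an event falls into several of them."""
    first = tumbling_window(ts, hop) - size + hop
    return [w for w in range(first, ts + 1, hop) if w <= ts < w + size]

ts = 10 * 3600 + 3 * 60 + 39            # 10:03:39 as seconds since midnight
print(tumbling_window(ts, 60))           # 36180, i.e. the 10:03:00 window
print(hopping_windows(ts, 300, 60))      # five overlapping 5-minute windows
```

With a 5-minute window and a 1-minute hop, the event at 10:03:39 lands in the five windows starting 09:59:00 through 10:03:00.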

3.4 Stream Joins

Three types of joins in stream processing:

Stream-Stream Join (Window Join)

  • Example: correlating search events with click events (same session ID) to calculate click-through rates.
  • The stream processor maintains state (e.g., all events in the last hour, indexed by session ID). When a new event arrives, check the other index for a matching event.
  • If the search expires without a matching click, emit an event saying the search was not clicked.
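The search/click correlation can be sketched as two indexes keyed by session ID, each checked when an event arrives on the other stream. For brevity this sketch keeps only the latest event per session and omits window expiry, which a real processor would need:

```python
searches, clicks, joined = {}, {}, []

def on_search(session, query):
    searches[session] = query
    if session in clicks:                       # a click arrived first
        joined.append((query, clicks[session]))

def on_click(session, url):
    clicks[session] = url
    if session in searches:                     # the usual order
        joined.append((searches[session], url))

on_search("s1", "stream processing")
on_click("s2", "http://example.com/a")     # no matching search (yet)
on_click("s1", "http://example.com/b")     # joins with s1's earlier search
print(joined)   # [('stream processing', 'http://example.com/b')]
```

Handling both arrival orders matters because, as discussed above, events on the two streams may be arbitrarily interleaved.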

Stream-Table Join (Stream Enrichment)

  • Example: enriching activity events with user profile information (user ID → profile data).
  • Load a copy of the database into the stream processor (in-memory hash table or local disk index) for local lookups without network round-trips.
  • Keep the local copy up to date via change data capture — subscribe to the user profile database's changelog.
  • Like a stream-stream join, but the table changelog uses a window reaching back to the "beginning of time" (newer records overwrite older ones).
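Stream enrichment then looks like this: a local dict acts as the replica of the profile table, updated by changelog (CDC) events, and each activity event is joined against it with a pure local lookup. A sketch with invented event shapes:

```python
profiles = {}   # local replica of the user-profile table

def on_changelog(user_id, profile):
    """CDC event from the profile database keeps the local copy current."""
    profiles[user_id] = profile

def enrich(event):
    """Join an activity event against the local table — no network round-trip."""
    return {**event, "profile": profiles.get(event["user_id"])}

on_changelog(1, {"name": "Alice", "country": "NZ"})
first = enrich({"user_id": 1, "action": "login"})
on_changelog(1, {"name": "Alice", "country": "DE"})   # profile updated
second = enrich({"user_id": 1, "action": "logout"})
print(first["profile"]["country"], second["profile"]["country"])   # NZ DE
```

Newer changelog records simply overwrite older ones, which is the "window back to the beginning of time" in miniature.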

Table-Table Join (Materialized View Maintenance)

  • Example: Twitter timeline cache. When a new tweet is sent, add it to all followers' timelines. When a user unfollows someone, remove their tweets.
  • The stream processor maintains a database of followers for each user.
  • Equivalent to maintaining a materialized view of a SQL join between tweets and follows tables.

Time-Dependence of Joins

  • All three join types require the stream processor to maintain state based on one join input and query it on messages from the other.
  • If state changes over time, which version do you use for the join? For example, if you sell things, you need to apply the right tax rate to invoices, which depends on the country, product type, and date of sale (since tax rates change). When joining sales to a table of tax rates, you want the rate at the time of the sale, which may differ from the current rate if reprocessing historical data.
  • If ordering across streams is undetermined, the join becomes nondeterministic — you cannot rerun the same job on the same input and necessarily get the same result (events on input streams may be interleaved differently).
  • In data warehouses, this is called a slowly changing dimension (SCD). It is often addressed by using a unique identifier for each version of the joined record: every time the tax rate changes, it is given a new identifier, and the invoice includes the identifier for the tax rate at the time of sale. This makes the join deterministic, but has the consequence that log compaction is not possible (all versions of the records must be retained).

3.5 Fault Tolerance

Batch processing: if a task fails, restart it on the same immutable input. Output only visible when task completes successfully → appears as if every record was processed exactly once (better described as effectively-once).

Stream processing: the stream never ends, so you can't simply restart from the beginning.

Microbatching

  • Break the stream into small blocks (~1 second), treat each as a miniature batch process. Used by Spark Streaming.
  • Implicitly provides a tumbling window equal to the batch size.
  • Trade-off: smaller batches = more scheduling overhead; larger batches = more delay.

Checkpointing

  • Periodically generate rolling checkpoints of state, write to durable storage. On crash, restart from the most recent checkpoint. Used by Apache Flink.
  • Checkpoints triggered by barriers in the message stream (similar to microbatch boundaries but without forcing a particular window size).

Limitation

Both microbatching and checkpointing provide exactly-once semantics within the stream processing framework. But once output leaves the framework (writing to a database, sending emails), a failed task restart causes the external side effect to happen twice.

Atomic Commit

  • Ensure all outputs and side effects take effect if and only if processing is successful (messages to downstream operators, database writes, state changes, input acknowledgments).
  • Used in Google Cloud Dataflow, VoltDB, planned for Apache Kafka.
  • Unlike XA, these keep transactions internal to the stream processing framework (not across heterogeneous technologies). Overhead amortized by processing several messages per transaction.

Idempotence

  • An idempotent operation has the same effect whether performed once or multiple times (e.g., setting a key to a fixed value is idempotent; incrementing a counter is not).
  • Even non-idempotent operations can be made idempotent with extra metadata (e.g., include the Kafka message offset with each database write to detect duplicate updates).
  • Requires: same messages replayed in same order (log-based broker), deterministic processing, no concurrent updates to the same value.
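The offset-metadata trick can be sketched as a store that remembers the last applied message offset and skips anything at or below it, turning a replayed update into a no-op. A hypothetical class, not any particular system's API:

```python
class IdempotentStore:
    """Key-value store that records the message offset with each write,
    so a replayed (duplicate) update can be detected and skipped."""

    def __init__(self):
        self.data = {}
        self.last_offset = -1

    def apply(self, offset, key, value):
        if offset <= self.last_offset:
            return False              # duplicate from a replay — ignore it
        self.data[key] = value
        self.last_offset = offset
        return True

store = IdempotentStore()
store.apply(0, "k", "v1")
store.apply(1, "k", "v2")
store.apply(1, "k", "v2")            # redelivered after a crash: no-op
print(store.data["k"], store.last_offset)   # v2 1
```

This only works under the conditions listed above: the same messages must be replayed in the same order, with no concurrent writers racing on the same value.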

Rebuilding State After a Failure

  • Replicate state to remote storage (Flink: periodic snapshots to HDFS; Samza/Kafka Streams: state changes to a dedicated Kafka topic with log compaction; VoltDB: redundant processing on multiple nodes).
  • Rebuild from input streams — If state is aggregations over a short window, replay the input events. If state is a local database replica, rebuild from the log-compacted change stream.

Summary

Key Takeaways

Two Types of Message Brokers:

  • Delivery — AMQP/JMS: individual messages assigned to consumers. Log-based: all messages in a partition go to the same consumer.
  • Acknowledgment — AMQP/JMS: per-message. Log-based: a periodically checkpointed consumer offset.
  • After delivery — AMQP/JMS: message deleted. Log-based: message retained on disk.
  • Ordering — AMQP/JMS: may be reordered under load balancing. Log-based: preserved within a partition.
  • Replay — AMQP/JMS: not possible (consumption is destructive). Log-based: possible (reading is read-only; just rewind the offset).
  • Best for — AMQP/JMS: task queues, message-by-message parallelism. Log-based: high throughput where ordering and replayability matter.

Where Streams Come From:

  • User activity events, sensors, data feeds
  • Database changelogs via change data capture (CDC) or event sourcing
  • Log compaction allows the stream to retain a full copy of the database

Processing Streams:

  • CEP — Pattern matching on event sequences
  • Stream analytics — Windowed aggregations and statistics
  • Materialized views — Keeping derived data systems up to date
  • Stream joins — Stream-stream (window), stream-table (enrichment), table-table (materialized view maintenance)

Reasoning About Time:

  • Event time ≠ processing time. Use event timestamps, handle stragglers.
  • Three-timestamp approach for untrusted device clocks.
  • Four window types: tumbling, hopping, sliding, session.

Fault Tolerance:

  • Microbatching (Spark Streaming), checkpointing (Flink), atomic commit, idempotence.
  • Exactly-once / effectively-once semantics achievable within the framework; external side effects require atomic commit or idempotence.

Important References

  1. Tyler Akidau, Robert Bradshaw, Craig Chambers, et al.: "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing," Proceedings of the VLDB Endowment, volume 8, number 12, pages 1792–1803, August 2015.
  2. Jay Kreps, Neha Narkhede, and Jun Rao: "Kafka: A Distributed Messaging System for Log Processing," at 6th NetDB, June 2011.
  3. Jay Kreps: "The Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction," engineering.linkedin.com, December 16, 2013.
  4. Pat Helland: "Immutability Changes Everything," at 7th CIDR, January 2015.
  5. Martin Kleppmann: "Bottled Water: Real-Time Integration of PostgreSQL and Kafka," martin.kleppmann.com, April 23, 2015.
  6. Martin Kleppmann: Making Sense of Stream Processing, O'Reilly Media, May 2016.
  7. Greg Young: "CQRS and Event Sourcing," at Code on the Beach, August 2014.
  8. Martin Fowler: "Event Sourcing," martinfowler.com, December 12, 2005.
  9. Matei Zaharia, Tathagata Das, Haoyuan Li, et al.: "Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters," at 4th USENIX HotCloud, June 2012.
  10. Kostas Tzoumas, Stephan Ewen, and Robert Metzger: "High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink," data-artisans.com, August 5, 2015.
  11. Pat Helland: "Idempotence Is Not a Medical Condition," Communications of the ACM, volume 55, number 5, page 56, May 2012.
  12. Shirshanka Das, Chavdar Botev, Kapil Surlaker, et al.: "All Aboard the Databus!," at 3rd ACM SoCC, October 2012.
  13. Neha Narkhede: "Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines," confluent.io, February 18, 2016.
  14. Tyler Akidau: "The World Beyond Batch: Streaming 102," oreilly.com, January 20, 2016.
  15. Jay Kreps: "Why Local State Is a Fundamental Primitive in Stream Processing," oreilly.com, July 31, 2014.