Batch Processing
Designing Data Intensive Applications by Martin KleppmannChapter 10: Batch Processing
Introduction
Three different types of systems:
| Type | Description | Performance Measure |
|---|---|---|
| Services (online) | Waits for a request, handles it quickly, sends a response | Response time, availability |
| Batch processing (offline) | Takes a large amount of input data, runs a job, produces output. No user waiting. Often scheduled periodically | Throughput (time to crunch a dataset of a certain size) |
| Stream processing (near-real-time) | Consumes inputs, produces outputs (like batch), but operates on events shortly after they happen (lower latency than batch) | Latency |
MapReduce, published in 2004, was a major step forward in the scale of processing achievable on commodity hardware. Although its importance is now declining, it provides a clear picture of why and how batch processing is useful.
Section 1: Batch Processing with Unix Tools
Simple Log Analysis
Finding the five most popular pages on a website using Unix tools:
cat /var/log/nginx/access.log |
awk '{print $7}' | # Extract the URL (7th field)
sort | # Alphabetically sort URLs
uniq -c | # Count adjacent duplicates
sort -r -n | # Sort by count, descending
head -n 5 # Take top 5
This processes gigabytes of log files in seconds. Surprisingly many data analyses can be done in minutes using awk, sed, grep, sort, uniq, and xargs.
Sorting vs In-Memory Aggregation
- In-memory hash table (like a Ruby/Python script) — Works well if the working set (distinct URLs) fits in memory.
- Sorting approach (Unix
sort) — Handles larger-than-memory datasets by spilling to disk (mergesort with sequential access patterns). GNU Coreutilssortautomatically handles this and parallelizes across multiple CPU cores.
Section 2: The Unix Philosophy
Doug McIlroy (inventor of Unix pipes, 1964): "We should have some ways of connecting programs like a garden hose."
Four Principles (1978)
- Make each program do one thing well. Build afresh rather than complicating old programs.
- Expect the output of every program to become the input to another. Don't clutter output. Avoid binary input formats. Don't insist on interactive input.
- Design and build software to be tried early (within weeks). Don't hesitate to throw away clumsy parts.
- Use tools in preference to unskilled help to lighten a programming task.
This sounds remarkably like Agile and DevOps today.
Uniform Interface
- In Unix, the interface is a file (file descriptor) — an ordered sequence of bytes.
- Many things share this interface: actual files, communication channels (Unix sockets, stdin, stdout), device drivers, TCP connections.
- By convention, many Unix programs treat input as ASCII text with
\nas record separator. - The parsing of each record is more vague: Unix tools commonly split a line into fields by whitespace or tab characters, but CSV, pipe-separated, and other encodings are also used. Even a fairly simple tool like
xargshas half a dozen command-line options for specifying how its input should be parsed. - The uniform interface of ASCII text mostly works, but it's not exactly beautiful:
{print $7}to extract the URL is not very readable. In an ideal world it could have been{print $request_url}.
Separation of Logic and Wiring
- Programs use stdin and stdout by default. Pipes connect stdout of one process to stdin of another.
- The program doesn't know or care where input comes from or output goes to — a form of loose coupling / inversion of control.
Transparency and Experimentation
- Input files are treated as immutable — run commands as often as you want without damaging input.
- You can pipe output into
lessat any point to inspect intermediate results. - You can write intermediate output to a file and restart later stages without rerunning the entire pipeline.
Biggest limitation: Unix tools run only on a single machine — that's where Hadoop comes in.
Section 3: MapReduce and Distributed Filesystems
MapReduce is like Unix tools, but distributed across potentially thousands of machines. Like Unix tools, it does not modify the input and has no side effects other than producing output.
HDFS (Hadoop Distributed File System)
- Open source reimplementation of Google File System (GFS).
- Based on the shared-nothing principle (no special hardware, just computers connected by a conventional network).
- A daemon process runs on each machine; a central NameNode tracks which file blocks are stored where.
- File blocks are replicated on multiple machines (or use erasure coding like Reed-Solomon codes).
- Biggest deployments: tens of thousands of machines, hundreds of petabytes.
MapReduce Job Execution
Four steps (analogous to the Unix pipeline):
- Read input files, break into records (e.g.,
\nas separator). - Call the mapper to extract a key and value from each input record.
- Sort all key-value pairs by key (implicit in MapReduce).
- Call the reducer to iterate over sorted key-value pairs with the same key.
Mapper — Called once per input record. Extracts key-value pairs. Stateless (each record handled independently). Reducer — Receives all values for the same key. Produces output records.
Distributed Execution
- Input is partitioned by file blocks; each block processed by a separate map task.
- The scheduler tries to run each mapper on the machine that stores the input file replica (putting the computation near the data).
- Reducer partitioning uses a hash of the key to determine which reducer receives each key-value pair.
- Mappers write sorted output to local disk, partitioned by reducer. Reducers fetch and merge these sorted files. This process is called the shuffle.
- Reducer output is written to the distributed filesystem (replicated).
Workflows
- A single MapReduce job is limited. Complex analyses require chaining jobs — output of one becomes input to the next.
- Chaining is done implicitly by directory name (not piped like Unix).
- Workflow schedulers: Oozie, Azkaban, Luigi, Airflow, Pinball.
- Higher-level tools: Pig, Hive, Cascading, Crunch, FlumeJava — set up multi-stage workflows automatically.
3.1 Reduce-Side Joins and Grouping
Sort-Merge Joins
- One set of mappers extracts user ID from activity events; another extracts user ID from the user database.
- MapReduce partitions by key and sorts → all activity events and the user record with the same user ID become adjacent in the reducer input.
- Secondary sort — Arrange records so the reducer sees the user database record first, followed by activity events in timestamp order.
- The reducer stores the user's date of birth in a local variable, then iterates over activity events → outputs
(viewed-url, viewer-age-in-years)pairs. - Only needs to keep one user record in memory at a time. No network requests needed.
GROUP BY and Sessionization
- Grouping records by key works the same way as joins in MapReduce.
- Sessionization — Collating all activity events for a particular user session (e.g., for A/B testing). Use session cookie or user ID as the grouping key.
Handling Skew (Hot Keys)
- A celebrity with millions of followers → one reducer processes significantly more records than others → hot spot.
- Pig's skewed join — Sampling job identifies hot keys. Hot key records sent to random reducers (not deterministic hash). The other join input is replicated to all reducers handling that key.
- Crunch's sharded join — Similar but hot keys specified explicitly.
- Hive's skewed join — Hot keys stored in separate files; uses a map-side join for hot keys.
- Two-stage aggregation — First stage: random reducers aggregate subsets. Second stage: combine first-stage results into final value per key.
3.2 Map-Side Joins
Reduce-side joins make no assumptions about input data but are expensive (sorting, copying, merging). If you can make assumptions about the input, map-side joins are faster — no reducers, no sorting.
Broadcast Hash Joins
- One input is small enough to fit in memory. Each mapper loads the small dataset into an in-memory hash table, then scans the large input and looks up each record.
- The small input is "broadcast" to all partitions of the large input.
- Supported by: Pig ("replicated join"), Hive ("MapJoin"), Cascading, Crunch, Impala.
Partitioned Hash Joins
- Both inputs are partitioned the same way (same key, same hash function, same number of partitions).
- Each mapper only loads one partition of the small input into its hash table.
- Known as bucketed map joins in Hive.
Map-Side Merge Joins
- Both inputs are partitioned the same way and sorted by the same key.
- Mapper reads both input files incrementally in ascending key order, matching records with the same key (like a reducer's merge operation).
3.3 The Output of Batch Workflows
Building Search Indexes
- Google's original use of MapReduce. Mappers partition documents; each reducer builds the index for its partition (term dictionary → postings list). Index files are immutable once written.
- Can periodically rerun the entire indexing workflow, or build indexes incrementally (Lucene's segment merging).
Key-Value Stores as Batch Process Output
- Build a brand-new database inside the batch job and write it as files to the distributed filesystem. Then load in bulk into read-only servers.
- Don't write directly to a database from mappers/reducers (poor performance, can overwhelm the database, loses all-or-nothing guarantee).
- Used by: Voldemort, Terrapin, ElephantDB, HBase bulk loading.
Philosophy of Batch Process Outputs
Like Unix philosophy — inputs are immutable, outputs replace previous outputs, no side effects:
- Roll back easily — If buggy code produces wrong output, revert code and rerun. Old output still available in a different directory (human fault tolerance).
- Minimizing irreversibility — Beneficial for Agile development.
- Automatic retry — Failed map/reduce tasks are re-scheduled on the same input (safe because inputs are immutable, failed output is discarded).
- Reuse — Same input files can be used by various different jobs.
- Separation of concerns — Logic separated from wiring (input/output directories).
3.4 Comparing Hadoop to Distributed Databases
| MPP Databases | Hadoop/MapReduce | |
|---|---|---|
| Storage | Requires structured data (relational/document model) | Files are just byte sequences — any data model and encoding |
| Processing | SQL queries only | Arbitrary code (MapReduce, ML, image analysis, NLP) |
| Data modeling | Careful up-front modeling required | Dump data first, figure out processing later ("data lake", "sushi principle: raw data is better") |
| Fault handling | Abort entire query on node failure; resubmit | Retry individual failed tasks; write intermediate state to disk |
| Design for | Short queries (seconds to minutes) | Long jobs likely to experience task failures; preemption in shared clusters |
Designing for Frequent Faults
- At Google, a MapReduce task running for an hour has ~5% risk of being preempted (terminated to make space for higher-priority processes) — more than an order of magnitude higher than hardware failure rate.
- Google has mixed-use datacenters where online production services and offline batch jobs run on the same machines. Every task has a resource allocation (CPU, RAM, disk) enforced using containers, and a priority. Higher-priority processes cost more; lower-priority tasks can be terminated to free resources.
- This architecture allows non-production computing resources to be overcommitted — the system knows it can reclaim resources if necessary. Overcommitting allows better utilization and greater efficiency. Batch jobs effectively "pick up the scraps under the table" using whatever computing resources remain after high-priority processes have taken what they need.
- At this preemption rate, if a job has 100 tasks each running for 10 minutes, there is a >50% risk that at least one task will be terminated before finishing.
- MapReduce is designed to tolerate frequent unexpected task termination — not because hardware is unreliable, but because the freedom to arbitrarily terminate processes enables better resource utilization in a computing cluster (Google's Borg scheduler).
- In open source schedulers (YARN, Mesos, Kubernetes), preemption is less widely used, so MapReduce's design decisions make less sense.
Section 4: Beyond MapReduce
MapReduce is robust (processes arbitrarily large data on unreliable multi-tenant systems) but other tools are sometimes orders of magnitude faster for some kinds of processing.
4.1 Materialization of Intermediate State
Every MapReduce job is independent. Output of one job becomes input to the next via files on the distributed filesystem. This intermediate state is fully written to files — called materialization.
Unix pipes, by contrast, stream output to input incrementally using a small in-memory buffer.
Downsides of Materialization
- A job can only start when all tasks in the preceding job have completed (straggler tasks slow everything down).
- Mappers are often redundant — they just read back what a reducer just wrote and prepare it for the next stage. Could be chained directly.
- Intermediate state is replicated across several nodes in HDFS — overkill for temporary data.
4.2 Dataflow Engines
To fix MapReduce's problems: Spark, Tez, and Flink. They handle an entire workflow as one job rather than breaking it into independent subjobs. Known as dataflow engines because they explicitly model the flow of data through processing stages.
Operators (Generalized Map and Reduce)
Functions called operators can be assembled in flexible ways (not just alternating map and reduce). Three options for connecting operators:
- Repartition and sort by key (like MapReduce shuffle) — enables sort-merge joins and grouping.
- Repartition without sorting — for partitioned hash joins (order irrelevant since hash table randomizes it).
- Broadcast — Send same output to all partitions of the join operator (for broadcast hash joins).
Advantages Over MapReduce
- Sorting only where actually required (not by default between every stage).
- No unnecessary map tasks (mapper work incorporated into preceding reduce operator).
- Locality optimizations — Scheduler can place consuming task on the same machine as producing task (shared memory buffer instead of network copy).
- Intermediate state kept in memory or local disk (not replicated to HDFS).
- Operators start executing as soon as input is ready (no waiting for entire preceding stage).
- JVM reuse — Existing processes run new operators (no new JVM per task like MapReduce).
Workflows in Pig, Hive, or Cascading can be switched from MapReduce to Tez or Spark with a simple configuration change.
4.3 Fault Tolerance in Dataflow Engines
- MapReduce: intermediate state on HDFS is durable → easy to restart failed tasks.
- Spark/Flink/Tez: avoid writing intermediate state to HDFS → if a machine fails, recompute from other available data (prior stage or original HDFS input).
- Spark uses Resilient Distributed Datasets (RDD) to track the ancestry of data (which input partitions and operators produced it).
- Flink uses checkpointing of operator state to resume after a fault.
- Determinism matters — If recomputed data differs from the original lost data, downstream operators face contradictions. Nondeterministic operators (hash table iteration order, random numbers, system clock) must be eliminated for reliable recovery.
- If intermediate data is much smaller than source data or computation is very CPU-intensive, it may be cheaper to materialize than to recompute.
4.4 Graphs and Iterative Processing
Graph algorithms (e.g., PageRank) require repeating until done — cannot be expressed in a single MapReduce pass. Iterative approach: run a batch process per step, check completion condition, repeat. MapReduce is very inefficient for this (reads entire input and produces entirely new output each iteration, even if only a small part changed).
The Pregel Model (Bulk Synchronous Parallel — BSP)
- Implemented by: Apache Giraph, Spark's GraphX, Flink's Gelly.
- One vertex can "send a message" to another vertex (typically along edges in the graph).
- In each iteration, a function is called for each vertex, passing all messages sent to it.
- Unlike MapReduce, a vertex remembers its state in memory from one iteration to the next — only processes new incoming messages.
- Similar to the actor model, but with fault-tolerant durable state and fixed-round communication.
- Fault tolerance via periodic checkpointing of all vertex state. On failure, roll back to last checkpoint.
Parallel Execution
- Vertices send messages to vertex IDs; the framework handles partitioning and routing.
- Graph algorithms often have a lot of cross-machine communication overhead.
- If the graph fits in memory on a single computer, a single-machine algorithm will likely outperform a distributed batch process. Even single-machine frameworks like GraphChi can handle graphs that fit on disk.
4.5 High-Level APIs and Languages
Higher-level APIs (Hive, Pig, Cascading, Crunch, Spark, Flink) use relational-style building blocks: joining on field values, grouping by key, filtering, aggregating (count, sum).
The Move Toward Declarative Query Languages
- Specifying joins as relational operators lets the framework automatically choose the best join algorithm (cost-based query optimizers in Hive, Spark, Flink).
- Simple filtering/mapping expressed declaratively allows the query optimizer to use column-oriented storage layouts, read only required columns, and use vectorized execution (tight inner loops friendly to CPU caches).
- Spark generates JVM bytecode; Impala uses LLVM to generate native code for inner loops.
- Batch processing frameworks are beginning to look more like MPP databases (comparable performance) while retaining the flexibility of running arbitrary code and reading arbitrary formats.
Specialization for Different Domains
- Machine learning: Mahout (on MapReduce/Spark/Flink), MADlib (inside relational MPP database Apache HAWQ).
- Spatial algorithms: k-nearest neighbors for similarity search.
- Genome analysis: approximate string matching.
Summary
Key Takeaways
Design Principles (from Unix to MapReduce):
- Inputs are immutable, outputs are intended to become input to another program.
- Complex problems solved by composing small tools that "do one thing well".
- In Unix: files and pipes. In MapReduce: distributed filesystem. In dataflow engines: pipe-like data transport.
Two Main Problems Solved by Distributed Batch Processing:
| Problem | MapReduce | Dataflow Engines (Spark/Tez/Flink) |
|---|---|---|
| Partitioning | Mappers partitioned by input file blocks; output repartitioned, sorted, merged into reducer partitions | Similar partitioning, but sorting only where required |
| Fault tolerance | Writes frequently to disk; easy to recover individual tasks but slower in failure-free case | Keeps more in memory; recomputes on failure (needs deterministic operators) |
Join Algorithms:
- Sort-merge joins (reduce-side) — Both inputs go through mappers extracting the join key. Partitioning, sorting, and merging bring all records with the same key to the same reducer.
- Broadcast hash joins (map-side) — Small input loaded entirely into hash table in each mapper. Large input scanned and looked up.
- Partitioned hash joins (map-side) — Both inputs partitioned the same way. Each mapper loads one partition of the small input.
Batch Processing Output:
- Build search indexes, key-value stores, ML models.
- Build new database files inside the batch job → load in bulk to read-only servers.
- Never write directly to a production database from mappers/reducers.
Beyond MapReduce:
- Dataflow engines (Spark, Tez, Flink) handle entire workflows as one job, avoid materializing intermediate state, and are often orders of magnitude faster.
- Graph processing uses the Pregel/BSP model (vertex-centric, message-passing, iterative).
- High-level declarative APIs with cost-based query optimizers are converging batch processing frameworks with MPP databases.
Key distinction: Batch processing input is bounded (known, fixed size). The job eventually completes. Stream processing (Chapter 11) handles unbounded input — the job is never complete.
Important References
- Jeffrey Dean and Sanjay Ghemawat: "MapReduce: Simplified Data Processing on Large Clusters," at 6th USENIX OSDI, December 2004.
- Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung: "The Google File System," at 19th ACM SOSP, October 2003.
- Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, et al.: "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing," at 9th USENIX NSDI, April 2012.
- Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, et al.: "Pregel: A System for Large-Scale Graph Processing," at ACM SIGMOD, June 2010.
- Lawrence Page, Sergey Brin, Rajeev Motwani, and Terry Winograd: "The PageRank Citation Ranking: Bringing Order to the Web," Stanford InfoLab Technical Report 422, 1999.
- Craig Chambers, Ashish Raniwala, Frances Perry, et al.: "FlumeJava: Easy, Efficient Data-Parallel Pipelines," at 31st ACM SIGPLAN PLDI, June 2010.
- Ashish Thusoo, Joydeep Sen Sarma, Namit Jain, et al.: "Hive – A Petabyte Scale Data Warehouse Using Hadoop," at 26th IEEE ICDE, March 2010.
- Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, et al.: "Large-Scale Cluster Management at Google with Borg," at 10th EuroSys, April 2015.
- Roshan Sumbaly, Jay Kreps, Lei Gao, et al.: "Serving Large-Scale Batch Computed Data with Project Voldemort," at 10th USENIX FAST, February 2012.
- Tom White: Hadoop: The Definitive Guide, 4th edition, O'Reilly Media, 2015.
- Eric S. Raymond: The Art of UNIX Programming, Addison-Wesley, 2003.
- Marcel Kornacker, Alexander Behm, Victor Bittorf, et al.: "Impala: A Modern, Open-Source SQL Engine for Hadoop," at 7th CIDR, January 2015.
- David J. DeWitt and Jim N. Gray: "Parallel Database Systems: The Future of High Performance Database Systems," Communications of the ACM, volume 35, number 6, pages 85–98, June 1992.
- Frank McSherry, Michael Isard, and Derek G. Murray: "Scalability! But at What COST?," at 15th USENIX HotOS, May 2015.
Previous chapter
Consistency and ConsensusNext chapter
Stream Processing