Incremental mapreduce


Ponderings on how to build an efficient mapreduce system that stays up to date (that is, eventually reaches consistency) when the underlying data changes, without recomputing everything from scratch.

I'm also including a visual overview of mapreduce that may help fellow visual/spatial thinkers understand the topic better.


So Google has their MapReduce, and the people behind CouchDB are throwing around their ideas. I spent some time thinking about incremental mapreduce around July 2007, and it's time I typed up that page full of scribbles.

First of all: I think the ideas thrown out by Damien above aren't really mapreduce, As Google Intended. The real power of mapreduce is in its inherent combination of parallelism and chainability: the output of one mapreduce is the input to another, and each processing step can run massively in parallel. The proposed design is more like a limited, one-iteration cousin of mapreduce.

With that bashing now done (sorry), here's what I was thinking:

The way I imagined building an incremental mapreduce mechanism, without resorting to storing the intermediate data and recomputing only the out-of-date chunks (which would be lame), is to add one extra concept to the system: call it "demap". It creates "negative entries" for the old data. This is essentially what Damien did by always passing both the old and the new data to the map calls, just said differently, and I think my way might make the average call a lot simpler. I also don't see any reason why my version wouldn't be parallelizable, chainable, and generally yummy.

Visualizing mapreduce

Here's a "whiteboard demo" of plain old mapreduce (not incremental yet). This would be one "step" in the chain I mentioned.

As usual for mapreduce, the user defines two functions:

map(in_key, in_val) -> list(out_key, intermediate_val)

reduce(out_key, list(intermediate_val)) -> list(out_value)

Let's run through calling mapreduce with the following input data:

{ K1: S, K2: T, K3: X, K4: Y, K5: Z }



The map function is called once for each input key/value pair.

For example:

map(K1, S) -> [(A, S1), (B, S2)]

Note how map can return multiple items -- it could also return an empty list.
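Here's a rough sketch of that map phase, assuming the input is a plain object and that (key, value) pairs are written as two-element arrays. The helper name mapPhase and the toy map rule (one pair per character of a string value) are made up for illustration; they are not part of any real framework.

```javascript
// Hypothetical user-defined map: emits zero, one, or many [out_key, value] pairs.
// Toy rule (an assumption for illustration): one pair per character of the input string.
function map(inKey, inVal) {
  return [...inVal].map((ch, i) => [ch, `${inKey}#${i}`]);
}

// Framework side: call map once per input pair and collect everything it emits.
function mapPhase(input, mapFn) {
  const emitted = [];
  for (const [inKey, inVal] of Object.entries(input)) {
    emitted.push(...mapFn(inKey, inVal));
  }
  return emitted;
}

// K1 emits two pairs, K2 emits none, K3 emits one.
const emitted = mapPhase({ K1: "ab", K2: "", K3: "b" }, map);
console.log(emitted);
```

Each map call only sees its own input pair, which is what lets the calls run in parallel.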



rotate reorganizes outputs from the map calls, grouping items by key.

Note that sometimes one key can have multiple partitions -- the work is split into bite-size chunks.

rotate is provided by the framework.
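A minimal sketch of what rotate might do, assuming pairs are two-element arrays. The maxPartitionSize parameter is a hypothetical knob standing in for however a real framework decides on "bite-size", and the value Y1 is invented just to force a second partition for key A.

```javascript
// Group [key, value] pairs by key, then split each group into partitions of
// at most `maxPartitionSize` values (a hypothetical tuning knob).
function rotate(pairs, maxPartitionSize = 3) {
  const byKey = new Map();
  for (const [key, value] of pairs) {
    if (!byKey.has(key)) byKey.set(key, []);
    byKey.get(key).push(value);
  }
  const partitions = [];
  for (const [key, values] of byKey) {
    for (let i = 0; i < values.length; i += maxPartitionSize) {
      partitions.push([key, values.slice(i, i + maxPartitionSize)]);
    }
  }
  return partitions;
}

const partitions = rotate(
  [["A", "S1"], ["B", "S2"], ["A", "T1"], ["A", "X3"], ["A", "Y1"]],
  3
);
// Partitions: A -> [S1, T1, X3], A -> [Y1], B -> [S2]
console.log(JSON.stringify(partitions));
```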



reduce is called for each partition.

Once again, reduce can return zero or multiple values. For example:

reduce(A, [S1, T1, X3]) -> [H, I]

Rotate again

A new rotate run.

Because reduce can return multiple items, we might not be done yet. We wanted one output value per key from the reduce part -- so keep running it until the data simplifies to that! First, a new rotate.

Reduce again

A new reduce run.

A second run of reduce finally reaches a point where each key has only a single item.


The final output for this mapreduce call is:

{ A: N, B: O, C: M }
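Putting the whole step together, here is one way the loop could look: map everything, then alternate rotate and reduce until every key is down to a single value. This is a sketch under assumptions, not a real framework: the partition size, the maxRounds safety limit, and the toy odd/even summing functions are all invented for the example.

```javascript
// Group pairs by key and cut each group into partitions of at most `size` values.
function rotate(pairs, size) {
  const byKey = new Map();
  for (const [key, value] of pairs) {
    if (!byKey.has(key)) byKey.set(key, []);
    byKey.get(key).push(value);
  }
  const partitions = [];
  for (const [key, values] of byKey) {
    for (let i = 0; i < values.length; i += size) {
      partitions.push([key, values.slice(i, i + size)]);
    }
  }
  return partitions;
}

// One mapreduce "step": map everything, then alternate rotate and reduce
// until every key is down to a single value.
function mapreduce(input, mapFn, reduceFn, size = 2, maxRounds = 100) {
  let pairs = Object.entries(input).flatMap(([k, v]) => mapFn(k, v));
  for (let round = 0; round < maxRounds; round++) {
    const partitions = rotate(pairs, size);
    pairs = partitions.flatMap(([key, values]) =>
      reduceFn(key, values).map((v) => [key, v])
    );
    const keys = new Set(pairs.map(([key]) => key));
    if (keys.size === pairs.length) return Object.fromEntries(pairs);
  }
  throw new Error("reduce did not converge to one value per key");
}

// Toy functions: sum the odd and the even inputs separately.
const toyMap = (inKey, n) => [[n % 2 === 0 ? "even" : "odd", n]];
const toyReduce = (outKey, values) => [values.reduce((a, b) => a + b, 0)];

const result = mapreduce({ K1: 1, K2: 2, K3: 3, K4: 4, K5: 5 }, toyMap, toyReduce);
console.log(result); // odd: 9, even: 6 (takes two rotate/reduce rounds with size 2)
```

With a partition size of 2, the three odd values need two rounds: the first reduce leaves two partial sums for "odd", and the second rotate and reduce collapse them into one.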

Making it incremental

So, now that we hopefully have an understanding of mapreduce basics, and a nice notation to express the operations, how does one make mapreduce incremental?

Recomputing the results from scratch every time the input changes is just not a smart idea.

We could store all the intermediate values, and only re-run those calculations where the input changed. With a smart partitioning algorithm, this might even be doable. But it still means wasting a lot of storage space on something you don't really, directly, care about.

So, my idea hinges on this concept: you can only do incremental mapreduce when you know how to "undo" earlier results. If your problem space does not have that property, you're pretty much going to just have to store the intermediate values and try to recompute as little as possible.

However, suppose you are able to build a function like this:

demap(in_key, in_val) -> list(out_key, intermediate_val)

It looks very much like map, but has one important property: the effect of running your reduce on the output of demap must be semantically comparable to reduce never having seen the original map output!
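To make that property a bit more tangible, here is a small check one could run against a candidate demap. Everything in it is an assumption for illustration: the toy records with bucket and amount fields, the summing reduce, and the helper names valuesFor and demapCancelsMap.

```javascript
// Toy definitions (assumptions for illustration): values are numbers and
// reduce adds them, so demap can cancel map by emitting the negated value.
const map = (inKey, inVal) => [[inVal.bucket, inVal.amount]];
const demap = (inKey, inVal) => [[inVal.bucket, -inVal.amount]];
const reduce = (outKey, values) => [values.reduce((a, b) => a + b, 0)];

// Collect the values emitted for one out_key.
const valuesFor = (outKey, pairs) =>
  pairs.filter(([key]) => key === outKey).map(([, value]) => value);

// Check the property for one record: reducing "other values + map + demap"
// must give the same result as reducing the other values alone.
function demapCancelsMap(inKey, inVal, otherValues) {
  const mapped = map(inKey, inVal);
  const demapped = demap(inKey, inVal);
  const outKeys = new Set([...mapped, ...demapped].map(([key]) => key));
  for (const outKey of outKeys) {
    const withBoth = reduce(outKey, [
      ...otherValues,
      ...valuesFor(outKey, mapped),
      ...valuesFor(outKey, demapped),
    ]);
    const without = reduce(outKey, otherValues);
    if (JSON.stringify(withBoth) !== JSON.stringify(without)) return false;
  }
  return true;
}

console.log(demapCancelsMap("K3", { bucket: "A", amount: 7 }, [2, 5])); // true
```

Note that with a summing reduce, a key whose contributions all cancel ends up at 0 rather than vanishing; whether that counts as "never having seen" the data depends on the application.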

If we can accommodate that, here's how an update would go. Let's say we just did the mapreduce run above. Now we mutate X to X', and want to incrementally update our result. We'd run demap on the old value and map on the new one, feeding the results into the mapreduce machinery.

Map and demap

map and demap calls.

For example [1]:

demap(K3, X) -> [(B, X1_de), (C, X2_de), (A, X3_de)]
map(K3, X') -> [(B, X'1), (C, X'2)]



Mix in the previous output and rotate.



And finally reduce to get updated output.


The final output for this mapreduce call is:

{ A: P, B: Q, C: R }

A more concrete example

As a final example, to hopefully make this more concrete, consider Damien's example of computing the number of people known to have each profession. Here are definitions that should work, making updates of that data incremental (I'm assuming all the records are persons):

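function map(in_key, in_val) {
  // Each person adds 1 to the count for their profession.
  return [[in_val.job, 1]];
}

function demap(in_key, in_val) {
  // Cancel a previously mapped person by adding -1.
  return [[in_val.job, -1]];
}

function reduce(out_key, values) {
  // Previous totals, +1s and -1s all simply add up.
  return [values.reduce((total, v) => total + v, 0)];
}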

Simple, huh?
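Here's a sketch of how an incremental update could play out with those definitions, compared against a from-scratch run. The rotateAndReduce, fullRun and update helpers, the record keys p1 to p3, and the baker/pilot data are all invented for the example, and rotate is simplified to a single partition per key so one reduce round is enough.

```javascript
// The definitions from the text, with pairs as two-element arrays.
const map = (inKey, inVal) => [[inVal.job, 1]];
const demap = (inKey, inVal) => [[inVal.job, -1]];
const reduce = (outKey, values) => [values.reduce((a, b) => a + b, 0)];

// Simplified rotate + reduce: group by key, reduce each group once.
// (Single partition per key, so one reduce round is enough for this sketch.)
function rotateAndReduce(pairs) {
  const byKey = new Map();
  for (const [key, value] of pairs) {
    if (!byKey.has(key)) byKey.set(key, []);
    byKey.get(key).push(value);
  }
  const output = {};
  for (const [key, values] of byKey) {
    output[key] = reduce(key, values)[0];
  }
  return output;
}

// From-scratch run over all records.
const fullRun = (docs) =>
  rotateAndReduce(Object.entries(docs).flatMap(([k, v]) => map(k, v)));

// Incremental run: previous output + demap(old value) + map(new value).
const update = (prevOutput, inKey, oldVal, newVal) =>
  rotateAndReduce([
    ...Object.entries(prevOutput),
    ...demap(inKey, oldVal),
    ...map(inKey, newVal),
  ]);

const docs = {
  p1: { job: "baker" },
  p2: { job: "baker" },
  p3: { job: "pilot" },
};
const before = fullRun(docs); // baker: 2, pilot: 1
// p2 changes profession; only p2's old and new values are touched.
const after = update(before, "p2", docs.p2, { job: "pilot" }); // baker: 1, pilot: 2
console.log(before, after);
```

The update only looks at the previous output and the one changed record, never at p1 or p3.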

For CouchDB, you might think of skipping that in_key argument to map, just using docid, but consider this:

  • that would make chaining mapreduce operations require docids for all the intermediate results

  • the mapreduce operation is already providing output with keys, and we want to use those as input to the next thing
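The two points above can be sketched as a tiny chain, where the second step's in_key is simply the first step's out_key. The single-round mapreduce helper, the sample people, and the "count by first letter of the job" second step are all hypothetical; they only exist to show the keys flowing through.

```javascript
// Minimal single-round mapreduce (one partition per key), enough to show chaining.
function mapreduce(input, mapFn, reduceFn) {
  const byKey = new Map();
  for (const [inKey, inVal] of Object.entries(input)) {
    for (const [outKey, value] of mapFn(inKey, inVal)) {
      if (!byKey.has(outKey)) byKey.set(outKey, []);
      byKey.get(outKey).push(value);
    }
  }
  const output = {};
  for (const [outKey, values] of byKey) {
    output[outKey] = reduceFn(outKey, values)[0];
  }
  return output;
}

const sum = (outKey, values) => [values.reduce((a, b) => a + b, 0)];

// Step 1: people per profession. in_key is the record key and is ignored here.
const countByJob = (inKey, person) => [[person.job, 1]];

// Step 2: consumes step 1's output. Its in_key is step 1's out_key (the job
// name), not a docid. Hypothetical rule: total people per first letter of the job.
const countByInitial = (job, count) => [[job[0], count]];

const people = {
  p1: { job: "baker" },
  p2: { job: "barber" },
  p3: { job: "pilot" },
  p4: { job: "baker" },
};
const perJob = mapreduce(people, countByJob, sum); // baker: 2, barber: 1, pilot: 1
const perInitial = mapreduce(perJob, countByInitial, sum); // b: 3, p: 1
console.log(perJob, perInitial);
```

If map only ever received a docid, the second step would have no natural key to work with, since its inputs are reduce outputs rather than documents.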

  1. I'd use the mathematics-inspired X1^-1, but can't due to technical limitations.