crunch-dev mailing list archives

From Josh Wills
Subject thorny Aggregator-on-Spark design problem
Date Thu, 05 Dec 2013 18:31:56 GMT
Hey all,

So I'm working away on CRUNCH-296, the Crunch-on-Spark patch, and I've run
into a place where there's a mismatch between the frameworks: combiners,
and how Crunch uses Aggregators to express combine operations.

The current Crunch Aggregator<T> assumes that it will see all of the values
for a given key all at once, because that's how things work in MapReduce
Combiner and Reducer operations. That isn't true in Spark; all of Spark's
aggregations are hash based, so when you do a combineByKey() operation in
Spark, Spark creates a HashMap and then creates and updates combiner
instances using three functions that you define:

createCombiner: V => C (takes in a value, returns a combiner object)
mergeValue: (C, V) => C (takes in an existing combiner and a new value and
updates the combiner)
mergeCombiners: (C, C) => C (takes in two combiners and merges them together)
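
To make the hash-based flow concrete, here is a rough plain-Java sketch of how the three functions get applied. It does not use Spark at all: HashCombine, combinePartition and mergePartitions are hypothetical names invented for illustration, and the argument order of mergeValue follows Spark's combineByKey, where the combiner comes first. The point is only that values for a key arrive one at a time, in any order, rather than all at once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical helper (not part of Spark or Crunch) that mimics hash-based combining.
final class HashCombine {

  // Combine one partition: values for a key are folded in one at a time.
  static <K, V, C> Map<K, C> combinePartition(
      Iterable<Map.Entry<K, V>> records,
      Function<V, C> createCombiner,
      BiFunction<C, V, C> mergeValue) {
    Map<K, C> combiners = new HashMap<>();
    for (Map.Entry<K, V> record : records) {
      K key = record.getKey();
      V value = record.getValue();
      if (combiners.containsKey(key)) {
        // Key seen before: update its existing combiner with the new value.
        combiners.put(key, mergeValue.apply(combiners.get(key), value));
      } else {
        // First value for this key: create a fresh combiner from it.
        combiners.put(key, createCombiner.apply(value));
      }
    }
    return combiners;
  }

  // Merge the per-partition maps, pairing up combiners that share a key.
  static <K, C> Map<K, C> mergePartitions(
      Map<K, C> left, Map<K, C> right, BinaryOperator<C> mergeCombiners) {
    Map<K, C> merged = new HashMap<>(left);
    right.forEach((key, combiner) -> merged.merge(key, combiner, mergeCombiners));
    return merged;
  }
}
```

With a sum, createCombiner is the identity, and mergeValue and mergeCombiners are both addition. A stateful Aggregator does not fit this shape as directly, because each key needs its own combiner instance.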

I could do a hack that would make Aggregators usable in the way that Spark
expects them to be used-- after all, Aggregator<T> implements Serializable,
so there's no issue with serializing Aggregators across the wire in either
Spark or MapReduce by using a PTypes.serializables() with the PTypeFamily
of the key. The requirement would be that Aggregators would need to be
Cloneable-ish (although not using Cloneable b/c Josh Bloch taught me that
was evil), because certain Aggregators have state associated with them
(e.g., string concat) that would need to be passed along (this sort of
vaguely recalls the AggregatorFactory error I made many moons ago.) What I
would probably end up with is a default impl that does a
serialize/deserialize round trip to create a new instance of the Aggregator,
which subclasses that know better could override with something cheaper.
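
A minimal sketch of that default, under some loud assumptions: CopyableAggregator is a hypothetical, simplified interface and not the real Crunch Aggregator<T> API, and copy(), StringConcat and LongSum are names made up here. The default copy() goes through Java serialization so that state such as a separator is carried over, and LongSum shows a subclass overriding it with a cheaper copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, simplified stand-in for an aggregator; not the Crunch interface.
interface CopyableAggregator<T> extends Serializable {
  void update(T value);

  T result();

  // Default copy: serialize this instance and read it back as a new object.
  @SuppressWarnings("unchecked")
  default CopyableAggregator<T> copy() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CopyableAggregator<T>) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Aggregator could not be copied", e);
    }
  }
}

// Stateful example: the separator has to travel with every copy.
final class StringConcat implements CopyableAggregator<String> {
  private static final long serialVersionUID = 1L;
  private final String separator;
  private final StringBuilder buffer = new StringBuilder();

  StringConcat(String separator) {
    this.separator = separator;
  }

  @Override
  public void update(String value) {
    if (buffer.length() > 0) {
      buffer.append(separator);
    }
    buffer.append(value);
  }

  @Override
  public String result() {
    return buffer.toString();
  }
}

// A subclass that "knows better" and skips the serialization round trip.
final class LongSum implements CopyableAggregator<Long> {
  private static final long serialVersionUID = 1L;
  private long sum;

  @Override
  public void update(Long value) {
    sum += value;
  }

  @Override
  public Long result() {
    return sum;
  }

  @Override
  public CopyableAggregator<Long> copy() {
    LongSum copy = new LongSum();
    copy.sum = this.sum; // same semantics as the default, without serialization
    return copy;
  }
}
```

In this sketch an untouched template instance would be copied once per key, so each key gets its own buffer while sharing the same configuration.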

That said, that's not the greatest thing ever, and so I'm wondering if
anyone has thought about what a generalization of aggregator would look
like. I am even open to the use of terms like "monoid" if you feel like
there's no other way to express your ideas. ;-)

Director of Data Science
Cloudera
Twitter: @josh_wills
