crunch-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh Wills <>
Subject Re: map side ("replicated") joins in Crunch
Date Fri, 15 Jun 2012 18:19:28 GMT
On Fri, Jun 15, 2012 at 8:47 AM, Joseph Adler <> wrote:
> Hi everyone,
> Separate topic, so a separate posting.
> One of the functions that I find most useful in Pig is the map side
> join; Pig will put a file in the distributed cache, load it into
> memory, and do a join from the mappers. I'd like to add this to
> Crunch, but wasn't sure what the best way to do this would be. Do any
> of you guys have any thoughts on this?

I have a few, but they're not quite baked yet. We should have some
other folks weigh in.

First thought: Do a literal "map join", something like,

static PTable<K, Pair<V, V2>> mapJoin(PTable<K, V> pt, final Map<K, V2>,
m) {
  PTypeFamily ptf = pt.getTypeFamily();
  PType<V2> v2p = ptf.records(Iterables.getFirst(m.values()).getClass());
  return pt.parallelDo(new DoFn<Pair<K, V>, Pair<K, Pair<V, V2>() {
    void process(Pair<K, V> input, Emitter<Pair<K, Pair<V, V2>>> emitFn)
       V2 v2 = m.get(input.first());
       emitFn.emit(Pair.of(input.first(), Pair.of(input.second(), v2)));
  }, ptf.tableOf(pt.getKeyType(), ptf.pairs(ptf.getValueType(), v2p)));

which works fine if the map is small and serializable. (Although I
think I'm playing fast and loose w/the code there, that DoFn should
probably be a separate class, etc.

If the Map contains non-serializable data, then we would need a
mechanism for serializing it to disk, which gets kind of interesting.
We could well end up with a common function on the Pipeline class that
would convert Collection and Map/Multimap types in Java to
PCollections and PTables, where the MR impl of Crunch serializes the
data to disk using info from the PType and the in-memory version
doesn't do anything at all. Then we have a different implementation of
mapJoin which would get a reference to the instantiated data on HDFS
via a materialize() call, and would then construct a new DoFn that
took that reference and the PType for the collection and used it to
deserialize the data inside of the initialize() call when the code was
running on HDFS into a Map and then do the join.

That second option has some other nice benefits, like if we wanted to
make it easy to take a small sample of data from a local machine and
send it to HDFS with the equivalent of an NLineInputFormat so that we
could start up a map task per record and run, e.g., simulations.

I'm sure there are details I'm missing and stuff I'm glossing over
when it would come to the actual implementation, but I agree that it
would be super-useful.

> -- Joe

Director of Data Science
Twitter: @josh_wills

View raw message