Hey Josh,

@globs: Good.

Rest is inlined.

On Monday, June 18, 2012 at 06:59:47 UTC+2, Josh Wills wrote:
Hey Stefan,

Reply inlined.

On Sat, Jun 16, 2012 at 6:03 AM,  <acki.ch@gmail.com> wrote:
> Hey Josh,
>
> @TupleWritables: Yes, I saw that. I didn't think too much about the
> performance implications, though. Writing all the class info is
> unnecessary because it is statically known, and an identifier would help
> with that. But reflection uses much more CPU time than avoiding it, as
> my little experiment showed, and an identifier will not help with that.
>
> Kryo uses a neat trick for cutting down serialization size. By forcing
> the user to register all classes in a well-defined sequence, it can use
> a class's index in that sequence to describe it. Not as good as
> describing a whole schema, but generally applicable for serialization.
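
(To make the trick concrete, here is a minimal sketch; the exact Kryo API
may differ between versions, and Pair is just a stand-in for a user class.)

    import com.esotericsoftware.kryo.Kryo;

    public class KryoSetup {
      // Stand-in for an application class; purely illustrative.
      public static class Pair {
        public String key;
        public int value;
      }

      public static Kryo newKryo() {
        Kryo kryo = new Kryo();
        // Registration order matters: each class is assigned a small
        // integer id in the order it is registered, and that id is
        // written to the stream instead of the fully qualified class
        // name. Every JVM involved must register the same classes in
        // the same order to agree on the ids.
        kryo.register(String[].class);
        kryo.register(Pair.class);
        return kryo;
      }
    }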
>
> Using Avro internally sounds like a good idea. I don't know it too well,
> though. So you would give up the TupleWritables completely, replace them
> with AvroWritables, and use the mentioned commit to use Writables within
> Avro? What do you mean by "I think there would be some cool stuff we
> could do if we could assume how all shuffle serializations worked, ..."?
> Well, the reflection call would stay, but that can be solved
> independently, I guess (for example, by having an optional factory
> argument in BytesToWritableMapFn); see the caching sketch below.
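
For reference, the caching I keep mentioning is essentially this kind of
thing. A minimal sketch with a hypothetical helper class, not the exact
patch from the gist linked further down:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical helper: caches Class.forName results so that repeated
    // deserialization of the same tuple type does not pay the reflective
    // class lookup on every record.
    public class WritableClassCache {
      private static final ConcurrentMap<String, Class<?>> CACHE =
          new ConcurrentHashMap<String, Class<?>>();

      public static Class<?> forName(String name) {
        Class<?> clazz = CACHE.get(name);
        if (clazz == null) {
          try {
            clazz = Class.forName(name);
          } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class: " + name, e);
          }
          CACHE.putIfAbsent(name, clazz);
        }
        return clazz;
      }
    }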

Re: cool stuff, I suspect that having Avro for all of the intermediate
data transfer would dramatically simplify the implementation of MSCR
fusion, which I've been putting off b/c having an abstraction that
would handle fusion for both Writables and Avro makes my head hurt.
I'm going to start a thread on crunch-dev@incubator.apache.org
advocating for it.

Good. I will be following the discussion a bit. If we do the experiments again, it would be in mid-July. Not to put pressure on you: if you don't have anything by then, we will probably just do some hack to be able to estimate the performance.
>
> What still bothers me is that the partitioner for the join, which only
> has to ensure that the bit indicating whether it's left or right is
> ignored, still has to do far too much work. If it could just get the
> serialized bytes for the key, it could compute the hash code for the
> byte array directly, simply ignoring the last byte. The deserialization
> there is unnecessary and is actually, at least in my profiling, what
> seemed to hurt the performance of the join so badly. Maybe some more
> benchmarking is needed for this, though.

That doesn't sound right-- isn't the Partitioner call done at the
end of the map task, before the data is ever serialized?

You might be right about that. I just checked the thread dump I was referring to:
 (Rest omitted)
    at com.cloudera.crunch.type.writable.TupleWritable.readFields(TupleWritable.java:163)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:968)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:95)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:125)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:125)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:125)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:125)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:125)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1254)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1155)
    - locked <0x000000070606dac8> (a org.apache.hadoop.mapred.MapTask$MapOutputBuffer)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:582)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:649)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
So it's not the partitioner, it's the sorting that forces deserialization.
You don't set the comparator for the join (as I mentioned in the first or second mail). You could set your own, optimized with knowledge of how the comparison in [1] works. It is not a general solution, but it might already help a bit. In the case of Strings, a byte-by-byte comparison would be enough here; see the sketch below.
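
As a minimal sketch of what I mean, assuming the serialized key is simply
the key bytes followed by a single tag byte at the end (a simplification
of the real TupleWritable layout): any consistent total order is enough
for the sort, so comparing the raw bytes directly works.

    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    // Sketch: compares the serialized bytes directly instead of
    // deserializing the keys, using the trailing tag byte only as a
    // tie-breaker so that left-tagged (0) records still sort before
    // right-tagged (1) ones.
    public class TaggedKeyRawComparator implements RawComparator<Object> {
      @Override
      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        int cmp = WritableComparator.compareBytes(
            b1, s1, l1 - 1, b2, s2, l2 - 1);
        if (cmp != 0) {
          return cmp;
        }
        return (b1[s1 + l1 - 1] & 0xff) - (b2[s2 + l2 - 1] & 0xff);
      }

      @Override
      public int compare(Object a, Object b) {
        // Only raw byte comparison is supported in this sketch.
        throw new UnsupportedOperationException();
      }
    }

It would be set with something like
job.setSortComparatorClass(TaggedKeyRawComparator.class) in the new API,
so the sort in sortAndSpill never has to call readFields.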

Cheers,
Stefan

[1] https://github.com/cloudera/crunch/blob/master/src/main/java/com/cloudera/crunch/types/writable/TupleWritable.java#L200
>
> Guess I should contact Alexy Khravbrov then.
>
> Cheers,
> Stefan
>
> PS: Sorry, replied directly instead to the mailing list.
>
> On Friday, June 15, 2012 at 17:37:06 UTC+2, Josh Wills wrote:
>>
>> On Fri, Jun 15, 2012 at 2:24 AM,  <acki.ch@gmail.com> wrote:
>> > Hey Josh,
>> >
>> > I actually first found Pangool and from there concluded that Crunch
>> > is worth a try. So you are saying that the TupleWritables are in
>> > general quite slow, and in that case the performance of Crunch is
>> > not comparable to pure Hadoop? If it really makes a difference, we
>> > should look into generating TupleWritable subclasses for our project
>> > (if there is another round of benchmarks).
>>
>> The impl of TupleWritable in Crunch is too conservative, in that it
>> passes along the names of the Writable classes that it serialized in
>> the tuple along with the actual data, which leads to a pretty massive
>> blowup in the amount of data that gets passed around. My initial
>> thought in doing this was that I wanted to be sure that the Crunch
>> output was always readable by anything-- e.g., you didn't have to use
>> Crunch in order to read the data back, since all of the information on
>> what the data contained was there. This is essentially what Avro gets
>> you, although the Avro file format is smarter about just putting the
>> schema at the head of the file and not copying it over and over again,
>> which is why we generally recommend Avro + Crunch. The #s in the
>> pangool benchmark were all based on Avro.
>>
>> The tuple-oriented frameworks handle this by having integer
>> identifiers for the different data types that are supported, and that's
>> certainly one way to improve the performance here. I've also been
>> tossing around the idea of using Avro for everything internal to the
>> framework (e.g., any data transferred during the shuffle stage), since
>> I just added a way for AvroTypes to support arbitrary writables:
>>
>>
>> https://github.com/cloudera/crunch/commit/224102ac4813fc0e124114026438a2e3884f858b
>>
>> I think there would be some cool stuff we could do if we could assume
>> how all shuffle serializations worked, but I haven't benchmarked it
>> yet or tossed the idea around with the other committers.
>>
>> >
>> > You didn't comment on the SourceTargetHelper. Do glob patterns now
>> > work fine as inputs? It just failed in 0.2.4 when I specified
>> > lineitems* as input in local mode (no HDFS), while it worked fine in
>> > Scoobi.
>>
>> Sorry I missed that-- I was under the impression it did work, but I'll
>> take a look at it and report back.
>>
>> >
> Right now I just need to finish my thesis, so currently I am not
> planning to commit anywhere.
>>
>> It never hurts to ask. I saw your threads on the spark and scoobi
>> mailing lists and enjoyed them-- please keep us posted on what you're
>> up to. Alexy Khravbrov at Klout was also interested in an abstraction
>> layer for Scala MapReduce vs. Spark-- did you talk to him?
>>
>> >
>> > Cheers,
>> > Stefan
>> >
>> > On Thursday, June 14, 2012 at 16:42:43 UTC+2, Josh Wills wrote:
>> >>
>> >> Hey Stefan,
>> >>
>> >> Thanks for your email. Re: join performance, I agree with you that the
>> >> current implementation that uses Writables is pretty terrible, and
>> >> I've been thinking about good ways to do away with it for a while now.
>> >> The good news is that the Avro-based implementation of PTypes is
>> >> pretty fast, and that's what most people end up using in practice. For
>> >> example, when the Pangool guys were benchmarking frameworks, they used
>> >> the Avro PTypes, and it usually runs pretty close to native MR:
>> >> http://pangool.net/benchmark.html
>> >>
>> >> Re: the gist, I will take a look at it. The good news about the pull
>> >> request is that we just submitted Crunch to the Apache Incubator and
>> >> are in the process of moving over all of the infrastructure. It would
>> >> be great to have your patches in when we're done-- we're always on the
>> >> lookout for people who are interested in becoming committers.
>> >>
>> >> Best,
>> >> Josh
>> >>
>> >> On Thu, Jun 14, 2012 at 6:53 AM,  <acki.ch@gmail.com> wrote:
>> >> > Hi,
>> >> >
>> >> > I did some more investigations. You set the partitioner correctly,
>> >> > but you do not set the comparator. But actually the comparator
>> >> > might not be needed, because the value with 0 will always come
>> >> > first by the default comparator? If you rely on that, then maybe
>> >> > you should put a comment there, as it is not immediately obvious.
>> >> >
>> >> > The performance problem with joins (as shown by profiling) is that
>> >> > Hadoop does not know about the PTypes. So to deserialize during
>> >> > sorting, the general TupleWritable is used, which is very
>> >> > inefficient because it uses reflection. I added a cache for the
>> >> > Class.forName call in readFields and it improves performance a
>> >> > lot, but it is not a definitive answer to the problem. Maybe you
>> >> > could add a special writable for TaggedKeys, which knows that one
>> >> > of them is an Integer. To define the partitioning, the actual
>> >> > content does not matter, just that the tagging value is ignored.
>> >> > Scoobi even generates classes for TaggedKeys and TaggedValues,
>> >> > which would most likely be even faster.
>> >> >
>> >> > Here are some numbers for a 1GB join on my laptop; the second
>> >> > column has the total number of seconds.
>> >> > My version of the join, which tags the values by adding a boolean
>> >> > to them. Note that in most cases, where the size of one group does
>> >> > not come close to the available RAM, this is totally fine.
>> >> > Implemented in [1]:
>> >> > crunch_4    33.42    13    2.06    26.82    6526368    86%
>> >> > Crunch's original join:
>> >> > crunch_4    150.11    13    2.79    143.50    7924496    97%
>> >> > Crunch's join with my caching changes in TupleWritable:
>> >> > crunch_4    69.67    13    2.51    59.99    7965808    89%
>> >> >
>> >> > Too lazy to open a pull request, so here is a gist of my changes
>> >> > for the join: https://gist.github.com/2930414
>> >> > These are for 0.2.4; I hope they still apply to trunk.
>> >> >
>> >> > Regards,
>> >> > Stefan Ackermann
>> >> >
>> >> > [1] https://github.com/Stivo/Distributed/blob/master/crunch/src/main/scala/ch/epfl/distributed/utils/CrunchUtils.scala#L83
>> >> >
>> >> > On Thursday, June 14, 2012 at 12:24:34 UTC+2, ack...@gmail.com wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I am writing a Distributed Collections DSL with compiler
>> >> >> optimizations as my master's thesis. I have added Crunch as a
>> >> >> backend, since we were not happy with the performance we were
>> >> >> getting from our other backend for Hadoop. And I must say, I am
>> >> >> quite impressed by Crunch's performance. Here is some sample code
>> >> >> our DSL generates, in case you are interested:
>> >> >> https://github.com/Stivo/Distributed/blob/bigdata2012/crunch/src/main/scala/generated/v4/
>> >> >>
>> >> >> We are impressed with the performance, except for the joins. In
>> >> >> local benchmarks, joins performed 100x worse than my own join
>> >> >> implementation (no joke, 100x). It also seems like the current
>> >> >> implementation has some bugs. You are setting the partitioner
>> >> >> class to the comparator in:
>> >> >> https://github.com/cloudera/crunch/blob/master/src/main/java/com/cloudera/crunch/lib/Join.java#L144
>> >> >> Also, you are not setting the partitioner class. It seems to me
>> >> >> the code is all there, it's just not linked up correctly.
>> >> >> For the performance, maybe you should have different versions of
>> >> >> the comparator / partitioner for different key types instead of
>> >> >> writing out the key just to compare it. String comparison, for
>> >> >> example, only checks characters until it finds a difference.
>> >> >>
>> >> >> I encountered lots of problems with SourceTargetHelper in 0.2.4.
>> >> >> I know you changed it since then, but I think I also had troubles
>> >> >> with the new version. Does it support using glob patterns or
>> >> >> directories as input? In any case, it should not prevent the
>> >> >> program from running, imho. I spent quite a while just trying to
>> >> >> work around that bug.
>> >> >>
>> >> >> Regards,
>> >> >> Stefan Ackermann
>> >> >
>> >> >
>> >>
>> >>
>> >> --
>> >> Director of Data Science
>> >> Cloudera
>> >> Twitter: @josh_wills
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera
>> Twitter: @josh_wills



--
Director of Data Science
Cloudera
Twitter: @josh_wills