spark-dev mailing list archives

From "Ian O'Connell" <>
Subject Re: sparkSQL thread safe?
Date Sun, 13 Jul 2014 02:59:27 GMT
Thanks for the response, Michael.

On the first, I'm following the JIRA now, thanks. It's not a blocker for me,
but it would be great to see fixed.

I opened a PR with the resource pool usage around it. I didn't include it
in the PR, but there are a few classes we should probably register in Kryo
for good performance/size:

Thanks for adding that distinct optimization, btw; great to have it scale better.

On the last, I opened the JIRA, thanks.

Also, this is more of a Spark core thing that you might already be aware of,
but I haven't seen it mentioned anywhere and it was hitting me (if any part
of this seems wrong to you, I'd love to know):

I was getting out-of-memory errors doing a bunch of ops against medium
(~1 TB compressed) input sizes with simple operations that should spill
nicely (distinct, reduceByKey(_ + _)).

Anyway, here's what I came back with (copied from an internal email):

I looked through some heap dumps from the OOMs in Spark and found there
were >10k instances of DiskBlockObjectWriter, each of which was up to
300 KB in size, per active executor. At up to 12 concurrent tasks per host
that tops out at about 33 GB of space. The nodes of course were failing
before this (max memory on our ts cluster per JVM is 25 GB).

The memory usage primarily comes from two places: a byte array in
LZFOutputStream and a byte array in BufferedOutputStream. These are both
output buffers on the way to disk (so when we are using the former we can
turn down/disable the latter). They are configured to be 65 KB and 100 KB
respectively by default. The former is not a configurable option but is
hard-coded in that library.

These come from the ShuffleBlockWriter; that is, each task's shuffle output
is split into >10k chunks. When we do operations which require partitioning
(say distinct, reduceByKey, etc.) it maintains the existing partition
count, so each task basically opens >10k files, and each file handle has
these buffers in place for that task to write to.
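As rough arithmetic (a sketch using the figures above; the variable names are mine), this is where the low-30s GB estimate comes from:

```scala
// Back-of-envelope check of the shuffle-buffer overhead described above.
// Figures are the ones from this email: >10k shuffle files per task,
// up to ~300 KB of buffer per DiskBlockObjectWriter, and up to 12
// concurrent tasks per host.
val partitions      = 10000         // shuffle files opened per task
val bytesPerWriter  = 300L * 1024   // ~300 KB per DiskBlockObjectWriter
val concurrentTasks = 12            // tasks running at once per host

val perTaskBytes = partitions.toLong * bytesPerWriter
val perHostBytes = perTaskBytes * concurrentTasks
val perHostGb    = perHostBytes.toDouble / (1024 * 1024 * 1024)

// lands in the low-30s of GB, well above a 25 GB executor heap
println(f"per-host buffer overhead: $perHostGb%.1f GB")
```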

Solution put in place (maybe there's a better one?):

X: The heap size for an executor's JVM
Y: The number of threads/cores allowed for concurrent execution per host
Z: The expected overhead of these output streams (currently estimated at
65 KB plus the size of the output buffer, times 1.1 for overhead)
K: The fraction of memory allowed to be used for this overhead (a
configurable parameter, default 0.2)

Then, the number of partitions: P = (X / Y / Z) * K

Then, inside some of our root sources now:
-> After assembling the RDD, if numPartitions > P
-> coalesce to P.
This won't trigger another shuffle phase, so it can easily sit inline in
source definitions.
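Inline at a source definition, the check could look like the following sketch (clampPartitions is a hypothetical helper of mine, not actual Spark API; only the coalesce call mentioned in the comment is real Spark):

```scala
// Hypothetical helper: cap a source's partition count at P.
def clampPartitions(numPartitions: Int, cap: Int): Int =
  math.min(numPartitions, cap)

// With a real RDD this would be applied as
//   rdd.coalesce(clampPartitions(rdd.partitions.length, p))
// where coalesce (with shuffle left false) narrows partitions
// without triggering another shuffle phase.
println(clampPartitions(15000, 2400)) // 2400: large source coalesced down
println(clampPartitions(800, 2400))   // 800: small source left alone
```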

The only real downside of this approach I've seen is that it limits the
number of tasks in the initial map phase, which may not be ideal for
parallelism when loading a large dataset and then filtering heavily. It
would be more efficient to pass P into the first distinct/reduceByKey call,
but then the user code would have to reference P.

On Thu, Jul 10, 2014 at 4:50 PM, Michael Armbrust <>

> Hey Ian,
> Thanks for bringing these up!  Responses in-line:
> > Just wondering if right now Spark SQL is expected to be thread safe on
> > master? Doing a simple hadoop file -> RDD -> schema RDD -> write
> > parquet will fail in reflection code if I run these in a thread pool.
> >
> You are probably hitting SPARK-2178
> <> which is caused by
> SI-6240 <>.  We have a plan to
> fix this by moving the schema introspection to compile time, using macros.
> > The SparkSqlSerializer seems to create a new Kryo instance each time it
> > wants to serialize anything. I got a huge speedup for any non-primitive
> > type in my SchemaRDD by using the ResourcePools from Chill to provide
> > the KryoSerializer to it. (I can open an RB if there is some reason not
> > to re-use them?)
> >
> Sounds like SPARK-2102 <>.
>  There is no reason AFAIK to not reuse the instance. A PR would be greatly
> appreciated!
> > With the Distinct Count operator there are no map-side operations, and
> > there is a test to check for this. Is there any reason not to do a
> > map-side combine into a set and then merge the sets later? (similar to
> > the approximate distinct count operator)
> >
> That's just not an optimization that we had implemented yet... but I've just
> done it here <> and it'll be in
> master soon :)
> > Another thing while i'm mailing.. the 1.0.1 docs have a section like:
> > "
> > // Note: Case classes in Scala 2.10 can support only up to 22 fields. To
> > work around this limit, // you can use custom classes that implement the
> > Product interface.
> > "
> >
> > Which sounds great; we have lots of data in thrift, so via scrooge (),
> > we end up ultimately with instances of traits which implement Product.
> > Though the reflection code appears to look for the constructor of the
> > class and derive the types from those parameters?
> Yeah, that's true that we only look in the constructor at the moment, but I
> don't think there is a really good reason for that (other than I guess we
> will need to add code to make sure we skip builtin object methods).  If you
> want to open a JIRA, we can try fixing this.
> Michael
