spark-dev mailing list archives

From Reynold Xin <r...@databricks.com>
Subject Re: sparkSQL thread safe?
Date Sun, 13 Jul 2014 06:19:30 GMT
Ian,

The LZFOutputStream's large byte buffer is sort of annoying. It is much
smaller if you use the Snappy one. The downside of the Snappy one is
slightly less compression (I've seen 10 - 20% larger sizes).

If we can find a compression scheme implementation that doesn't do very
large buffers, that'd be a good idea too ... let me know if you have any
suggestions.

In the future, we plan to make shuffle write to fewer streams at
the same time.



On Sat, Jul 12, 2014 at 7:59 PM, Ian O'Connell <ian@ianoconnell.com> wrote:

> Thanks for the response, Michael.
>
> On the first, I'm following the JIRA now, thanks. It's not a blocker for me
> but would be great to see.
>
> I opened up a PR with the resource pool usage around it. I didn't include
> it in the PR, but a few classes we should probably add as registered in
> kryo for good perf/size:
>     classOf[org.apache.spark.sql.catalyst.expressions.GenericRow],
>     classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow],
>     classOf[org.apache.spark.sql.catalyst.expressions.Row],
>     classOf[Array[Object]],
>     scala.collection.immutable.Nil.getClass,
>     scala.collection.immutable.::.getClass,
>     classOf[scala.collection.immutable.::[Any]]
>
> Thanks for adding that distinct btw, great to have it scale more.
>
> On the last, opened the JIRA thanks.
>
> Also, more of a Spark core thing that you might already be aware of, but I
> haven't seen it mentioned anywhere and it was hitting me (also, if any part
> of this seems wrong to you I'd love to know):
>
> I was getting out-of-memory errors doing a bunch of ops against medium
> (~1TB compressed) input sizes with simple things that should spill nicely
> (distinct, reduceByKey(_ + _)).
>
> Anyway, what I came back with (copied from an internal email):
>
> I looked through some heap dumps from the OOMs in Spark and found there
> were >10k instances of DiskBlockObjectWriter, each up to 300kb in size,
> per active executor. At up to 12 concurrent tasks per host that tops out
> at about 33gb of space. The nodes of course were failing before this
> (max mem on our ts cluster per JVM is 25gb).
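
As a rough check of the figures above, the sketch below multiplies writers per task by per-writer buffer size and by concurrent tasks. The object and parameter names are illustrative, and two things are assumed rather than stated in the email: that the >10k writer count applies per task, and that sizes use decimal units (1 kB = 1000 bytes).

```scala
// Back-of-the-envelope estimate of shuffle write buffer memory on one host.
// All names are illustrative; the inputs are the figures quoted in the email.
object ShuffleBufferEstimate {
  /** Total bytes held by open shuffle writers on one host. */
  def totalBytes(writersPerTask: Long, bytesPerWriter: Long, concurrentTasks: Int): Long =
    writersPerTask * bytesPerWriter * concurrentTasks

  def main(args: Array[String]): Unit = {
    // Assumed reading: 10k writers per task, up to 300 kB each, 12 concurrent tasks.
    val bytes = totalBytes(10000L, 300L * 1000L, 12)
    println(s"${bytes / 1000000000L} GB")
  }
}
```

With these inputs the product is 36 GB, in the same range as the roughly 33gb quoted and well above the 25gb per-JVM limit mentioned.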
>
> The memory usage primarily comes from two places: a byte array in
> LZFOutputStream and a byte array in BufferedOutputStream. These are both
> output buffers along the way to disk (so when we are using the former we can
> turn down/disable the latter). They are configured to be 65kb and 100kb
> respectively by default. The former is not a configurable option but is
> static in that library's code.
>
> These come from the ShuffleBlockWriter, that is we get an input stream with
> >10k chunks. When we do operations which require partitioning (say
> distinct, reduceByKey, etc..) it maintains the existing partition count. So
> each task basically opens >10k files, each file handle of which has these
> buffers in place for that task to write to.
>
> Solution put in place (maybe there's a better one?):
>
> Given:
> X: The heap size for an executor's JVM
> Y: The number of threads/cores allowed for concurrent execution per host
> Z: The expected overhead of these output streams (currently estimated at
> 65k + size of the output buffer * 1.1 for overheads)
> K: The fraction of memory allowed to be used for this overhead (configurable
> parameter, default 0.2)
>
> Then, the number of partitions: P = (X / Y / Z) * K
>
> Then inside some of our root sources now:
> -> After assembling the RDD, if numPartitions > P
> -> coalesce to P.
> This won't trigger another shuffle phase, so can easily sit inline to
> source definitions.
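
The cap described above can be sketched in plain Scala as follows. Only the formula P = (X / Y / Z) * K comes from the email; the object and method names are hypothetical, and Z is read here as (65 kB + output buffer) * 1.1, which is one possible interpretation of the estimate given.

```scala
// Sketch of the partition cap P = (X / Y / Z) * K.
// Names are hypothetical; only the formula comes from the email.
object PartitionCap {
  /** Per-stream overhead Z in bytes (assumed reading: (65 kB + buffer) * 1.1). */
  def streamOverheadBytes(outputBufferBytes: Long): Double =
    (65L * 1000L + outputBufferBytes) * 1.1

  /** P = (X / Y / Z) * K, rounded down and never below 1. */
  def maxPartitions(heapBytes: Long,
                    concurrentTasks: Int,
                    streamOverhead: Double,
                    fraction: Double = 0.2): Int =
    math.max(1, (heapBytes.toDouble / concurrentTasks / streamOverhead * fraction).toInt)

  /** Partition count to coalesce to; unchanged when already at or below the cap. */
  def targetPartitions(current: Int, cap: Int): Int = math.min(current, cap)
}
```

For a 25 GB heap, 12 concurrent tasks and a 100 kB output buffer, `maxPartitions` gives a cap a little under 2,300, so a 10k-partition input would be coalesced to that count. In Spark the result would be applied with `rdd.coalesce(p)`, which does not shuffle by default.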
>
> The only real downside of this approach I've seen is that it limits the
> number of tasks in this initial map phase, which may not be ideal for
> parallelism when loading a large dataset and then filtering heavily. It
> would be more efficient to pass P into the first distinct/reduceByKey call,
> but the user code would have to reference P.
>
>
>
>
>
> On Thu, Jul 10, 2014 at 4:50 PM, Michael Armbrust <michael@databricks.com>
> wrote:
>
> > Hey Ian,
> >
> > Thanks for bringing these up!  Responses in-line:
> >
> > > Just wondering if right now spark sql is expected to be thread safe on
> > > master?
> > > doing a simple hadoop file -> RDD -> schema RDD -> write parquet
> > > will fail in reflection code if i run these in a thread pool.
> > >
> >
> > You are probably hitting SPARK-2178
> > <https://issues.apache.org/jira/browse/SPARK-2178> which is caused by
> > SI-6240 <https://issues.scala-lang.org/browse/SI-6240>.  We have a plan to
> > fix this by moving the schema introspection to compile time, using macros.
> >
> >
> > > The SparkSqlSerializer, seems to create a new Kryo instance each time it
> > > wants to serialize anything. I got a huge speedup when I had any
> > > non-primitive type in my SchemaRDD using the ResourcePool's from Chill for
> > > providing the KryoSerializer to it. (I can open an RB if there is some
> > > reason not to re-use them?)
> > >
> >
> > Sounds like SPARK-2102 <https://issues.apache.org/jira/browse/SPARK-2102>.
> > There is no reason AFAIK to not reuse the instance. A PR would be greatly
> > appreciated!
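
The reuse idea discussed above can be illustrated with a small bounded pool. This is a generic sketch and not Chill's ResourcePool or Spark's code; the class name `SimplePool` and its methods are hypothetical, and any expensive-to-build object (such as a serializer instance) could stand in for `T`.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Minimal bounded pool that reuses expensive objects across calls.
// Illustrative only: not Chill's ResourcePool and not Spark's implementation.
class SimplePool[T <: AnyRef](size: Int)(create: () => T) {
  // Thread-safe queue of idle instances, holding at most `size` of them.
  private val idle = new ArrayBlockingQueue[T](size)

  /** Take an idle instance if one exists, otherwise build a new one. */
  def borrow(): T = {
    val pooled = idle.poll() // null when the queue is empty
    if (pooled != null) pooled else create()
  }

  /** Return an instance; it is dropped if the pool is already full. */
  def release(item: T): Unit = { idle.offer(item); () }

  /** Run `f` with a borrowed instance and always hand it back. */
  def withResource[A](f: T => A): A = {
    val item = borrow()
    try f(item) finally release(item)
  }
}
```

Because each borrowed instance is used by one thread at a time, the pooled objects themselves do not need to be thread safe.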
> >
> >
> > > With the Distinct Count operator there is no map-side operations, and a
> > > test to check for this. Is there any reason not to do a map side combine
> > > into a set and then merge the sets later? (similar to the approximate
> > > distinct count operator)
> > >
> >
> > Thats just not an optimization that we had implemented yet... but I've just
> > done it here <https://github.com/apache/spark/pull/1366> and it'll be in
> > master soon :)
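
The map-side combine asked about above can be modelled on plain collections, as in the sketch below. It is not the code from the pull request: each inner `Seq` simply stands in for one partition, and the object name is hypothetical.

```scala
// Distinct count with a map-side combine, modelled on plain collections.
// Each inner Seq stands in for one partition; no Spark API is used here.
object MapSideDistinct {
  def countDistinct[T](partitions: Seq[Seq[T]]): Int = {
    // Map side: collapse each partition to its set of distinct values.
    val partials: Seq[Set[T]] = partitions.map(_.toSet)
    // Reduce side: merge the partial sets, then count the result.
    partials.foldLeft(Set.empty[T])(_ ++ _).size
  }
}
```

The benefit is that each partition ships at most one copy of every value to the merge step, rather than every row.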
> >
> >
> > > Another thing while i'm mailing.. the 1.0.1 docs have a section like:
> > > "
> > > // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
> > > // you can use custom classes that implement the Product interface.
> > > "
> > >
> > > Which sounds great, we have lots of data in thrift.. so via scrooge (
> > > https://github.com/twitter/scrooge), we end up with ultimately instances of
> > > traits which implement product. Though the reflection code appears to look
> > > for the constructor of the class and base the types based on those
> > > parameters?
> >
> >
> > Yeah, thats true that we only look in the constructor at the moment, but I
> > don't think there is a really good reason for that (other than I guess we
> > will need to add code to make sure we skip builtin object methods).  If you
> > want to open a JIRA, we can try fixing this.
> >
> > Michael
> >
>
