spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: How to achieve reasonable performance on Spark Streaming?
Date Thu, 19 Jun 2014 19:15:55 GMT
Hello all,

Apologies for the late response, this thread went below my radar. There are
a number of things that can be done to improve the performance. Here are
some of them off the top of my head, based on what you have mentioned. Most
of them are covered in the streaming guide's performance tuning section
<http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch>
.

1. Good input distribution - If the data ingestion can be distributed
across multiple receivers, then it is worth doing so. Partitioning the data
streams and parallelizing the receiving distributes the
serialization-deserialization cost of receiving. Note that you can create
multiple input streams (presumably of the same type) and do a union on them
(StreamingContext.union) to create a single DStream out of them.
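As a minimal sketch of that pattern (the app name, host, and ports below are hypothetical placeholders, and this only runs against a live Spark cluster):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("multi-receiver-example")  // hypothetical app
val ssc = new StreamingContext(conf, Seconds(2))

// One receiver per input partition, each placed on a worker by Spark.
val numReceivers = 4
val streams = (1 to numReceivers).map { i =>
  ssc.socketTextStream("ingest-host", 9000 + i)  // hypothetical endpoints
}

// Union them back into a single DStream for the rest of the pipeline.
val unified = ssc.union(streams)
```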

2. Good task distribution - The tasks, especially those of the first map
stage on the input stream(s), should be well distributed across the workers
in the cluster. If they are not, then it is sometimes worth repartitioning
the dstream (DStream.repartition) so that the computation can be
parallelized well, which can improve performance.
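Repartitioning is a one-line change; a hedged fragment (the stream variable name and partition count are hypothetical, and this assumes an existing input DStream of lines):

```scala
// `lines` stands for whatever input DStream the job receives.
// A hypothetical target is 2-3x the total executor core count.
val repartitioned = lines.repartition(16)
// The heavy first map stage now runs across all 16 partitions.
val pairs = repartitioned.flatMap(_.split(" ")).map(word => (word, 1))
```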

3. Enable Kryo serialization - Set the Spark conf property
"spark.serializer" to org.apache.spark.serializer.KryoSerializer [see the
configuration guide <http://spark.apache.org/docs/latest/configuration.html>].
This can improve performance across the board.
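For example, in the driver program before creating the context (the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kryo-example")  // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```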

4. Enable Concurrent Mark-Sweep GC - This helps keep batch processing
times more stable. You can see the GC overheads of stages in the web UI; if
they are on the order of hundreds of milliseconds, then it's worth tuning
the GC.
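One way to pass the collector flag to the executors is via spark-submit (the class and jar names are hypothetical; only the GC flag is the point here):

```shell
spark-submit \
  --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" \
  --class com.example.MyStreamingApp \
  my-streaming-app.jar
```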

5. Store strings as byte arrays, and do all string transformations as byte
array transformations (that is, implement extractWordOnePairs on byte
arrays). This can give a very significant boost in throughput, as the Java
string library is not very efficient. Granted, this requires more work in
manipulating the bytes yourself.
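As a rough illustration of the idea (this is not TD's actual code; extractWordOnePairs is only named in the email, and this sketch assumes single-byte-delimited ASCII input):

```scala
// Splits a byte array on ASCII spaces and emits (word-bytes, 1) pairs,
// never materializing java.lang.String objects on the hot path.
def extractWordOnePairs(bytes: Array[Byte]): Seq[(Array[Byte], Int)] = {
  val out = scala.collection.mutable.ArrayBuffer.empty[(Array[Byte], Int)]
  var start = 0
  var i = 0
  while (i <= bytes.length) {
    // A word ends at a space or at the end of the input.
    if (i == bytes.length || bytes(i) == ' '.toByte) {
      if (i > start) out += ((java.util.Arrays.copyOfRange(bytes, start, i), 1))
      start = i + 1
    }
    i += 1
  }
  out.toSeq
}
```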


Regarding the 5-second pauses, the culprit could very well be the disk.
Since shuffle files are written to disk, that often gives rise to
unpredictable delays. In our research paper
<http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf> we
got much higher performance using in-memory shuffling. However, the
current version of Spark does not have in-memory shuffling, for a number of
reasons (to prevent RDD cache pollution with single-use shuffle data, to
reduce the chances of OOMs, etc.). It is recommended that you use a fast
file system (SSD? RAMFS?) at the workers and make sure the local working
directory uses it. If you are interested in hacking in-memory shuffling
into Spark, you can hack around org.apache.spark.storage.* (especially
BlockManager) to make sure all shuffle-related "blocks" go to the MemoryStore
(look for the code that deals with shuffle blocks and passes them directly
to the DiskBlockManager). We are in the process of building in-memory
shuffle into Spark, but it's hard to give an ETA on that as of now.
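If a ramfs or SSD is available on the workers, pointing the shuffle spill directory at it is a one-line configuration change (the mount path below is hypothetical):

```
# spark-defaults.conf (or SPARK_LOCAL_DIRS in spark-env.sh)
spark.local.dir    /mnt/ramdisk/spark    # hypothetical ramfs mount on each worker
```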

To achieve even higher throughput (specifically, the numbers reported in
the paper), it requires pre-serializing the data into Spark's format (Kryo
format, if Kryo serialization is used) as large byte arrays, receiving the
byte arrays directly, and storing them in the receiver directly (without
deserializing them). This avoids the serialization-deserialization costs of
receiving the data and can give higher throughput overall.
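A stand-in sketch of the idea: the producer serializes a whole batch of records into one large byte array up front, and the consumer defers deserialization. (TD suggests Kryo format; plain Java serialization is used here only to keep the example dependency-free, and the record type is a placeholder.)

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Producer side: one serialization pass for the whole batch, yielding a
// single blob the receiver can store as-is without touching its contents.
def serializeBatch(records: Seq[String]): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(records.toList)  // one write for the whole batch
  oos.close()
  bos.toByteArray
}

// Task side: deserialization happens only where the data is processed.
def deserializeBatch(blob: Array[Byte]): Seq[String] = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(blob))
  ois.readObject().asInstanceOf[List[String]]
}
```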

Let me know if this helps, or if there is more detailed performance tuning
you are interested in.

TD



On Fri, Jun 13, 2014 at 11:22 AM, Michael Chang <mike@tellapart.com> wrote:

> I'm interested in this issue as well.  I have spark streaming jobs that
> seem to run well for a while, but slowly degrade and don't recover.
>
>
> On Wed, Jun 11, 2014 at 11:08 PM, Boduo Li <onpoq.l@gmail.com> wrote:
>
>> It seems that the slow "reduce" tasks are caused by slow shuffling. Here
>> are
>> the logs regarding one slow "reduce" task:
>>
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_88_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_89_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_90_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_91_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_92_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_93_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_94_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_95_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_96_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_97_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_188_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_189_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_190_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_191_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_192_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_193_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_194_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_195_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_196_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_197_18 after  5029 ms
>> 14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is
>> 1143
>> 14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to
>> driver
>> 14/06/11 23:42:45 INFO Executor: Finished task ID 23643
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-reasonable-performance-on-Spark-Streaming-tp7262p7454.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
