spark-user mailing list archives

From Richard Marscher <rmarsc...@localytics.com>
Subject Re: How to increase parallelism of a Spark cluster?
Date Tue, 04 Aug 2015 16:06:33 GMT
I think you did a good job of summarizing the terminology and describing
Spark's operation. However, #7 is inaccurate if I am interpreting it
correctly. The scheduler schedules X tasks from the current stage across
all executors, where X is the number of cores assigned to the application
(assuming only this stage is running). `resourceOfferSingleTaskSet` in
TaskSchedulerImpl gives an idea of how it launches tasks from a stage's
task set based on the currently available cores across all executors:
https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L220.
This is what I have observed in all of our Spark Standalone clusters. In
fact I just ran a job against my laptop "cluster" of 1 executor with 8
partitions in a stage. I have my `spark.cores.max` set to 4 and it ran 4
tasks concurrently, running new tasks after a previous one finished.
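A sketch of that experiment, for anyone who wants to reproduce it (run in a
spark-shell launched with `--conf spark.cores.max=4`; the sleep duration and
partition count are just illustrative):

```scala
// 8 partitions -> 8 tasks in the stage, but only 4 cores are granted
// to the application, so the Spark UI shows at most 4 tasks RUNNING
// at any moment; the other 4 queue up behind them.
val rdd = sc.parallelize(1 to 8, numSlices = 8)
rdd.map { i => Thread.sleep(5000); i }.count()
```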

Also, #8 is only true if you set up fair scheduling at the sub-job level.
(There are two kinds of fair scheduling that I've seen: intra-job and
sub-job. `spark.scheduler.mode` enables intra-job fair scheduling, but for
sub-job scheduling you also need to provide a pool configuration file via
`spark.scheduler.allocation.file` and assign jobs to pools with the
`spark.scheduler.pool` local property.)
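For reference, a minimal sketch of the pool-based setup; the pool name,
weights, and file path below are made up for illustration:

```xml
<!-- conf/fairscheduler.xml: pool name and weights are illustrative -->
<allocations>
  <pool name="solrQueries">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable FAIR mode and point Spark at the pool file, then assign the
// submitting thread to a pool before kicking off jobs from it.
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "conf/fairscheduler.xml")
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.scheduler.pool", "solrQueries")
```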


On Mon, Aug 3, 2015 at 4:16 PM, Ajay Singal <asingal11@gmail.com> wrote:

> Hi Sujit,
>
>
>
> From experimenting with Spark (and other documentation), my understanding
> is as follows:
>
> 1.       Each application consists of one or more Jobs
>
> 2.       Each Job has one or more Stages
>
> 3.       Each Stage creates one or more Tasks (normally, one Task per
> Partition)
>
> 4.       Master allocates one Executor per Worker (that contains a
> Partition) per Application
>
> 5.       The Executor stays up for the lifetime of the Application (and
> dies when the Application ends)
>
> 6.       Each Executor can run multiple Tasks in parallel (normally, the
> parallelism depends on the number of cores per Executor).
>
> 7.       The Scheduler schedules only one Task from each Stage to one
> Executor.
>
> 8.       If there are multiple Stages (from a Job) and these Stages could
> be run asynchronously (i.e., in parallel), one Task from each Stage could
> be scheduled on the same Executor (thus this Executor runs multiple Tasks
> in parallel: see #6 above).
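As a concrete instance of points 1-3 (the input path and partition count
below are hypothetical):

```scala
// One action -> one Job; the shuffle forced by reduceByKey splits it
// into two Stages; each Stage gets one Task per partition (here, 10).
val counts = sc.textFile("hdfs:///logs", minPartitions = 10) // path is hypothetical
  .flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKey(_ + _) // stage boundary: shuffle
counts.count()        // action: triggers the Job
```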
>
>
>
> Of course, there could be many exceptions/exclusions to what I explained
> above.  I expect that the Spark community will confirm or correct my
> observations/understanding above.
>
>
>
> Now, let’s come back to your situation.  You have a cluster of 4 Workers
> with 10 Partitions.  All of these 10 Partitions are distributed among these
> 4 Workers.  Also, from the information provided by you, your Application
> has just one Job with two Stages (repartition and mapPartitions).  The
> mapPartitions Stage will have 10 Tasks.  Assuming my
> observations/understanding is correct, by virtue of #7 above, only 4 Tasks
> can be executed in parallel.  The subsequent Jobs will have to wait.
>
>
>
> However, if you had 10 or more Workers, all Tasks would have been executed
> in parallel.  BTW, I believe, you can have multiple Workers on one Physical
> Node.  So, one of the solutions to your problem would be to increase the
> number of Workers.
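For a standalone cluster, a sketch of what that might look like; the core
and memory values here are made up and must fit the actual hardware:

```shell
# conf/spark-env.sh on each physical node (Spark standalone).
# Two worker daemons per node: the same 4-node cluster then exposes
# 8 Workers, so (with one core each in this sketch) 8 Tasks could run
# at once instead of 4.
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=24g
```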
>
>
>
> Having said so, I believe #7 above is the bottleneck.  If there is no good
> reason for keeping this bottleneck, this could be a good area of
> improvement (and needs to be addressed by the Spark community).  I will wait
> for the community response, and if needed, I will open a JIRA item.
>
>
>
> I hope it helps.
>
>
>
> Regards,
>
> Ajay
>
> On Mon, Aug 3, 2015 at 1:16 PM, Sujit Pal <sujitatgtalk@gmail.com> wrote:
>
>> @Silvio: the mapPartitions instantiates a HttpSolrServer, then for each
>> query string in the partition, sends the query to Solr using SolrJ, and
>> gets back the top N results. It then reformats the result data into one
>> long string and returns the key-value pair (query string, result string).
>>
>> @Igor: Thanks for the parameter suggestions. I will check on
>> --num-executors and whether there is a way to set the number of cores per
>> executor with my Databricks admin, and update here if I find anything. But
>> from the Databricks console, it appears that the number of executors per box is 1.
>> This seems normal though, per the diagram on this page:
>>
>> http://spark.apache.org/docs/latest/cluster-overview.html
>>
>> where it seems that there is 1 executor per box, and each executor can
>> spawn multiple threads to take care of multiple tasks (see bullet #1 copied
>> below).
>>
>>> Each application gets its own executor processes, which stay up for the
>>> duration of the whole application and run tasks in multiple threads. This
>>> has the benefit of isolating applications from each other, on both the
>>> scheduling side (each driver schedules its own tasks) and executor side
>>> (tasks from different applications run in different JVMs).
>>
>>
>> Regarding hitting the max number of requests, thanks for the link. I am
>> using the default client. Just peeked at the Solr code; the default
>> behavior (if no HttpClient instance is supplied in the constructor) is to
>> use DefaultHttpClient (from HttpComponents), whose settings are as follows:
>>
>>>    - Version: HttpVersion.HTTP_1_1
>>>    - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
>>>    - NoTcpDelay: true
>>>    - SocketBufferSize: 8192
>>>    - UserAgent: Apache-HttpClient/release (java 1.5)
>> In addition, the Solr code sets the following config parameters on the
>> DefaultHttpClient:
>>
>>>       params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
>>>       params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
>>>       params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
>>
>> Since all my connections are coming out of 2 worker boxes, it looks like
>> I could get 32x2 = 64 clients hitting Solr, right?
>>
>> @Steve: Thanks for the link to the HttpClient config. I was thinking
>> about using a thread pool (or better, a PoolingHttpClientConnectionManager
>> per the docs), but it probably won't help since it's still being fed one
>> request at a time.
>>
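If the goal is more simultaneous Solr connections without more executors,
one alternative is to fan out within each partition using a thread pool.
A sketch of the idea; `querySolr` is a stand-in for the SolrJ call and is
not a real helper, and the pool size is arbitrary:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical sketch: run the queries of one partition concurrently,
// so a single task holds several connections to Solr at once.
def getResultsConcurrently(keyValues: Iterator[(String, Array[String])],
                           poolSize: Int = 8): Iterator[(String, String)] = {
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(poolSize))
  val futures = keyValues.map { case (key, queries) =>
    Future((key, querySolr(key, queries))) // querySolr: assumed SolrJ call
  }.toList
  Await.result(Future.sequence(futures), 10.minutes).iterator
}
```

With something like this, per-host connection limits such as
PROP_MAX_CONNECTIONS_PER_HOST would then apply per executor rather than
per task.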
>> @Abhishek: my observations agree with what you said. In the past I have
>> had success with repartition to reduce the partition size, especially when
>> groupBy operations were involved. But I believe an executor should be able
>> to handle multiple tasks in parallel, from what I understand about Akka, on
>> which Spark is built: the worker is essentially an ActorSystem which can
>> contain multiple Actors, and each Actor works on a queue of tasks. Within an
>> Actor everything is sequential, but the ActorSystem is responsible for
>> farming out the tasks it receives to its Actors. Though it is possible I
>> am generalizing incorrectly from my limited experience with Akka.
>>
>> Thanks again for all your help. Please let me know if something jumps out
>> and/or if there is some configuration I should check.
>>
>> -sujit
>>
>>
>>
>> On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh <
>> abhishsi@tetrationanalytics.com> wrote:
>>
>>> I don't know if your assertion/expectation that workers will process
>>> multiple partitions in parallel is really valid, or whether having more
>>> partitions than workers will necessarily help (unless you are memory
>>> bound, in which case more partitions mainly shrink your per-task work
>>> size rather than increase execution parallelism).
>>>
>>> [Disclaimer: I am no authority on Spark, but wanted to throw in my spin
>>> based on my own understanding.]
>>>
>>> Nothing official about it :)
>>>
>>> -abhishek-
>>>
>>> On Jul 31, 2015, at 1:03 PM, Sujit Pal <sujitatgtalk@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to run a Spark job that hits an external webservice to get
>>> back some information. The cluster is 1 master + 4 workers, each worker has
>>> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
>>> and is accessed using code similar to that shown below.
>>>
>>>> def getResults(keyValues: Iterator[(String, Array[String])]):
>>>>         Iterator[(String, String)] = {
>>>>     val solr = new HttpSolrClient()
>>>>     initializeSolrParameters(solr)
>>>>     keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>>>> }
>>>>
>>>> myRDD.repartition(10)
>>>>      .mapPartitions(keyValues => getResults(keyValues))
>>>
>>> The mapPartitions call does some initialization of the SolrJ client per
>>> partition and then hits Solr for each record in the partition via the
>>> getResults() call.
>>>
>>> I repartitioned in the hope that this will result in 10 clients hitting
>>> Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
>>> clients if I can). However, I counted the number of open connections using
>>> `netstat -anp | grep ':8983.*ESTABLISHED'` in a loop on the Solr box and
>>> observed that Solr has a constant 4 clients (i.e., equal to the number of
>>> workers) over the lifetime of the run.
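A counting loop of that sort can be scripted as follows (it assumes Solr
listens on port 8983 on the box where it runs, so it is shown here only as
a sketch):

```shell
# Print the number of established connections to Solr once a second.
while true; do
  netstat -anp | grep ":8983.*ESTABLISHED" | wc -l
  sleep 1
done
```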
>>>
>>> My observation leads me to believe that each worker processes a single
>>> stream of work sequentially. However, from what I understand about how
>>> Spark works, each worker should be able to process a number of tasks in
>>> parallel, and that repartition() is a hint for it to do so.
>>>
>>> Is there some SparkConf environment variable I should set to increase
>>> parallelism in these workers, or should I just configure a cluster with
>>> multiple workers per machine? Or is there something I am doing wrong?
>>>
>>> Thank you in advance for any pointers you can provide.
>>>
>>> -sujit
>>>
>>>
>>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>
