spark-user mailing list archives

From Elkhan Dadashov <elkhan8...@gmail.com>
Subject Re: How does the # of tasks affect # of threads?
Date Tue, 04 Aug 2015 17:47:58 GMT
Hi Connor,

Spark creates a cached thread pool in the Executor
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala>
for executing tasks:

// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")

And if we look at the org.apache.spark.util.ThreadUtils class, we can see
that the number of threads in the cached thread pool is not limited, so it
can grow up to Integer.MAX_VALUE:

/**
 * Wrapper over newCachedThreadPool. Thread names are formatted as
 * prefix-ID, where ID is a unique, sequentially assigned integer.
 */
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
  val threadFactory = namedThreadFactory(prefix)
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

And in the java.util.concurrent.Executors class:

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided ThreadFactory to create new
 * threads when needed.
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

So if many tasks are launched at the same time, the number of threads can
potentially grow up to Integer.MAX_VALUE. In practice, though, as soon as
tasks finish, their threads are returned to the cached pool (where idle
threads are kept alive for 60 seconds, per the constructor above) and are
reused for new tasks, instead of a new thread being created for each task.
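
Here is a minimal, self-contained sketch of that behavior using only plain
JDK APIs (nothing Spark-specific; CachedPoolDemo and sleepTask are made-up
names for illustration). A burst of concurrent tasks grows the pool by one
thread per task, but once those tasks finish their threads are reused:

import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}

object CachedPoolDemo {
  def sleepTask: Runnable = new Runnable {
    def run(): Unit = Thread.sleep(100)
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]

    // First burst: no idle threads exist yet, so the pool creates
    // one new thread per concurrently submitted task.
    (1 to 8).foreach(_ => pool.execute(sleepTask))
    Thread.sleep(300)
    println(s"Peak threads after first burst: ${pool.getLargestPoolSize}")  // 8

    // Second burst: the first batch has finished, so its (now idle)
    // threads are reused and the peak does not grow.
    (1 to 8).foreach(_ => pool.execute(sleepTask))
    Thread.sleep(300)
    println(s"Peak threads after second burst: ${pool.getLargestPoolSize}") // still 8

    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.SECONDS)
  }
}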

Now let's see why an OutOfMemoryError occurs when lots of new threads are
created.

The OutOfMemoryError (could not create native thread) usually occurs when
your executor/driver process does not have enough memory left to allocate
the native thread stacks needed for executing tasks.

With the help of this command you can see how many threads get created
while your Spark job is executing:

ps -u <loginName> -L | wc -l

(in my case a basic KMeans ML Spark job creates 400+ threads)

With this command you can see the per-user process/thread limit set by your
machine/OS, which you can also increase:

ulimit -u

or, for more detail:

ulimit -a
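
If you prefer to measure from inside the JVM rather than with ps, the
standard ThreadMXBean exposes live and peak thread counts. A minimal sketch
(JvmThreadCount is an illustrative name, not a Spark class):

import java.lang.management.ManagementFactory

object JvmThreadCount {
  def main(args: Array[String]): Unit = {
    val mx = ManagementFactory.getThreadMXBean
    // Counts threads in this JVM process only, so the numbers will be
    // lower than what ps -u <loginName> -L reports across all processes.
    println(s"Live threads: ${mx.getThreadCount}, peak: ${mx.getPeakThreadCount}")
  }
}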


Here is
<http://www.coderanch.com/t/376272/java/java/JVM-maximum-number-Threads>
an explanation of how memory gets used by each newly created thread in the
executor:
"The number of threads that can run within a JVM process is generally
limited by the address space for that process. Each thread requires a
thread stack. The more threads you have, the more process address space you
use. The more address space you use for thread stacks, the less you have
for the Java heap."
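
As a rough worked example (assuming the typical 1 MB default thread stack
on 64-bit HotSpot): the 400+ threads from the KMeans job above would
reserve on the order of 400 MB of address space for thread stacks alone,
before the Java heap gets anything.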

You can tune the thread stack size
<http://crunchify.com/jvm-tuning-heapsize-stacksize-garbage-collection-fundamental/>:
-Xss determines the size of each thread's stack, e.g. -Xss1024k. If the
stack space is too small, you will eventually see a
java.lang.StackOverflowError.
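
This toy sketch makes that trade-off visible (again, nothing
Spark-specific; StackDepthDemo is an illustrative name). Run it with
-Xss256k and then with -Xss2m, and the printed recursion depth grows with
the stack size:

object StackDepthDemo {
  @volatile var depth = 0

  // The "+ 1" after the recursive call keeps it out of tail position,
  // so scalac cannot optimize the recursion into a loop.
  def recurse(n: Int): Int = {
    depth = n
    recurse(n + 1) + 1
  }

  def main(args: Array[String]): Unit = {
    try recurse(0)
    catch {
      case _: StackOverflowError =>
        println(s"Stack overflowed at recursion depth $depth")
    }
  }
}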

--
Elkhan

On Sat, Aug 1, 2015 at 2:38 PM, Connor Zanin <cnnrznn@udel.edu> wrote:

> 1. I believe that the default memory (per executor) is 512m (from the
> documentation)
> 2. I have increased the memory used by Spark on workers in my launch
> script when submitting the job
>        (--executor-memory 124g)
> 3. The job completes successfully, it is the "road bumps" in the middle I
> am concerned with
>
> I would like insight into how Spark handles thread creation.
>
> On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman <fabszn@gmail.com>
> wrote:
>
>> Hello,
>>
>> I am not an expert with Spark, but the error thrown by Spark seems to
>> indicate that there is not enough memory for launching the job. By
>> default, Spark allocates 1GB of memory; maybe you should increase it?
>>
>> Best regards
>>
>> Fabrice
>>
>> On Sat, Aug 1, 2015 at 10:51 PM, Connor Zanin <cnnrznn@udel.edu> wrote:
>>
>>> Hello,
>>>
>>> I am having an issue when I run a word count job. I have included the
>>> source and log files for reference. The job finishes successfully, but
>>> about halfway through I get a java.lang.OutOfMemoryError (could not create
>>> native thread), and this leads to the loss of the Executor. After some
>>> searching I found out this was a problem with the environment and the
>>> limit imposed by the OS on how many threads I could spawn.
>>>
>>> However, I had thought that Spark only maintained a thread pool equal in
>>> size to the number of cores available across the nodes (by default), and
>>> schedules tasks dynamically as threads become available. The only Spark
>>> parameter I change is the number of partitions in my RDD.
>>>
>>> My question is, how is Spark deciding how many threads to spawn and when?
>>>
>>> --
>>> Regards,
>>>
>>> Connor Zanin
>>> Computer Science
>>> University of Delaware
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>
>
> --
> Regards,
>
> Connor Zanin
> Computer Science
> University of Delaware
>



-- 

Best regards,
Elkhan Dadashov
