Hi Connor,

Spark creates a cached thread pool in the Executor for executing tasks:

// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")

and if we look at the org.apache.spark.util.ThreadUtils class, we can see that the number of threads in this cached thread pool is not bounded, so it can grow up to Integer.MAX_VALUE:

/**
 * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
 * unique, sequentially assigned integer.
 */
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
  val threadFactory = namedThreadFactory(prefix)
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

And in the java.util.concurrent.Executors class:

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

So if there are lots of tasks to be launched at the same time, the number of threads can in theory grow up to Integer.MAX_VALUE. In practice, as soon as tasks finish, their threads are returned to the cached pool (where they stay alive for up to 60 seconds while idle) and are reused for new tasks, instead of a new thread being created for every task.
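
As a rough illustration of this reuse behavior, here is a small standalone Scala sketch (not Spark code; the task count is made up for the example). It submits many short tasks to a cached pool and records which threads actually ran them; the number of distinct threads is typically far smaller than the number of tasks:

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object CachedPoolReuseDemo {
  def main(args: Array[String]): Unit = {
    // Unbounded cached pool, the same kind the Executor gets from newDaemonCachedThreadPool
    val pool = Executors.newCachedThreadPool()
    val threadsSeen = ConcurrentHashMap.newKeySet[String]()

    // Submit 1000 short "tasks"; idle worker threads are reused instead of
    // a brand new thread being created for each task
    (1 to 1000).foreach { _ =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          threadsSeen.add(Thread.currentThread().getName)
        }
      })
    }

    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    println(s"Distinct threads used for 1000 tasks: ${threadsSeen.size}")
  }
}

If instead each task blocks for a long time (so no thread ever goes idle), the same pool keeps creating a new thread for every concurrently running task, which is exactly the situation that can exhaust the OS thread limit.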

Now let's see why an OutOfMemoryError occurs when lots of new threads get created.

This OutOfMemoryError (unable to create new native thread) usually occurs when your executor/driver process does not have enough memory left to allocate new native threads for executing tasks.

With the help of this command you can see how many threads get created while executing your Spark job:

ps -u <loginName> -L | wc -l


(in my case a basic KMeans ML Spark job creates 400+ threads)

With this command you can see the per-user process/thread limit set by your OS, which you can also increase:

ulimit -u

or, in more detail:

ulimit -a
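
For example (a hedged illustration; 8192 is just a placeholder value, pick a limit that matches your workload), on Linux you can raise the soft limit for the current shell, or make the change permanent via /etc/security/limits.conf (raising the hard limit usually requires root and a re-login):

ulimit -u 8192

# permanently, in /etc/security/limits.conf:
<loginName>  soft  nproc  8192
<loginName>  hard  nproc  8192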


Here is how memory gets used by each newly created thread in the executor:
"The number of threads that can run within a JVM process is generally limited by the address space for that process. Each thread requires a thread stack. The more threads you have, the more process address space you use. The more address space you use for thread stacks, the less you have for the Java heap."

You can tune the thread stack size:
-Xss determines the size of each thread's stack, e.g. -Xss1024k. If the stack space is too small, you will eventually see a java.lang.StackOverflowError.
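
For example, here is a minimal sketch of passing a smaller stack size to the executor JVMs via SparkConf (the 512k value is only an illustration; choose a size your tasks can actually run with, otherwise you trade the OutOfMemoryError for a StackOverflowError):

import org.apache.spark.SparkConf

// Sketch: a smaller per-thread stack lets more native threads fit into the
// same process address space
val conf = new SparkConf()
  .setAppName("ThreadStackTuningExample")
  .set("spark.executor.extraJavaOptions", "-Xss512k")

The same option can be passed at submit time with --conf spark.executor.extraJavaOptions=-Xss512k.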

--
Elkhan

On Sat, Aug 1, 2015 at 2:38 PM, Connor Zanin <cnnrznn@udel.edu> wrote:
1. I believe that the default memory (per executor) is 512m (from the documentation)
2. I have increased the memory used by spark on workers in my launch script when submitting the job
       (--executor-memory 124g)
3. The job completes successfully, it is the "road bumps" in the middle I am concerned with

I would like insight into how Spark handles thread creation

On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman <fabszn@gmail.com> wrote:
Hello,

I am not an expert with Spark, but the error thrown by Spark seems to indicate that there is not enough memory for launching the job. By default, Spark allocates 1GB of memory; maybe you should increase it?

Best regards

Fabrice

On Sat, Aug 1, 2015 at 10:51 PM, Connor Zanin <cnnrznn@udel.edu> wrote:
Hello,

I am having an issue when I run a word count job. I have included the source and log files for reference. The job finishes successfully, but about halfway through I get a java.lang.OutOfMemoryError (could not create native thread), and this leads to the loss of the executor. After some searching I found out this was a problem with the environment and the limit imposed by the OS on how many threads I could spawn.

However, I had thought that Spark only maintains a thread pool equal in size to the number of cores available across the nodes (by default), and schedules tasks dynamically as threads become available. The only Spark parameter I change is the number of partitions in my RDD.

My question is, how is Spark deciding how many threads to spawn and when?

--
Regards,

Connor Zanin
Computer Science
University of Delaware



--
Regards,

Connor Zanin
Computer Science
University of Delaware




--
Regards,

Connor Zanin
Computer Science
University of Delaware



--

Best regards,
Elkhan Dadashov