This question looks very similar to mine, but I don't see any answer:

http://markmail.org/message/kkxhi5jjtwyadzxt

On Mon, Oct 31, 2016 at 11:24 PM, kant kodali <kanth909@gmail.com> wrote:
Here is a UI view of my thread dump:

http://fastthread.io/my-thread-report.jsp?p=c2hhcmVkLzIwMTYvMTEvMS8tLWpzdGFja19kdW1wX3dpbmRvd19pbnRlcnZhbF8xbWluX2JhdGNoX2ludGVydmFsXzFzLnR4dC0tNi0xNy00Ng==

On Mon, Oct 31, 2016 at 7:10 PM, kant kodali <kanth909@gmail.com> wrote:
Hi Ryan,

I think you are right. This may not be related to the Receiver. I have attached a jstack dump here. I do a simple mapToPair and reduceByKey, and I have a window interval of 1 minute (60000ms) and a batch interval of 1s (1000ms). This generates a lot of threads, at least 5 to 8 per second, and the total number of threads increases monotonically. Just for tweaking purposes I changed the window interval to 1 min (60000ms) and the batch interval to 10s (10000ms); this looked a lot better, though still not ideal, and at the very least the thread count is no longer monotonic (it goes up and down). Now my question really is: how do I tune things so that the number of threads stays reasonable while still satisfying the window interval of 1 minute (60000ms) and the batch interval of 1s (1000ms)?
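For context, the window wiring in my job looks roughly like this (a simplified sketch; `pairs` stands for the JavaPairDStream produced by my mapToPair step):

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// 60s window sliding every 1s: each 1s batch re-reduces a full minute of
// data. The second (inverse) function makes the window incremental, which
// should cut the per-batch work, though I don't know yet whether it helps
// with the thread count. Note the inverse-function variant requires
// checkpointing, e.g. jssc.checkpoint("/some/checkpoint/dir").
JavaPairDStream<String, Long> windowed = pairs.reduceByKeyAndWindow(
        (a, b) -> a + b,            // add values entering the window
        (a, b) -> a - b,            // subtract values leaving the window
        Durations.seconds(60),      // window interval
        Durations.seconds(1));      // slide = batch interval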

This jstack dump was taken after running my Spark driver program for 2 minutes; at that point there were about 1000 threads.

Thanks!


On Mon, Oct 31, 2016 at 1:09 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
If there are leaking threads, you should be able to see the number of threads increasing. You can just dump the threads after 1-2 hours.
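If you don't want to wait on full dumps, one quick trick (plain JMX, nothing Spark-specific) is to log the JVM's live thread count from the driver every minute; a count that only ever grows is the signature of a leak:

import java.lang.management.ManagementFactory;

// Periodically log the live thread count of this JVM.
new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        System.out.println("live threads: "
                + ManagementFactory.getThreadMXBean().getThreadCount());
        try {
            Thread.sleep(60_000);
        } catch (InterruptedException e) {
            return;
        }
    }
}, "thread-count-logger").start();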

On Mon, Oct 31, 2016 at 12:59 PM, kant kodali <kanth909@gmail.com> wrote:
Yes, I can certainly use jstack, but it takes 4 to 5 hours for me to reproduce the error, so I will get back as early as possible.

Thanks a lot!

On Mon, Oct 31, 2016 at 12:41 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
Then it should not be a Receiver issue. Could you use `jstack` to find out the names of the leaking threads?
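(Something like `jstack <driver-pid> > threads.txt`, taken twice a few minutes apart, should make it obvious which thread names keep multiplying.)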

On Mon, Oct 31, 2016 at 12:35 PM, kant kodali <kanth909@gmail.com> wrote:
Hi Ryan,

It happens on the driver side and I am running on a client mode (not the cluster mode).

Thanks!

On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
Sorry, there is a typo in my previous email: this may **not** be the root cause if the leaked threads are on the driver side.

Does it happen in the driver or executors?

On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth909@gmail.com> wrote:
Hi Ryan,
Ahh, my Receiver.onStop method is currently empty.
1) I have a hard time seeing why the receiver would crash so many times within a span of 4 to 5 hours, but I understand I should still clean up in onStop.
2) How do I clean up those threads? The documentation here https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem to have any method for cleaning up the threads created during onStart. Any ideas?
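From the custom receivers guide it looks like the pattern is to keep a handle on the thread and have its loop poll isStopped(); something like this, adapted to my receiver (untested sketch):

private Thread worker;  // handle to the consumer thread started in onStart

@Override
public void onStart() {
    worker = new Thread(this::receive, "nsq-receiver-" + topic);
    worker.start();
}

@Override
public void onStop() {
    // If receive() loops on !isStopped(), it exits once Spark stops the
    // receiver; the interrupt just unblocks any pending socket read.
    if (worker != null) {
        worker.interrupt();
        worker = null;
    }
}

private void receive() {
    while (!isStopped()) {
        // poll NSQ and hand each message to Spark via store(message)
    }
}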
Thanks!

On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
So in your code, each Receiver will start a new thread. Did you stop the receiver properly in `Receiver.onStop`? Otherwise, you may leak threads after a receiver crashes and is restarted by Spark. However, this may be the root cause since the leaked threads are on the driver side. Could you use `jstack` to check which types of threads are leaking?

On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth909@gmail.com> wrote:
I am also under the assumption that the onStart function of the Receiver is called only once by Spark. Please correct me if I am wrong.

On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth909@gmail.com> wrote:
My driver program runs a Spark streaming job, and the only thread it spawns by itself is in the onStart() function below. Other than that it doesn't spawn any threads; it only calls mapToPair, reduceByKey, foreachRDD, and collect.
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class NSQReceiver extends Receiver<String> {

    private String topic = "";

    public NSQReceiver(String topic) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.topic = topic;
    }

    @Override
    public void onStart() {
        // Spawn a single background thread that consumes from NSQ.
        new Thread() {
            @Override
            public void run() {
                receive();
            }
        }.start();
    }

    // Receiver.onStop() is abstract, so the class needs an override even
    // though it is currently empty (see the discussion above).
    @Override
    public void onStop() {
    }

    // Pulls messages off the NSQ topic and hands them to Spark via store();
    // body elided in the original message.
    private void receive() {
    }
}
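The receiver is hooked into the streaming context along these lines (sketch; the conf setup and topic name are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("nsq-streaming");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaDStream<String> lines = jssc.receiverStream(new NSQReceiver("some-topic"));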

Environment info:
Java 8
Scala 2.11.8
Spark 2.0.0
More than happy to share any other info you may need.

On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <jakob@odersky.com> wrote:
> how do I tell my Spark driver program not to create so many?

This may depend on your driver program. Do you spawn any threads in it?
Could you share some more information on the driver program, Spark
version, and your environment? It would greatly help others to help you.

On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth909@gmail.com> wrote:
> The source of my problem is actually that I am running into the following
> error. This error seems to happen after running my driver program for 4
> hours.
>
> "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
> "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
> java.lang.OutOfMemoryError: unable to create new native thread"
>
> and this wonderful book taught me that the error "unable to create new
> native thread" can happen because the JVM requests a new thread from the
> OS and the OS refuses, for the following reasons:
>
> 1. The system has actually run out of virtual memory.
> 2. On Unix-style systems, the user has already created (across all programs
> the user is running) the maximum number of processes configured for that
> user login. Individual threads are considered a process in that regard.
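>
> To convince myself of this failure mode, here is a tiny standalone repro
> (my own sketch, not from the book; don't run it on a shared box, since it
> deliberately exhausts the per-user thread limit):
>
> import java.util.concurrent.locks.LockSupport;
>
> // Spawns threads that park forever until the OS refuses to create another
> // one, at which point the JVM throws
> // java.lang.OutOfMemoryError: unable to create new native thread.
> public class ThreadExhaustion {
>     public static void main(String[] args) {
>         long count = 0;
>         try {
>             while (true) {
>                 new Thread(LockSupport::park).start();
>                 count++;
>             }
>         } catch (OutOfMemoryError e) {
>             System.out.println("failed after " + count + " threads: " + e);
>         }
>     }
> }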
>
> Option #2 is ruled out in my case because my driver program is running
> with a userid of root, which has the maximum number of processes set to 120242.
>
> ulimit -a gives me the following:
>
> core file size          (blocks, -c) 0
> data seg size           (kbytes, -d) unlimited
> scheduling priority             (-e) 0
> file size               (blocks, -f) unlimited
> pending signals                 (-i) 120242
> max locked memory       (kbytes, -l) 64
> max memory size         (kbytes, -m) unlimited
> open files                      (-n) 1024
> pipe size            (512 bytes, -p) 8
> POSIX message queues     (bytes, -q) 819200
> real-time priority              (-r) 0
> stack size              (kbytes, -s) 8192
> cpu time               (seconds, -t) unlimited
> max user processes              (-u) 120242
> virtual memory          (kbytes, -v) unlimited
> file locks                      (-x) unlimited
>
> So at this point I do understand that I am running out of memory due to
> the allocation of threads, so my biggest question is: how do I tell my
> Spark driver program not to create so many?
>
> On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <sowen@cloudera.com> wrote:
>>
>> ps -L [pid] is what shows threads. I am not sure this is counting what you
>> think it does. My shell process has about a hundred threads, and I can't
>> imagine why one would have thousands unless your app spawned them.
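>> (For a single process, something like `ps -L -p <pid> | wc -l` shows its
>> thread count directly.)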
>>
>> On Mon, Oct 31, 2016 at 10:20 AM kant kodali <kanth909@gmail.com> wrote:
>>>
>>> when I do
>>>
>>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>
>>> The result is around 32K. Why does it create so many threads, and how
>>> can I limit this?
>
>