spark-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: why spark driver program is creating so many threads? How can I limit this number?
Date Mon, 31 Oct 2016 19:20:34 GMT
Hi Ryan,

Ahh, my Receiver.onStop method is currently empty.

1) I have a hard time seeing why the receiver would crash so many times
within a span of 4 to 5 hours, but either way I understand I should still
clean up during onStop.

2) How do I clean up those threads? The Thread documentation here
https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html
doesn't seem to have an explicit cleanup method for the threads created
during onStart. Any ideas?
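
The closest thing I can see there is interrupt(), so maybe the answer is to
keep a reference to the thread from onStart and interrupt it in onStop?
Something like this rough sketch (the field and thread names are just my
guesses, and it only helps if the receive() loop actually checks the
interrupt flag, which mine currently doesn't):

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class NSQReceiver extends Receiver<String> {

    private final String topic;
    private transient Thread receiverThread;   // my guess at a name

    public NSQReceiver(String topic) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.topic = topic;
    }

    @Override
    public void onStart() {
        receiverThread = new Thread(this::receive, "nsq-receiver-" + topic);
        receiverThread.start();
    }

    @Override
    public void onStop() {
        // Ask the receive() loop to exit; receive() has to check
        // Thread.currentThread().isInterrupted() or catch
        // InterruptedException on its blocking calls for this to work.
        Thread t = receiverThread;
        if (t != null) {
            t.interrupt();
            receiverThread = null;
        }
    }

    private void receive() {
        // ... connect to NSQ and store() messages until interrupted ...
    }
}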

Thanks!


On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> So in your code, each Receiver will start a new thread. Did you stop that
> thread properly in `Receiver.onStop`? Otherwise, you may leak threads
> after a receiver crashes and is restarted by Spark. However, this may not
> be the root cause, since the leaked threads are on the driver side while
> receivers run on the executors. Could you use `jstack` to check which
> types of threads are leaking?
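>
> For example, something like this (with <driver-pid> replaced by the
> driver's PID) groups the live threads by name, ignoring trailing numbers,
> so a growing pool stands out:
>
> jstack <driver-pid> | grep '^"' | cut -d'"' -f2 | sed 's/[0-9]*$//' | sort | uniq -c | sort -rn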
>
> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth909@gmail.com> wrote:
>
>> I am also under the assumption that the *onStart()* function of the
>> Receiver is called only once by Spark. Please correct me if I am wrong.
>>
>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth909@gmail.com> wrote:
>>
>>> My driver program runs a Spark Streaming job, and it spawns a thread by
>>> itself only in the *onStart()* function below. Other than that it doesn't
>>> spawn any other threads; it only calls the mapToPair, reduceByKey,
>>> foreachRDD, and collect functions.
>>>
>>> import org.apache.spark.storage.StorageLevel;
>>> import org.apache.spark.streaming.receiver.Receiver;
>>>
>>> public class NSQReceiver extends Receiver<String> {
>>>
>>>     private String topic = "";
>>>
>>>     public NSQReceiver(String topic) {
>>>         super(StorageLevel.MEMORY_AND_DISK_2());
>>>         this.topic = topic;
>>>     }
>>>
>>>     @Override
>>>     public void onStart() {
>>>         // Spawn a single background thread that pulls messages from
>>>         // the NSQ topic; receive() is elided here.
>>>         new Thread() {
>>>             @Override public void run() {
>>>                 receive();
>>>             }
>>>         }.start();
>>>     }
>>> }
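>>>
>>> For context, the receiver is wired into the job roughly like this (a
>>> sketch with placeholder names, not my exact code):
>>>
>>> import org.apache.spark.SparkConf;
>>> import org.apache.spark.streaming.Durations;
>>> import org.apache.spark.streaming.api.java.*;
>>>
>>> public class Main {
>>>     public static void main(String[] args) throws Exception {
>>>         SparkConf conf = new SparkConf().setAppName("MyStreamingJob");
>>>         JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
>>>         JavaReceiverInputDStream<String> lines = ssc.receiverStream(new NSQReceiver("mytopic"));
>>>         // Just to show where foreachRDD comes in; the real job does more.
>>>         lines.foreachRDD(rdd -> System.out.println("batch count: " + rdd.count()));
>>>         ssc.start();
>>>         ssc.awaitTermination();
>>>     }
>>> }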
>>>
>>>
>>> Environment info:
>>>
>>> Java 8
>>>
>>> Scala 2.11.8
>>>
>>> Spark 2.0.0
>>>
>>> More than happy to share any other info you may need.
>>>
>>>
>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <jakob@odersky.com>
>>> wrote:
>>>
>>>> > how do I tell my spark driver program to not create so many?
>>>>
>>>> This may depend on your driver program. Do you spawn any threads in
>>>> it? Could you share some more information on the driver program, Spark
>>>> version, and your environment? It would greatly help others to help you.
>>>>
>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth909@gmail.com>
>>>> wrote:
>>>> > The source of my problem is actually that I am running into the
>>>> > following error. This error seems to happen after running my driver
>>>> > program for 4 hours.
>>>> >
>>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>>> > "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
>>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>>> >
>>>> > and this wonderful book taught me that the error "unable to create new
>>>> > native thread" can happen when the JVM asks the OS for a new thread and
>>>> > the OS refuses, for one of the following reasons:
>>>> >
>>>> > 1. The system has actually run out of virtual memory.
>>>> > 2. On Unix-style systems, the user has already created (across all
>>>> > programs the user is running) the maximum number of processes configured
>>>> > for that user login. Individual threads are considered a process in that
>>>> > regard.
>>>> >
>>>> > Option #2 is ruled out in my case because my driver program is running
>>>> > with a userid of root, which has the maximum number of processes set to
>>>> > 120242.
>>>> >
>>>> > ulimit -a gives me the following
>>>> >
>>>> > core file size          (blocks, -c) 0
>>>> > data seg size           (kbytes, -d) unlimited
>>>> > scheduling priority             (-e) 0
>>>> > file size               (blocks, -f) unlimited
>>>> > pending signals                 (-i) 120242
>>>> > max locked memory       (kbytes, -l) 64
>>>> > max memory size         (kbytes, -m) unlimited
>>>> > open files                      (-n) 1024
>>>> > pipe size            (512 bytes, -p) 8
>>>> > POSIX message queues     (bytes, -q) 819200
>>>> > real-time priority              (-r) 0
>>>> > stack size              (kbytes, -s) 8192
>>>> > cpu time               (seconds, -t) unlimited
>>>> > max user processes              (-u) 120242
>>>> > virtual memory          (kbytes, -v) unlimited
>>>> > file locks                      (-x) unlimited
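>>>> >
>>>> > As another data point, the kernel's own per-process thread count can be
>>>> > read from /proc (assuming Linux; <driver-pid> is the driver's PID):
>>>> >
>>>> > ls /proc/<driver-pid>/task | wc -l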
>>>> >
>>>> > So at this point I do understand that I am running out of memory due to
>>>> > thread allocation (with the 8192 KB default stack size above, 32K threads
>>>> > would reserve on the order of 256 GB of virtual address space), so my
>>>> > biggest question is: how do I tell my Spark driver program not to create
>>>> > so many?
>>>> >
>>>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <sowen@cloudera.com> wrote:
>>>> >>
>>>> >> ps -L [pid] is what shows threads. I am not sure this is counting what
>>>> >> you think it does. My shell process has about a hundred threads, and I
>>>> >> can't imagine why one would have thousands unless your app spawned them.
>>>> >>
>>>> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali <kanth909@gmail.com>
>>>> >> wrote:
>>>> >>>
>>>> >>> when I do
>>>> >>>
>>>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>> >>>
>>>> >>> the result is around 32K. Why does it create so many threads, and
>>>> >>> how can I limit this?
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>
