spark-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: why spark driver program is creating so many threads? How can I limit this number?
Date Tue, 01 Nov 2016 02:10:07 GMT
Hi Ryan,

I think you are right; this may not be related to the Receiver. I have
attached a jstack dump here. I do a simple mapToPair and reduceByKey, with
a window interval of 1 minute (60000 ms) and a batch interval of 1 s (1000 ms).
This generates a lot of threads, at least 5 to 8 per second, and the total
number of threads increases monotonically. As an experiment I kept the
window interval at 1 minute (60000 ms) and changed the batch interval to
10 s (10000 ms); this looked much better, though still not ideal. At the
very least the thread count is no longer monotonic (it goes up and down).
So my real question is: how do I tune things so that the number of threads
stays reasonable while keeping a window interval of 1 minute (60000 ms) and
a batch interval of 1 s (1000 ms)?

This jstack dump was taken after running my Spark driver program for
2 minutes; there are about 1000 threads.

Thanks!
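This leak pattern is easy to reproduce outside Spark: starting a fresh `new Thread()` per 1 s batch grows the thread count monotonically, while handing batch work to a fixed pool keeps it bounded. A minimal plain-Java sketch, where the batch task is a hypothetical stand-in for the streaming job (not a Spark API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchThreads {
    // Run n "batches" on a fixed pool of poolSize threads. However many
    // batches are submitted, at most poolSize worker threads ever exist --
    // unlike `new Thread(batch).start()` per batch interval, which adds
    // one native thread per batch and grows without bound.
    static int runBatches(int n, int poolSize) throws InterruptedException {
        AtomicInteger done = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < n; i++) {
            pool.submit(done::incrementAndGet); // stand-in for one batch of work
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("batches run: " + runBatches(100, 4));
    }
}
```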


On Mon, Oct 31, 2016 at 1:09 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> If there are leaking threads, I think you should be able to see the
> number of threads increasing. You can just dump threads after 1-2 hours.
>
> On Mon, Oct 31, 2016 at 12:59 PM, kant kodali <kanth909@gmail.com> wrote:
>
>> Yes, I can certainly use jstack, but it takes 4 to 5 hours for me to
>> reproduce the error. I will get back as early as possible.
>>
>> Thanks a lot!
>>
>> On Mon, Oct 31, 2016 at 12:41 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
>>
>>> Then it should not be a Receiver issue. Could you use `jstack` to find
>>> out the names of the leaking threads?
>>>
>>> On Mon, Oct 31, 2016 at 12:35 PM, kant kodali <kanth909@gmail.com>
>>> wrote:
>>>
>>>> Hi Ryan,
>>>>
>>>> It happens on the driver side and I am running on a client mode (not
>>>> the cluster mode).
>>>>
>>>> Thanks!
>>>>
>>>> On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
>>>>
>>>>> Sorry, there is a typo in my previous email: this may **not** be the
>>>>> root cause if the leak threads are in the driver side.
>>>>>
>>>>> Does it happen in the driver or executors?
>>>>>
>>>>> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth909@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> Ahh, my Receiver.onStop method is currently empty.
>>>>>>
>>>>>> 1) I have a hard time seeing why the receiver would crash so many
>>>>>> times within a span of 4 to 5 hours, but I understand I should still
>>>>>> clean up during onStop.
>>>>>>
>>>>>> 2) How do I clean up those threads? The documentation at
>>>>>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html
>>>>>> doesn't seem to have any method for cleaning up the threads created
>>>>>> during onStart. Any ideas?
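On point 2: `Thread` indeed has no safe "clean up" method (`Thread.stop()` is deprecated). The usual pattern is cooperative shutdown: keep a reference to the thread created in onStart, then in onStop flip a flag, interrupt the thread, and join it. A plain-Java sketch without the Spark Receiver base class (the class name and receive loop here are illustrative, not Spark API):

```java
public class StoppableReceiver {
    private volatile boolean running = false;
    private Thread worker;  // keep the reference so onStop can reach it

    public void onStart() {
        running = true;
        worker = new Thread(this::receive, "nsq-receiver");
        worker.start();
    }

    public void onStop() {
        running = false;
        if (worker != null) {
            worker.interrupt();          // wake the thread if it is blocked
            try {
                worker.join(5000);       // wait for it to actually exit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            worker = null;               // drop the reference once it is gone
        }
    }

    private void receive() {
        // Poll the source here; exit the loop when onStop flips the flag
        // or delivers an interrupt.
        while (running && !Thread.currentThread().isInterrupted()) {
            try { Thread.sleep(10); } catch (InterruptedException e) { break; }
        }
    }

    public boolean isWorkerAlive() { return worker != null && worker.isAlive(); }
}
```

Without the onStop half, every restart of the receiver leaves the previous worker thread running, which is exactly how restarts accumulate threads.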
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
>>>>>>
>>>>>>> So in your code, each Receiver will start a new thread. Did you
>>>>>>> stop the receiver properly in `Receiver.onStop`? Otherwise, you may
>>>>>>> leak threads after a receiver crashes and is restarted by Spark.
>>>>>>> However, this may be the root cause since the leak threads are in
>>>>>>> the driver side. Could you use `jstack` to check which types of
>>>>>>> threads are leaking?
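Alongside `jstack`, the same information is available in-process via the standard `ThreadMXBean`. A sketch (the class name is illustrative) that groups live threads by name prefix, which makes it easy to see which family, e.g. `ForkJoinPool-50-worker-`, is the one growing:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

public class ThreadCensus {
    // Count live threads grouped by name prefix: the thread name with any
    // trailing digits stripped, so "ForkJoinPool-50-worker-11" and
    // "ForkJoinPool-50-worker-13" both land under "ForkJoinPool-50-worker-".
    public static Map<String, Integer> byPrefix() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<String, Integer> counts = new TreeMap<>();
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
            if (info == null) continue;  // thread may have died meanwhile
            String prefix = info.getThreadName().replaceAll("\\d+$", "");
            counts.merge(prefix, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        byPrefix().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

Sampling this map every minute and diffing it shows exactly which prefix accounts for the monotonic growth.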
>>>>>>>
>>>>>>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth909@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I am also under the assumption that the onStart function of the
>>>>>>>> Receiver is only called once by Spark. Please correct me if I am
>>>>>>>> wrong.
>>>>>>>>
>>>>>>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth909@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> My driver program runs a Spark Streaming job, and it spawns a
>>>>>>>>> thread by itself only in the onStart() function below. Other
>>>>>>>>> than that it doesn't spawn any threads; it only calls mapToPair,
>>>>>>>>> reduceByKey, foreachRDD, and collect.
>>>>>>>>>
>>>>>>>>> public class NSQReceiver extends Receiver<String> {
>>>>>>>>>
>>>>>>>>>     private String topic = "";
>>>>>>>>>
>>>>>>>>>     public NSQReceiver(String topic) {
>>>>>>>>>         super(StorageLevel.MEMORY_AND_DISK_2());
>>>>>>>>>         this.topic = topic;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void onStart() {
>>>>>>>>>         new Thread() {
>>>>>>>>>             @Override public void run() {
>>>>>>>>>                 receive();
>>>>>>>>>             }
>>>>>>>>>         }.start();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Environment info:
>>>>>>>>>
>>>>>>>>> Java 8
>>>>>>>>>
>>>>>>>>> Scala 2.11.8
>>>>>>>>>
>>>>>>>>> Spark 2.0.0
>>>>>>>>>
>>>>>>>>> More than happy to share any other info you may need.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <jakob@odersky.com> wrote:
>>>>>>>>>
>>>>>>>>>>  > how do I tell my spark driver program to not create so many?
>>>>>>>>>>
>>>>>>>>>> This may depend on your driver program. Do you spawn any threads
>>>>>>>>>> in it? Could you share some more information on the driver
>>>>>>>>>> program, Spark version, and your environment? It would greatly
>>>>>>>>>> help others to help you.
>>>>>>>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth909@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> > The source of my problem is actually that I am running into
>>>>>>>>>> > the following error. This error seems to happen after running
>>>>>>>>>> > my driver program for 4 hours:
>>>>>>>>>> >
>>>>>>>>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in
>>>>>>>>>> > thread "dag-scheduler-event-loop" Exception in thread
>>>>>>>>>> > "ForkJoinPool-50-worker-13" java.lang.OutOfMemoryError: unable
>>>>>>>>>> > to create new native thread"
>>>>>>>>>> >
>>>>>>>>>> > and this wonderful book taught me that the error "unable to
>>>>>>>>>> > create new native thread" can happen because the JVM is asking
>>>>>>>>>> > the OS for a thread and the OS is refusing, for one of the
>>>>>>>>>> > following reasons:
>>>>>>>>>> >
>>>>>>>>>> > 1. The system has actually run out of virtual memory.
>>>>>>>>>> > 2. On Unix-style systems, the user has already created (across
>>>>>>>>>> > all programs the user is running) the maximum number of
>>>>>>>>>> > processes configured for that user login. Individual threads
>>>>>>>>>> > are considered a process in that regard.
>>>>>>>>>> >
>>>>>>>>>> > Option #2 is ruled out in my case because my driver program is
>>>>>>>>>> > running with a userid of root, which has the maximum number of
>>>>>>>>>> > processes set to 120242.
>>>>>>>>>> >
>>>>>>>>>> > ulimit -a gives me the following
>>>>>>>>>> >
>>>>>>>>>> > core file size          (blocks, -c) 0
>>>>>>>>>> > data seg size           (kbytes, -d) unlimited
>>>>>>>>>> > scheduling priority             (-e) 0
>>>>>>>>>> > file size               (blocks, -f) unlimited
>>>>>>>>>> > pending signals                 (-i) 120242
>>>>>>>>>> > max locked memory       (kbytes, -l) 64
>>>>>>>>>> > max memory size         (kbytes, -m) unlimited
>>>>>>>>>> > open files                      (-n) 1024
>>>>>>>>>> > pipe size            (512 bytes, -p) 8
>>>>>>>>>> > POSIX message queues     (bytes, -q) 819200
>>>>>>>>>> > real-time priority              (-r) 0
>>>>>>>>>> > stack size              (kbytes, -s) 8192
>>>>>>>>>> > cpu time               (seconds, -t) unlimited
>>>>>>>>>> > max user processes              (-u) 120242
>>>>>>>>>> > virtual memory          (kbytes, -v) unlimited
>>>>>>>>>> > file locks                      (-x) unlimited
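The limits above can also be checked directly from a shell; a Linux-specific sketch (the `/proc` path and the `nlwp` column are not portable to all Unixes, and the current shell `$$` is used here as a stand-in for the driver's pid):

```shell
# Per-user process limit; threads count against this on Linux:
ulimit -u

# System-wide cap on threads:
cat /proc/sys/kernel/threads-max 2>/dev/null

# Live thread count for a pid (NLWP = number of lightweight processes);
# substitute the driver's pid for $$ to watch its thread count grow:
ps -o nlwp= -p $$
```

Running the last command repeatedly against the driver's pid is a quick way to confirm whether the count is monotonically increasing without taking a full jstack dump.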
>>>>>>>>>> >
>>>>>>>>>> > So at this point I do understand that I am running out of
>>>>>>>>>> > memory due to the allocation of threads, so my biggest
>>>>>>>>>> > question is: how do I tell my spark driver program to not
>>>>>>>>>> > create so many?
>>>>>>>>>> >
>>>>>>>>>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <sowen@cloudera.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> ps -L [pid] is what shows threads. I am not sure this is
>>>>>>>>>> >> counting what you think it does. My shell process has about a
>>>>>>>>>> >> hundred threads, and I can't imagine why one would have
>>>>>>>>>> >> thousands unless your app spawned them.
>>>>>>>>>> >>
>>>>>>>>>> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali <kanth909@gmail.com> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> when I do
>>>>>>>>>> >>>
>>>>>>>>>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>>>>>>>> >>>
>>>>>>>>>> >>> the result is around 32K. Why does it create so many
>>>>>>>>>> >>> threads, and how can I limit this?
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
