flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ashish Pokharel <ashish...@yahoo.com>
Subject Re: Task Manager detached under load
Date Tue, 06 Feb 2018 01:26:06 GMT
Hi Till,

Thanks for detailed response. I will try to gather some of this information during the week
and follow up.

— Ashish

> On Feb 5, 2018, at 5:55 AM, Till Rohrmann <trohrmann@apache.org> wrote:
> 
> Hi,
> 
> this sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out
what's causing this problem. Given from what I see in the logs, the following happens:
> 
> For some time the JobManager seems to no longer receive heartbeats from the TaskManager.
This could be, for example, due to long GC pauses or heavy load which starves the ActorSystem's
threads which are responsible for sending the heartbeats. Due to this, the TM's ActorSystem
is quarantined which effectively renders them useless because the JM will henceforth ignore
all messages from these systems. The only way to resolve this problem is to restart the ActorSystem.
By setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined
TM will shut down. If you run the Flink cluster on Yarn, then a new substitute TM will be
started if you have still some container restarts left. That way, the system should be able
to recover.
> 
> Additionally you could try to play around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause
which control the heartbeat interval and the acceptable pause. By increasing the latter, the
system should tolerate longer GC pauses and period of high load.
> 
> However, this only addresses the symptoms of the problem and I'd like to find out what's
causing the problem. In order to further debug the problem, it would be really helpful to
obtain the logs of the JobManager and the TaskManagers on DEBUG log level and with taskmanager.debug.memory.startLogThread
set to true. Additionally it would be interesting to see whats happening on the TaskManagers
when you observe high load. So obtaining a profiler dump via VisualVM would be great. And
last but not least, it also helps to learn more about the job you're running. What kind of
connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed?
Which other libraries are you using in your job?
> 
> Thanks a lot for your help!
> 
> Cheers,
> Till
> 
> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cresny@gmail.com <mailto:cresny@gmail.com>>
wrote:
> I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case,
the Job Manager would fail with the log output about unreachability (with an additional statement
about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
everything works perfectly, but we will try again soon on 1.4. When we do I will post the
actual log output.
> 
> This was on YARN in AWS, with akka.ask.timeout = 60s.
> 
> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashishpok@yahoo.com <mailto:ashishpok@yahoo.com>>
wrote:
> I haven’t gotten much further with this. It doesn’t look like GC related - at least
GC counters were not that atrocious. However, my main concern was once the load subsides why
aren’t TM and JM connecting again? That doesn’t look normal. I could definitely tell JM
was listening on the port and from logs it does appear TM is trying to message JM that is
still alive. 
> 
> Thanks, Ashish
> 
>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenedergaard@gmail.com <mailto:lassenedergaard@gmail.com>>
wrote:
>> 
>> Hi. 
>> 
>> Did you find a reason for the detaching ?
>> I sometimes see the same on our system running Flink 1.4 on dc/os. I have enabled
taskmanager.Debug.memory.startlogthread for debugging. 
>> 
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>> 
>> 
>> Den 20. jan. 2018 kl. 12.57 skrev Kien Truong <duckientruong@gmail.com <mailto:duckientruong@gmail.com>>:
>> 
>>> Hi,
>>> 
>>> You should enable and check your garbage collection log.
>>> 
>>> We've encountered case where Task Manager disassociated due to long GC pause.
>>> 
>>> Regards,
>>> 
>>> Kien
>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>> Hi All,
>>>> 
>>>> We have hit some load related issues and was wondering if any one has some
suggestions. We are noticing task managers and job managers being detached from each other
under load and never really sync up again. As a result, Flink session shows 0 slots available
for processing. Even though, apps are configured to restart it isn't really helping as there
are no slots available to run the apps.
>>>> 
>>>> 
>>>> Here are excerpt from logs that seemed relevant. (I am trimming out rest
of the logs for brevity)
>>>> 
>>>> Job Manager:
>>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager
               -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40
UTC)
>>>> 
>>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager
               -  Maximum heap size: 16384 MiBytes
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
               -  Hadoop version: 2.6.5
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
               -  JVM Options:
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
               -     -Xms16384m
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
               -     -Xmx16384m
>>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager
               -     -XX:+UseG1GC
>>>> 
>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration
           - Loading configuration property: jobmanager.rpc.port, 6123
>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration
           - Loading configuration property: jobmanager.heap.mb, 16384
>>>> 
>>>> 
>>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher                 
                   - Detected unreachable: [akka.tcp://flink@<jm-host>:37840 <>]
>>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager
               - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager <>
terminated.
>>>> 
>>>> -- So once Flink session boots up, we are hitting it with pretty heavy load,
which typically results in the WARN above
>>>> 
>>>> Task Manager:
>>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40
UTC)
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             -  Hadoop version: 2.6.5
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             -  JVM Options:
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             -     -Xms16384M
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             -     -Xmx16384M
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             -     -XX:MaxDirectMemorySize=8388607T
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             -     -XX:+UseG1GC
>>>> 
>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration
           - Loading configuration property: jobmanager.rpc.port, 6123
>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration
           - Loading configuration property: jobmanager.heap.mb, 16384
>>>> 
>>>> 
>>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher                 
                   - Detected unreachable: [akka.tcp://flink@<jm-host>:6123 <>]
>>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting                      
                   - Quarantined address [akka.tcp://flink@<jm-host>:6123 <>]
is still unreachable or has not been restarted. Keeping it quarantined.
>>>> 018-01-19 12:54:48,774 WARN  akka.remote.Remoting                       
                  - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123
<>]. Address is now               gated for 5000 ms, all messages to this address will
be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined.
Association aborted.] 
>>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting                      
                   - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123
<>]. Address is now gated for 5000 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this system. No further associations
to the remote system are possible until this system is restarted.] 
>>>> <bunch of ERRORs on operations not shutdown properly - assuming because
JM is unreachable>
>>>> 
>>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager
             - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager
<>(attempt 10, timeout: 30000 milliseconds)
>>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting                      
                   - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123
<>]. Address is now gated for 5000 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this system. No further associations
to the remote system are possible until this system is restarted.] 
>>>> 
>>>> So bottom line is, JM and TM couldn't communicate under load, which is obviously
not good. I tried to bump up akka.tcp.timeout as well but it didnt help either. So my question
here is after all processing is halted and there is no new data being picked up, shouldn't
this environment self-heal? Any other things I can be looking at other than extending timeouts?
>>>> 
>>>> Thanks,
>>>> 
>>>> Ashish
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
> 
> 
> 


Mime
View raw message