flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8176) Dispatcher does not start SubmittedJobGraphStore
Date Thu, 30 Nov 2017 16:24:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272890#comment-16272890 ]

ASF GitHub Bot commented on FLINK-8176:
---------------------------------------

GitHub user GJL opened a pull request:

    https://github.com/apache/flink/pull/5107

    [FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher

    ## What is the purpose of the change
    
    The FLIP-6 dispatcher never calls `start()` on its `SubmittedJobGraphStore` instance. Hence,
    when a job is submitted (YARN session mode with HA enabled), an `IllegalStateException` is thrown.
    This pull request adds the necessary changes so that jobs can be submitted.
    
    ## Brief change log
    
      - *Implement SubmittedJobGraphListener interface in Dispatcher*
     
    ## Verifying this change
    
      - *Added unit tests for new methods in Dispatcher class*
      - *Verified that jobs can be submitted in FLIP-6 YARN session mode with HA. Did not verify anything else.*
    
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
    
    CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-8176

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5107.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5107
    
----
commit d238ef0c23eea585974929eafdff33af916d19ba
Author: gyao <gary@data-artisans.com>
Date:   2017-11-30T14:37:30Z

    [hotfix][tests] Extract SubmittedJobGraphStore implementation from JobManagerHARecoveryTest

commit 33b9d2848c767088f43fed2d03e6402695827221
Author: gyao <gary@data-artisans.com>
Date:   2017-11-30T14:44:23Z

    [FLINK-8176][flip6] Implement SubmittedJobGraphListener interface in Dispatcher
    
    Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable
    this, the dispatcher must implement the SubmittedJobGraphListener interface. Add
    simple unit tests for the new methods. Refactor DispatcherTest to remove
    redundancy.

----


> Dispatcher does not start SubmittedJobGraphStore
> ------------------------------------------------
>
>                 Key: FLINK-8176
>                 URL: https://issues.apache.org/jira/browse/FLINK-8176
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Job-Submission, YARN
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> {{Dispatcher}} never calls {{start()}} on its {{SubmittedJobGraphStore}} instance. Hence, when a job is submitted (YARN session mode with HA enabled), an {{IllegalStateException}} is thrown:
> {noformat}
> java.lang.IllegalStateException: Not running. Forgot to call start()?
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>         at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.verifyIsRunning(ZooKeeperSubmittedJobGraphStore.java:411)
>         at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.putJobGraph(ZooKeeperSubmittedJobGraphStore.java:222)
>         at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:202)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:207)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:151)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129)
>         at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> {noformat}
> *Expected Behavior*
> In the {{start()}} method of {{Dispatcher}}, the {{submittedJobGraphStore}} should be started like so:
> {code}
> submittedJobGraphStore.start(this);
> {code}
> To enable this, the {{Dispatcher}} must implement the interface {{SubmittedJobGraphStore.SubmittedJobGraphListener}}.
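
The failure and the proposed fix can be illustrated with a minimal, self-contained Java sketch. It does not use Flink's classes: `JobGraphListener`, `SimpleJobGraphStore`, and `SimpleDispatcher` are hypothetical stand-ins for the listener interface, the store, and the `Dispatcher`, and job graphs are reduced to plain strings. Only the lifecycle pattern is taken from the issue: the store rejects `putJobGraph` until `start` has been called, and the dispatcher passes itself as the listener.

```java
import java.util.HashMap;
import java.util.Map;

public class DispatcherStartSketch {

    /** Hypothetical stand-in for the store's listener interface. */
    interface JobGraphListener {
        void onAddedJobGraph(String jobId);
        void onRemovedJobGraph(String jobId);
    }

    /** Hypothetical store that refuses to accept job graphs before start(). */
    static class SimpleJobGraphStore {
        private final Map<String, String> graphs = new HashMap<>();
        private JobGraphListener listener; // kept for notifications; unused in this sketch
        private boolean running;

        void start(JobGraphListener listener) {
            this.listener = listener;
            this.running = true;
        }

        void putJobGraph(String jobId, String graph) {
            if (!running) {
                // Same message as in the stack trace quoted in the issue.
                throw new IllegalStateException("Not running. Forgot to call start()?");
            }
            graphs.put(jobId, graph);
        }

        int size() {
            return graphs.size();
        }
    }

    /** Hypothetical dispatcher: implements the listener so it can start the store. */
    static class SimpleDispatcher implements JobGraphListener {
        private final SimpleJobGraphStore store;

        SimpleDispatcher(SimpleJobGraphStore store) {
            this.store = store;
        }

        void start() {
            // The call the issue asks for: start the store with the dispatcher as listener.
            store.start(this);
        }

        void submitJob(String jobId, String graph) {
            store.putJobGraph(jobId, graph);
        }

        @Override
        public void onAddedJobGraph(String jobId) { /* no-op in this sketch */ }

        @Override
        public void onRemovedJobGraph(String jobId) { /* no-op in this sketch */ }
    }

    public static void main(String[] args) {
        // Without start(): submission fails as described in the issue.
        SimpleDispatcher notStarted = new SimpleDispatcher(new SimpleJobGraphStore());
        try {
            notStarted.submitJob("job-1", "graph");
        } catch (IllegalStateException e) {
            System.out.println("Submission failed: " + e.getMessage());
        }

        // With start(): submission succeeds.
        SimpleJobGraphStore store = new SimpleJobGraphStore();
        SimpleDispatcher started = new SimpleDispatcher(store);
        started.start();
        started.submitJob("job-1", "graph");
        System.out.println("Stored job graphs: " + store.size());
    }
}
```

In this sketch the listener callbacks are empty; what the real `Dispatcher` does when it is notified of added or removed job graphs is not described in this message.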



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
