flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3544) ResourceManager runtime components
Date Tue, 22 Mar 2016 17:16:25 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206815#comment-15206815 ]

ASF GitHub Bot commented on FLINK-3544:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57028233
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -312,59 +323,121 @@ class JobManager(
     
           leaderSessionID = None
     
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +          resourceId,
    +          connectionInfo,
    +          hardwareInformation,
    +          numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
     
           val taskManager = sender()
     
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    --- End diff --
    
    It would be good to log the failure here.
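
To illustrate the suggestion, here is a minimal sketch of logging in the `Failure` branch of `onComplete`. It is not the code from the pull request: it uses only the Scala standard library instead of Akka, and `askResourceManager`, `logWarn`, `delivered`, and `logged` are hypothetical stand-ins for `rm ? ...`, the actor's logger, and `self ! response`.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.concurrent.{ConcurrentLinkedQueue, TimeoutException}

object LogFailureSketch {
  // Hypothetical stand-in for the actor's logger; messages are collected for inspection.
  val logged = new ConcurrentLinkedQueue[String]()
  // Hypothetical stand-in for `self ! response`.
  val delivered = new ConcurrentLinkedQueue[String]()

  def logWarn(message: String, cause: Throwable): Unit =
    logged.add(s"$message: ${cause.getMessage}")

  // Hypothetical stand-in for the ask to the resource manager (`rm ? ...`).
  def askResourceManager(fail: Boolean): Future[String] =
    if (fail) Future.failed(new TimeoutException("ask timed out"))
    else Future.successful("registration acknowledged")

  // Returns a future that completes once the callback has run,
  // so callers can wait for the side effects.
  def register(fail: Boolean): Future[Unit] = {
    val done = Promise[Unit]()
    askResourceManager(fail).onComplete {
      case Success(response) =>
        // the resource manager is available and answered
        delivered.add(response)
        done.success(())
      case Failure(t) =>
        // log the failure instead of dropping it
        logWarn("Failed to register resource at the resource manager", t)
        done.success(())
    }
    done.future
  }
}
```

In the actor itself, the equivalent would presumably be a single `log.warn(...)` or `log.error(...)` call in the `Failure(t)` case; which level is appropriate depends on how the JobManager is meant to proceed without an answer from the resource manager.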


> ResourceManager runtime components
> ----------------------------------
>
>                 Key: FLINK-3544
>                 URL: https://issues.apache.org/jira/browse/FLINK-3544
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>    Affects Versions: 1.1.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
