flink-issues mailing list archives

From mxm <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Date Tue, 22 Mar 2016 17:34:16 GMT
GitHub user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57031901
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -312,59 +323,121 @@ class JobManager(
     
           leaderSessionID = None
     
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +          resourceId,
    +          connectionInfo,
    +          hardwareInformation,
    +          numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
     
           val taskManager = sender()
     
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    --- End diff --
    
    Done (doesn't change the diff).


