flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
Date Thu, 18 Aug 2016 12:54:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426378#comment-15426378 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75301829
  
    --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework
    +
    +import java.util.concurrent.{TimeUnit, ExecutorService}
    +
    +import akka.actor.ActorRef
    +
    +import org.apache.flink.api.common.JobID
    +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
    +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus
    +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
    +import org.apache.flink.runtime.clusterframework.messages._
    +import org.apache.flink.runtime.jobgraph.JobStatus
    +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService
    +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
    +import org.apache.flink.runtime.messages.Messages.Acknowledge
    +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
    +import org.apache.flink.runtime.instance.InstanceManager
    +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
    +
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
    +
    +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages
    +  * to start/administer/stop the session.
    +  *
    +  * @param flinkConfiguration Configuration object for the actor
    +  * @param executorService Execution context which is used to execute concurrent tasks in the
    +  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
    +  * @param instanceManager Instance manager to manage the registered
    +  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
    +  * @param scheduler Scheduler to schedule Flink jobs
    +  * @param libraryCacheManager Manager to manage uploaded jar files
    +  * @param archive Archive for finished Flink jobs
    +  * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
    +  * @param timeout Timeout for futures
    +  * @param leaderElectionService LeaderElectionService to participate in the leader election
    +  */
    +abstract class ContaineredJobManager(
    +                      flinkConfiguration: FlinkConfiguration,
    +                      executorService: ExecutorService,
    +                      instanceManager: InstanceManager,
    +                      scheduler: FlinkScheduler,
    +                      libraryCacheManager: BlobLibraryCacheManager,
    +                      archive: ActorRef,
    +                      restartStrategyFactory: RestartStrategyFactory,
    +                      timeout: FiniteDuration,
    +                      leaderElectionService: LeaderElectionService,
    +                      submittedJobGraphs : SubmittedJobGraphStore,
    +                      checkpointRecoveryFactory : CheckpointRecoveryFactory,
    +                      savepointStore: SavepointStore,
    +                      jobRecoveryTimeout: FiniteDuration,
    +                      metricsRegistry: Option[FlinkMetricRegistry])
    +  extends JobManager(
    +    flinkConfiguration,
    +    executorService,
    +    instanceManager,
    +    scheduler,
    +    libraryCacheManager,
    +    archive,
    +    restartStrategyFactory,
    +    timeout,
    +    leaderElectionService,
    +    submittedJobGraphs,
    +    checkpointRecoveryFactory,
    +    savepointStore,
    +    jobRecoveryTimeout,
    +    metricsRegistry) {
    +
    +  val jobPollingInterval: FiniteDuration
    +
    +  // indicates if this JM has been started in a dedicated (per-job) mode.
    +  var stopWhenJobFinished: JobID = null
    +
    +  override def handleMessage: Receive = {
    +    handleContainerMessage orElse super.handleMessage
    +  }
    +
    +  def handleContainerMessage: Receive = {
    +
    +    case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
    +      // forward to ResourceManager
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          // we forward the message
    +          rm.forward(decorateMessage(msg))
    +        case None =>
    +        // client has to try again
    +      }
    +
    +    case msg: ShutdownClusterAfterJob =>
    +      val jobId = msg.jobId()
    +      log.info(s"ApplicationMaster will shut down session when job $jobId has finished.")
    +      stopWhenJobFinished = jobId
    +      // trigger regular job status messages (if this is a dedicated/per-job cluster)
    +      if (stopWhenJobFinished != null) {
    +        context.system.scheduler.schedule(0 seconds,
    --- End diff --
    
    The polling is a leftover from the old Yarn code. Indeed, it would be nicer to apply a hook immediately upon job removal.
    
    +1 for making `ContaineredJobManager` the base for the Yarn and Mesos JobManager.
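
For illustration only, here is a minimal sketch of the hook idea from the comment above. It is not code from the pull request and does not use Flink's or Akka's API: `JobRegistry`, `onRemoval`, and `HookDemo` are hypothetical names, and job IDs are plain strings. The point is that a callback fired at the moment a job is removed could replace the periodic job status check that `ShutdownClusterAfterJob` currently schedules.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the JobManager's job bookkeeping; not Flink's API.
final class JobRegistry {
  private val running = mutable.Set.empty[String]
  private val removalHooks = mutable.Map.empty[String, List[() => Unit]]

  def add(jobId: String): Unit = running += jobId

  /** Registers a callback that fires once, when `jobId` is removed. */
  def onRemoval(jobId: String)(hook: => Unit): Unit =
    removalHooks(jobId) = (() => hook) :: removalHooks.getOrElse(jobId, Nil)

  /** Removes the job and runs its hooks immediately, so no polling is needed. */
  def remove(jobId: String): Unit =
    if (running.remove(jobId)) {
      // Hooks were prepended on registration; reverse to run them in order.
      removalHooks.remove(jobId).getOrElse(Nil).reverse.foreach(_.apply())
    }
}

object HookDemo {
  def main(args: Array[String]): Unit = {
    val registry = new JobRegistry
    var shutdownRequested = false

    registry.add("job-1")
    // Analogue of handling ShutdownClusterAfterJob: arm a hook instead of a timer.
    registry.onRemoval("job-1") { shutdownRequested = true }

    registry.remove("job-1")
    println(s"shutdown requested: $shutdownRequested")
  }
}
```

With this shape the shutdown decision is made exactly once, at removal time, and removing a job twice does not fire the hook again. In the actor itself the hook would presumably be invoked from the job removal path rather than from a separate registry.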


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent ResourceManager work.
> Design document:  ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
