flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
Date Thu, 18 Aug 2016 09:36:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426161#comment-15426161
] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75276925
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
---
    @@ -0,0 +1,755 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.TaskScheduler;
    +import com.netflix.fenzo.VirtualMachineLease;
    +import com.netflix.fenzo.functions.Action1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator;
    +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
    +import org.apache.flink.mesos.scheduler.SchedulerProxy;
    +import org.apache.flink.mesos.scheduler.TaskMonitor;
    +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
    +import org.apache.flink.mesos.scheduler.Tasks;
    +import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
    +import org.apache.flink.mesos.scheduler.messages.Disconnected;
    +import org.apache.flink.mesos.scheduler.messages.Error;
    +import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
    +import org.apache.flink.mesos.scheduler.messages.ReRegistered;
    +import org.apache.flink.mesos.scheduler.messages.Registered;
    +import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
    +import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.mesos.Protos;
    +import org.apache.mesos.Protos.FrameworkInfo;
    +import org.apache.mesos.SchedulerDriver;
    +import org.slf4j.Logger;
    +import scala.Option;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Resource Manager for Apache Mesos.
    + */
    +public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode>
{
    +
    +	/** The Mesos configuration (master and framework info) */
    +	private final MesosConfiguration mesosConfig;
    +
    +	/** The TaskManager container parameters (like container memory size) */
    +	private final MesosTaskManagerParameters taskManagerParameters;
    +
    +	/** Context information used to start a TaskManager Java process */
    +	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
    +
    +	/** Number of failed Mesos tasks before stopping the application. -1 means infinite.
*/
    +	private final int maxFailedTasks;
    +
    +	/** Callback handler for the asynchronous Mesos scheduler */
    +	private SchedulerProxy schedulerCallbackHandler;
    +
    +	/** Mesos scheduler driver */
    +	private SchedulerDriver schedulerDriver;
    +
    +	private ActorRef connectionMonitor;
    +
    +	private ActorRef taskRouter;
    +
    +	private ActorRef launchCoordinator;
    +
    +	private ActorRef reconciliationCoordinator;
    +
    +	private MesosWorkerStore workerStore;
    +
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    +
    +	/** The number of failed tasks since the master became active */
    +	private int failedTasksSoFar;
    +
    +	public MesosFlinkResourceManager(
    +		Configuration flinkConfig,
    +		MesosConfiguration mesosConfig,
    +		MesosWorkerStore workerStore,
    +		LeaderRetrievalService leaderRetrievalService,
    +		MesosTaskManagerParameters taskManagerParameters,
    +		Protos.TaskInfo.Builder taskManagerLaunchContext,
    +		int maxFailedTasks,
    +		int numInitialTaskManagers) {
    +
    +		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
    +
    +		this.mesosConfig = requireNonNull(mesosConfig);
    +
    +		this.workerStore = requireNonNull(workerStore);
    +
    +		this.taskManagerParameters = requireNonNull(taskManagerParameters);
    +		this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
    +		this.maxFailedTasks = maxFailedTasks;
    +
    +		this.workersInNew = new HashMap<>();
    +		this.workersInLaunch = new HashMap<>();
    +		this.workersBeingReturned = new HashMap<>();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Mesos-specific behavior
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	protected void initialize() throws Exception {
    +		LOG.info("Initializing Mesos resource master");
    +
    +		workerStore.start();
    +
    +		// create the scheduler driver to communicate with Mesos
    +		schedulerCallbackHandler = new SchedulerProxy(self());
    +
    +		// register with Mesos
    +		FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
    +			.clone()
    +			.setCheckpoint(true);
    +
    +		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
    +		if(frameworkID.isEmpty()) {
    +			LOG.info("Registering as new framework.");
    +		}
    +		else {
    +			LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
    +			frameworkInfo.setId(frameworkID.get());
    +		}
    +
    +		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
    +		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
    +		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
    +
    +		// create supporting actors
    +		connectionMonitor = createConnectionMonitor();
    +		launchCoordinator = createLaunchCoordinator();
    +		reconciliationCoordinator = createReconciliationCoordinator();
    +		taskRouter = createTaskRouter();
    +
    +		recoverWorkers();
    +
    +		connectionMonitor.tell(new ConnectionMonitor.Start(), self());
    +		schedulerDriver.start();
    +	}
    +
    +	protected ActorRef createConnectionMonitor() {
    +		return context().actorOf(
    +			ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
    +			"connectionMonitor");
    +	}
    +
    +	protected ActorRef createTaskRouter() {
    +		return context().actorOf(
    +			Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
    +			"tasks");
    +	}
    +
    +	protected ActorRef createLaunchCoordinator() {
    +		return context().actorOf(
    +			LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver,
createOptimizer()),
    +			"launchCoordinator");
    +	}
    +
    +	protected ActorRef createReconciliationCoordinator() {
    +		return context().actorOf(
    +			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config,
schedulerDriver),
    +			"reconciliationCoordinator");
    +	}
    +
    +	@Override
    +	public void postStop() {
    +		LOG.info("Stopping Mesos resource master");
    +		super.postStop();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Actor messages
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	protected void handleMessage(Object message) {
    +
    +		// check for Mesos-specific actor messages first
    +
    +		// --- messages about Mesos connection
    +		if (message instanceof Registered) {
    +			registered((Registered) message);
    +		} else if (message instanceof ReRegistered) {
    +			reregistered((ReRegistered) message);
    +		} else if (message instanceof Disconnected) {
    +			disconnected((Disconnected) message);
    +		} else if (message instanceof Error) {
    +			error(((Error) message).message());
    +
    +		// --- messages about offers
    +		} else if (message instanceof ResourceOffers || message instanceof OfferRescinded)
{
    +			launchCoordinator.tell(message, self());
    +		} else if (message instanceof AcceptOffers) {
    +			acceptOffers((AcceptOffers) message);
    +
    +		// --- messages about tasks
    +		} else if (message instanceof StatusUpdate) {
    +			taskStatusUpdated((StatusUpdate) message);
    +		} else if (message instanceof ReconciliationCoordinator.Reconcile) {
    +			// a reconciliation request from a task
    +			reconciliationCoordinator.tell(message, self());
    +		} else if (message instanceof TaskMonitor.TaskTerminated) {
    +			// a termination message from a task
    +			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
    +			taskTerminated(msg.taskID(), msg.status());
    +
    +		} else  {
    +			// message handled by the generic resource master code
    +			super.handleMessage(message);
    +		}
    +	}
    +
    +	/**
    +	 * Called to shut down the cluster (not a failover situation).
    +	 *
    +	 * @param finalStatus The application status to report.
    +	 * @param optionalDiagnostics An optional diagnostics message.
    +     */
    --- End diff --
    
    space. will refrain from any future space comments :)


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent ResourceManager
work.
> Design document:  ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message