flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
Date Thu, 18 Aug 2016 14:37:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426550#comment-15426550 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75319776
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
---
    @@ -0,0 +1,755 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.TaskScheduler;
    +import com.netflix.fenzo.VirtualMachineLease;
    +import com.netflix.fenzo.functions.Action1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator;
    +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
    +import org.apache.flink.mesos.scheduler.SchedulerProxy;
    +import org.apache.flink.mesos.scheduler.TaskMonitor;
    +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
    +import org.apache.flink.mesos.scheduler.Tasks;
    +import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
    +import org.apache.flink.mesos.scheduler.messages.Disconnected;
    +import org.apache.flink.mesos.scheduler.messages.Error;
    +import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
    +import org.apache.flink.mesos.scheduler.messages.ReRegistered;
    +import org.apache.flink.mesos.scheduler.messages.Registered;
    +import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
    +import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.mesos.Protos;
    +import org.apache.mesos.Protos.FrameworkInfo;
    +import org.apache.mesos.SchedulerDriver;
    +import org.slf4j.Logger;
    +import scala.Option;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Resource Manager for Apache Mesos.
    + */
    +public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode>
{
    +
    +	/** The Mesos configuration (master and framework info) */
    +	private final MesosConfiguration mesosConfig;
    +
    +	/** The TaskManager container parameters (like container memory size) */
    +	private final MesosTaskManagerParameters taskManagerParameters;
    +
    +	/** Context information used to start a TaskManager Java process */
    +	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
    +
    +	/** Number of failed Mesos tasks before stopping the application. -1 means infinite.
*/
    +	private final int maxFailedTasks;
    +
    +	/** Callback handler for the asynchronous Mesos scheduler */
    +	private SchedulerProxy schedulerCallbackHandler;
    +
    +	/** Mesos scheduler driver */
    +	private SchedulerDriver schedulerDriver;
    +
    +	private ActorRef connectionMonitor;
    +
    +	private ActorRef taskRouter;
    +
    +	private ActorRef launchCoordinator;
    +
    +	private ActorRef reconciliationCoordinator;
    +
    +	private MesosWorkerStore workerStore;
    +
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    +
    +	/** The number of failed tasks since the master became active */
    +	private int failedTasksSoFar;
    +
    +	public MesosFlinkResourceManager(
    +		Configuration flinkConfig,
    +		MesosConfiguration mesosConfig,
    +		MesosWorkerStore workerStore,
    +		LeaderRetrievalService leaderRetrievalService,
    +		MesosTaskManagerParameters taskManagerParameters,
    +		Protos.TaskInfo.Builder taskManagerLaunchContext,
    +		int maxFailedTasks,
    +		int numInitialTaskManagers) {
    +
    +		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
    +
    +		this.mesosConfig = requireNonNull(mesosConfig);
    --- End diff --
    
    No thanks, the other RM code uses `requireNonNull`.


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent ResourceManager work.
> Design document:  ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message