flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
Date Thu, 18 Aug 2016 13:05:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426395#comment-15426395
] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75303392
  
    --- Diff: flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
---
    @@ -0,0 +1,439 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.scheduler
    +
    +import java.util.{Collections, UUID}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import akka.actor.FSM.StateTimeout
    +import akka.testkit._
    +import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest}
    +import com.netflix.fenzo._
    +import com.netflix.fenzo.functions.{Action1, Action2}
    +import com.netflix.fenzo.plugins.VMLeaseObject
    +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator._
    +import org.apache.flink.mesos.scheduler.messages._
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.mesos.Protos.{SlaveID, TaskInfo}
    +import org.apache.mesos.{SchedulerDriver, Protos}
    +import org.junit.runner.RunWith
    +import org.mockito.Mockito.{verify, _}
    +import org.mockito.invocation.InvocationOnMock
    +import org.mockito.stubbing.Answer
    +import org.mockito.{Matchers => MM, Mockito}
    +import org.scalatest.junit.JUnitRunner
    +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.flink.mesos.Utils.range
    +import org.apache.flink.mesos.Utils.ranges
    +import org.apache.flink.mesos.Utils.scalar
    +
    +@RunWith(classOf[JUnitRunner])
    +class LaunchCoordinatorTest
    +  extends TestKitBase
    +    with ImplicitSender
    +    with WordSpecLike
    +    with Matchers
    +    with BeforeAndAfterAll {
    +
  // Suite-wide Flink configuration and a single local actor system shared by
  // all tests; the system is torn down in afterAll().
  lazy val config = new Configuration()
  implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
    +
  /** Shuts down the shared test actor system once all tests have run. */
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }
    +
    +  def randomFramework = {
    +    Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID.toString).build
    +  }
    +
  /**
    * Generates a random task ID together with a [[LaunchableTask]] whose
    * request advertises fixed needs: 1 CPU, 1024 MB memory, 1 port.
    * Returns the pair (taskID, task).
    */
  def randomTask = {
    val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build

    // Builds a fresh Fenzo TaskRequest on every call; the only mutable state
    // is the assigned-resources holder written by setAssignedResources.
    def generateTaskRequest = {
      new TaskRequest() {
        // Holder for resources assigned to this task by the scheduler.
        private[mesos] val assignedResources = new AtomicReference[TaskRequest.AssignedResources]
        override def getId: String = taskID.getValue
        override def taskGroupName: String = ""
        override def getCPUs: Double = 1.0
        override def getMemory: Double = 1024.0
        override def getNetworkMbps: Double = 0.0
        override def getDisk: Double = 0.0
        override def getPorts: Int = 1
        override def getCustomNamedResources: java.util.Map[String, NamedResourceSetRequest] =
          Collections.emptyMap[String, NamedResourceSetRequest]
        // null (not empty list) mirrors Fenzo's "no constraints" convention here.
        override def getSoftConstraints: java.util.List[_ <: VMTaskFitnessCalculator] = null
        override def getHardConstraints: java.util.List[_ <: ConstraintEvaluator] = null
        override def getAssignedResources: AssignedResources = assignedResources.get()
        override def setAssignedResources(assignedResources: AssignedResources): Unit = {
          this.assignedResources.set(assignedResources)
        }
      }
    }

    val task: LaunchableTask = new LaunchableTask() {
      override def taskRequest: TaskRequest = generateTaskRequest
      override def launch(slaveId: SlaveID, taskAssignment: TaskAssignmentResult): Protos.TaskInfo = {
        // Minimal TaskInfo: a trivial shell command bound to the given slave.
        Protos.TaskInfo.newBuilder
          .setTaskId(taskID).setName(taskID.getValue)
          .setCommand(Protos.CommandInfo.newBuilder.setValue("whoami"))
          .setSlaveId(slaveId)
          .build()
      }
      override def toString = taskRequest.getId
    }

    (taskID, task)
  }
    +
    +  def randomSlave = {
    +    val slaveID = Protos.SlaveID.newBuilder.setValue(UUID.randomUUID.toString).build
    +    val hostname = s"host-${slaveID.getValue}"
    +    (slaveID, hostname)
    +  }
    +
    +  def randomOffer(frameworkID: Protos.FrameworkID, slave: (Protos.SlaveID, String)) =
{
    +    val offerID = Protos.OfferID.newBuilder().setValue(UUID.randomUUID.toString)
    +    Protos.Offer.newBuilder()
    +      .setFrameworkId(frameworkID)
    +      .setId(offerID)
    +      .setSlaveId(slave._1)
    +      .setHostname(slave._2)
    +      .addResources(scalar("cpus", 0.75))
    +      .addResources(scalar("mem", 4096.0))
    +      .addResources(scalar("disk", 1024.0))
    +      .addResources(ranges("ports", range(9000, 9001)))
    +      .build()
    +  }
    +
  /** Wraps a Mesos offer as a Fenzo lease object. */
  def lease(offer: Protos.Offer) = {
    new VMLeaseObject(offer)
  }
    +
    +  /**
    +    * Mock a successful task assignment result matching a task to an offer.
    +    */
    +  def taskAssignmentResult(lease: VirtualMachineLease, task: TaskRequest): TaskAssignmentResult
= {
    +    val ports = lease.portRanges().get(0)
    +    val r = mock(classOf[TaskAssignmentResult])
    +    when(r.getTaskId).thenReturn(task.getId)
    +    when(r.getHostname).thenReturn(lease.hostname())
    +    when(r.getAssignedPorts).thenReturn(
    +      (ports.getBeg to ports.getBeg + task.getPorts).toList.asJava.asInstanceOf[java.util.List[Integer]])
    +    when(r.getRequest).thenReturn(task)
    +    when(r.isSuccessful).thenReturn(true)
    +    when(r.getFitness).thenReturn(1.0)
    +    r
    +  }
    +
    +  /**
    +    * Mock a VM assignment result with the given leases and tasks.
    +    */
    +  def vmAssignmentResult(hostname: String,
    +                         leasesUsed: Seq[VirtualMachineLease],
    +                         tasksAssigned: Set[TaskAssignmentResult]): VMAssignmentResult
= {
    +    new VMAssignmentResult(hostname, leasesUsed.asJava, tasksAssigned.asJava)
    +  }
    +
    +  /**
    +    * Mock a scheduling result with the given successes and failures.
    +    */
    +  def schedulingResult(successes: Seq[VMAssignmentResult],
    +                       failures: Seq[TaskAssignmentResult] = Nil,
    +                       exceptions: Seq[Exception] = Nil,
    +                       leasesAdded: Int = 0,
    +                       leasesRejected: Int = 0): SchedulingResult = {
    +    val r = mock(classOf[SchedulingResult])
    +    when(r.getResultMap).thenReturn(successes.map(r => r.getHostname -> r).toMap.asJava)
    +    when(r.getExceptions).thenReturn(exceptions.asJava)
    +    val groupedFailures = failures.groupBy(_.getRequest).mapValues(_.asJava)
    +    when(r.getFailures).thenReturn(groupedFailures.asJava)
    +    when(r.getLeasesAdded).thenReturn(leasesAdded)
    +    when(r.getLeasesRejected).thenReturn(leasesRejected)
    +    when(r.getRuntime).thenReturn(0)
    +    when(r.getNumAllocations).thenThrow(new NotImplementedError())
    +    when(r.getTotalVMsCount).thenThrow(new NotImplementedError())
    +    when(r.getIdleVMsCount).thenThrow(new NotImplementedError())
    +    r
    +  }
    +
    +
    +  /**
    +    * Mock a task scheduler.
    +    * The task assigner/unassigner is pre-wired.
    +    */
    +  def taskScheduler() = {
    +    val optimizer = mock(classOf[TaskScheduler])
    +    val taskAssigner = mock(classOf[Action2[TaskRequest, String]])
    +    when[Action2[TaskRequest, String]](optimizer.getTaskAssigner).thenReturn(taskAssigner)
    +    val taskUnassigner = mock(classOf[Action2[String, String]])
    +    when[Action2[String, String]](optimizer.getTaskUnAssigner).thenReturn(taskUnassigner)
    +    optimizer
    +  }
    +
  /**
    * Create a task scheduler builder that returns the given (mock) scheduler
    * from build(), ignoring any configuration applied to it.
    */
  def taskSchedulerBuilder(optimizer: TaskScheduler) = new TaskSchedulerBuilder {
    // Exposed via the inferred refinement type so tests can capture and
    // invoke the lease-reject action registered by the code under test.
    var leaseRejectAction: Action1[VirtualMachineLease] = null
    override def withLeaseRejectAction(action: Action1[VirtualMachineLease]): TaskSchedulerBuilder = {
      leaseRejectAction = action
      this
    }
    override def build(): TaskScheduler = optimizer
  }
    +
    +  /**
    +    * Process a call to scheduleOnce with the given function.
    +    */
    +  def scheduleOnce(f: (Seq[TaskRequest],Seq[VirtualMachineLease]) => SchedulingResult)
= {
    +    new Answer[SchedulingResult] {
    +      override def answer(invocationOnMock: InvocationOnMock): SchedulingResult = {
    +        val args = invocationOnMock.getArguments
    +        val requests = args(0).asInstanceOf[java.util.List[TaskRequest]]
    +        val newLeases = args(1).asInstanceOf[java.util.List[VirtualMachineLease]]
    +        f(requests.asScala, newLeases.asScala)
    +      }
    +    }
    +  }
    +
  /**
    * The context fixture: a coordinator FSM wired to mock collaborators, plus
    * a random framework, three random tasks, and two slaves with three offers
    * each.
    */
  class Context {
    val optimizer = taskScheduler()
    val optimizerBuilder = taskSchedulerBuilder(optimizer)
    val schedulerDriver = mock(classOf[SchedulerDriver])
    // Verifies interactions with the driver in the order they occurred.
    val trace = Mockito.inOrder(schedulerDriver)
    val fsm = TestFSMRef(new LaunchCoordinator(testActor, config, schedulerDriver, optimizerBuilder))

    val framework = randomFramework
    val task1 = randomTask
    val task2 = randomTask
    val task3 = randomTask

    // Each slave tuple is (slaveID, hostname, offer1, offer2, offer3).
    val slave1 = {
      val slave = randomSlave
      (slave._1, slave._2, randomOffer(framework, slave), randomOffer(framework, slave),
        randomOffer(framework, slave))
    }

    val slave2 = {
      val slave = randomSlave
      (slave._1, slave._2, randomOffer(framework, slave), randomOffer(framework, slave),
        randomOffer(framework, slave))
    }
  }
    +
  // WordSpec after-words so test clauses read naturally, e.g. "when in state X".
  def inState = afterWord("in state")
  def handle = afterWord("handle")
    +
    +  def handlesAssignments(state: TaskState) = {
    +    "Unassign" which {
    +      s"stays in $state with updated optimizer state" in new Context {
    --- End diff --
    
    Very nice testing although I'm not much of a fan of the Scala `WordSpec` :)


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent ResourceManager work.
> Design document:  ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message