flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4490) Decouple Slot and Instance
Date Thu, 01 Sep 2016 12:14:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455206#comment-15455206
] 

ASF GitHub Bot commented on FLINK-4490:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2447#discussion_r77161750
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
---
    @@ -20,73 +20,125 @@
     
     import org.apache.flink.runtime.instance.SimpleSlot;
     
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * 
    + */
     public class SlotAllocationFuture {
    -	
    +
     	private final Object monitor = new Object();
    -	
    +
     	private volatile SimpleSlot slot;
    -	
    +
     	private volatile SlotAllocationFutureAction action;
    -	
    +
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Creates a future that is uncompleted.
    +	 */
     	public SlotAllocationFuture() {}
    -	
    +
    +	/**
    +	 * Creates a future that is immediately completed.
    +	 * 
    +	 * @param slot The task slot that completes the future.
    +	 */
     	public SlotAllocationFuture(SimpleSlot slot) {
     		this.slot = slot;
     	}
    -	
    +
     	// --------------------------------------------------------------------------------------------
    -	
    -	public SimpleSlot waitTillAllocated() throws InterruptedException {
    -		return waitTillAllocated(0);
    -	}
    -	
    -	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
    +
    +	public SimpleSlot waitTillCompleted() throws InterruptedException {
     		synchronized (monitor) {
     			while (slot == null) {
    -				monitor.wait(timeout);
    +				monitor.wait();
    +			}
    +			return slot;
    +		}
    +	}
    +
    +	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException,
TimeoutException {
    +		checkArgument(timeout >= 0, "timeout may not be negative");
    +		checkNotNull(timeUnit, "timeUnit");
    +
    +		if (timeout == 0) {
    +			return waitTillCompleted();
    +		} else {
    +			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
    +			long millisToWait;
    +
    +			synchronized (monitor) {
    +				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000)
> 0) {
    +					monitor.wait(millisToWait);
    +				}
    +
    +				if (slot != null) {
    +					return slot;
    +				} else {
    +					throw new TimeoutException();
    +				}
     			}
    -			
    +		}
    +	}
    +
    +	/**
    +	 * Gets the slot from this future. This method throws an exception, if the future has
not been completed.
    +	 * This method never blocks.
    +	 * 
    +	 * @return The slot with which this future was completed.
    +	 * @throws IllegalStateException Thrown, if this method is called before the future
is completed.
    +	 */
    +	public SimpleSlot get() {
    +		final SimpleSlot slot = this.slot;
    +		if (slot != null) {
     			return slot;
    +		} else {
    +			throw new IllegalStateException("The future is not complete - not slot available");
    --- End diff --
    
    We can do this here. In the long run, we should probably replace this future by a generic
future, through. In that case, it would be back to a generic exception.
    
    It may be better to have the code that calls the future to get the slot catch the exception
and re-throw a `SlotNotReadyException`. That would also work when we use a generic future
later.


> Decouple Slot and Instance
> --------------------------
>
>                 Key: FLINK-4490
>                 URL: https://issues.apache.org/jira/browse/FLINK-4490
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>            Reporter: Kurt Young
>             Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} holds each other. For {{Instance}} holding {{Slot}},
it makes sense because it reflects how many resources it can provide and how many are using.

> But it's not very necessary for {{Slot}} to hold {{Instance}} which it belongs to. It
only needs to hold some connection information and gateway to talk to. Another downside for
{{Slot}} holding {{Instance}} is that {{Instance}} actually contains some allocate/de-allocation
logicals, it will be difficult if we want to do some allocation refactor without letting {{Slot}}
noticed. 
> We should abstract the connection information of {{Instance}} to let {{Slot}} holds.
(Actually we have {{InstanceConnectionInfo}} now, but lacks of instance's akka gateway, maybe
we can just adding the akka gateway to the {{InstanceConnectionInfo}})



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message