flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2808) Rework / Extend the StatehandleProvider
Date Thu, 08 Oct 2015 15:31:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948849#comment-14948849 ]

ASF GitHub Bot commented on FLINK-2808:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1239#discussion_r41528403
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
---
    @@ -322,73 +311,84 @@ public String getName() {
     		return getEnvironment().getTaskNameWithSubtasks();
     	}
     
    +	/**
    +	 * Gets the lock object on which all operations that involve data and state mutation
have to lock. 
    +	 
    +	 * @return The checkpoint lock object.
    +	 */
     	public Object getCheckpointLock() {
     		return lock;
     	}
    +	
    +	public StreamConfig getConfiguration() {
    +		return configuration;
    +	}
    +
    +	public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
    +		return accumulatorMap;
    +	}
    +	
    +	public Output<StreamRecord<OUT>> getHeadOutput() {
    +		return operatorChain.getChainEntryPoint();
    +	}
    +	
    +	public RecordWriterOutput<?>[] getStreamOutputs() {
    +		return operatorChain.getStreamOutputs();
    +	}
     
     	// ------------------------------------------------------------------------
     	//  Checkpoint and Restore
     	// ------------------------------------------------------------------------
    -
    -	@SuppressWarnings("unchecked")
    +	
     	@Override
    -	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception
{
    -
    -		// We retrieve end restore the states for the chained operators.
    -		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>
chainedStates = 
    -				(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>)
stateHandle.getState(this.userClassLoader);
    -
    -		// We restore all stateful operators
    -		for (int i = 0; i < chainedStates.size(); i++) {
    -			Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>
state = chainedStates.get(i);
    -			// If state is not null we need to restore it
    -			if (state != null) {
    -				StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
    -				((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
    +	public void setInitialState(StreamTaskStateList initialState) throws Exception {
    +		LOG.info("Restoring checkpointed state to task {}", getName());
    +		
    +		final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
    +		final StreamTaskState[] states = initialState.getState(userClassLoader);
    +		
    +		for (int i = 0; i < states.length; i++) {
    +			StreamTaskState state = states[i];
    +			StreamOperator<?> operator = allOperators[i];
    +			
    +			if (state != null && operator != null) {
    +				LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
    +				operator.restoreState(state);
    +			}
    +			else if (operator != null) {
    +				LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
     			}
     		}
     	}
     
     	@Override
     	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
    -
     		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
     		
     		synchronized (lock) {
     			if (isRunning) {
    -				try {
    -					// We wrap the states of the chained operators in a list, marking non-stateful operators
with null
    -					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>
chainedStates = new ArrayList<>();
     
    -					// A wrapper handle is created for the List of statehandles
    -					WrapperStateHandle stateHandle;
    -					try {
    -
    -						// We construct a list of states for chained tasks
    -						for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators())
{
    -							if (chainedOperator instanceof StatefulStreamOperator) {
    -								chainedStates.add(((StatefulStreamOperator<?>) chainedOperator)
    -										.getStateSnapshotFromFunction(checkpointId, timestamp));
    -							}else{
    -								chainedStates.add(null);
    -							}
    -						}
    -
    -						stateHandle = CollectionUtils.exists(chainedStates,
    -								NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
    -					}
    -					catch (Exception e) {
    -						throw new Exception("Error while drawing snapshot of the user state.", e);
    +				// since both state checkpointing and downstream barrier emission occurs in this
    +				// lock scope, they are an atomic operation regardless of the order in which they
occur
    +				// we immediately emit the checkpoint barriers, so the downstream operators can start
    +				// their checkpoint work as soon as possible
    +				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
    --- End diff --
    
    Yes, we wait for an ack from all tasks. If sinks ack earlier than intermediate tasks,
the checkpoint is still only complete once the intermediates acknowledge it.
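
The acknowledgement rule described in this comment can be sketched roughly as follows. This is a minimal illustration only, not Flink's actual checkpoint coordinator: the class name `PendingCheckpointSketch`, its methods, and the string task ids are all hypothetical. It shows that a checkpoint becomes complete only when the last outstanding task acknowledges, regardless of the order in which sinks and intermediate tasks report.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; names and structure are assumptions, not Flink's API.
class PendingCheckpointSketch {

    private final long checkpointId;

    // Tasks that have not yet acknowledged this checkpoint.
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> allTaskIds) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(allTaskIds);
    }

    long getCheckpointId() {
        return checkpointId;
    }

    // Records one acknowledgement. Returns true only if, after this ack,
    // every task has acknowledged the checkpoint.
    synchronized boolean acknowledge(String taskId) {
        notYetAcknowledged.remove(taskId);
        return notYetAcknowledged.isEmpty();
    }

    synchronized boolean isComplete() {
        return notYetAcknowledged.isEmpty();
    }
}
```

With tasks "source", "map", and "sink", an early ack from "sink" leaves the checkpoint pending; it completes only after "source" and "map" have acknowledged as well.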


> Rework / Extend the StatehandleProvider
> ---------------------------------------
>
>                 Key: FLINK-2808
>                 URL: https://issues.apache.org/jira/browse/FLINK-2808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.10
>
>
> I would like to make some changes (mostly additions) to the {{StateHandleProvider}}.
Ideally for the upcoming release, as it is somewhat part of the public API.
> The rationale behind this is to handle, in a clean and extensible way, the creation of key/value
state backed by various implementations (FS, distributed KV store, local KV store with FS
backup, ...) and various checkpointing ways (full dump, append, incremental keys, ...).
> The changes would concretely be:
> 1.  There should be a default {{StateHandleProvider}} set on the execution environment.
Functions can later specify the {{StateHandleProvider}} when grabbing the {{StreamOperatorState}}
from the runtime context (plus optionally a {{Checkpointer}})
> 2.  The {{StreamOperatorState}} is created from the {{StateHandleProvider}}. That way,
a KeyValueStore state backend can create a {{StreamOperatorState}} that directly updates data
in the KV store on every access, if that is desired (and filter accesses by timestamps to
only show committed data)
> 3.  The {{StateHandleProvider}} should have methods to get an output stream that writes to
the state checkpoint directly (and returns a StateHandle upon closing). That way we can convert
and dump large state into the checkpoint without first creating a full copy in memory.
> Lastly, I would like to change some names
>   - {{StateHandleProvider}} to either {{StateBackend}}, {{StateStore}}, or {{StateProvider}}
(simpler name).
>   - {{StreamOperatorState}} to either {{State}} or {{KVState}}.
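
As an illustration of item 3 above (an output stream that writes directly to the checkpoint and yields a handle on close), here is a minimal sketch under stated assumptions. All type and method names (`StateHandleSketch`, `CheckpointOutputStreamSketch`, `closeAndGetHandle`, `FileStateBackendSketch`, the `chk-<id>` file naming) are hypothetical and are not taken from the issue or the pull request. The sketch uses a local directory as the checkpoint target, so state is streamed to the file rather than first being collected in memory.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical handle: lets a restoring task re-open the written state.
interface StateHandleSketch {
    InputStream openInputStream() throws IOException;
}

// Hypothetical stream: closing it returns the handle to what was written.
abstract class CheckpointOutputStreamSketch extends OutputStream {
    abstract StateHandleSketch closeAndGetHandle() throws IOException;
}

// Hypothetical file-backed backend; assumes 'directory' already exists.
class FileStateBackendSketch {

    private final Path directory;

    FileStateBackendSketch(Path directory) {
        this.directory = directory;
    }

    CheckpointOutputStreamSketch createCheckpointStream(long checkpointId) throws IOException {
        final Path file = directory.resolve("chk-" + checkpointId);
        final OutputStream out = Files.newOutputStream(file);

        return new CheckpointOutputStreamSketch() {
            @Override
            public void write(int b) throws IOException {
                out.write(b);
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                out.write(b, off, len);
            }

            @Override
            StateHandleSketch closeAndGetHandle() throws IOException {
                // Flush and close the file, then hand back a handle that
                // re-opens the same file for reading on restore.
                out.close();
                return () -> Files.newInputStream(file);
            }
        };
    }
}
```

A caller would write serialized state to the returned stream in chunks and call `closeAndGetHandle()` once done; only the handle, not the state itself, then needs to be kept for the checkpoint.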



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
