flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
Date Tue, 28 Nov 2017 13:47:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16268738#comment-16268738 ]

ASF GitHub Bot commented on FLINK-7873:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5074#discussion_r153495140
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
    +import org.apache.flink.runtime.checkpoint.CheckpointCache;
    +import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both the remote end (e.g. DFS) and the local end.
    + * Local data is managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and
    + * creates a hybrid output stream from {@link CheckpointCache} and {@link CheckpointStreamFactory}; this hybrid output stream writes
    + * to both the remote end and the local end.
    + */
    +public class CachedCheckpointStreamFactory implements CheckpointStreamFactory {
    +
    +	private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
    +
    +	private final CheckpointCache cache;
    +	private final CheckpointStreamFactory remoteFactory;
    +
    +	public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) {
    +		this.cache = cache;
    +		this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null.");
    +	}
    +
    +	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception {
    +		return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false);
    +	}
    +
    +	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception {
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder);
    +		}
    +		CachedOutputStream cachedOut = null;
    +		if (cache != null) {
    +			cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder);
    +		}
    +		CheckpointStateOutputStream remoteOut = null;
    +		if (!placeholder) {
    +			remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
    +		}
    +		CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut);
    +		return output;
    +	}
    +
    +	@Override
    +	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
    +		LOG.warn("create output stream which is not cacheable.");
    +		return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		remoteFactory.close();
    +	}
    +
    +	/**
    +	 * A hybrid checkpoint output stream which writes data to both the remote end and the local end;
    +	 * a failed local write won't stop writing to remote. This hybrid output stream
    +	 * returns a {@link CachedStreamStateHandle} from closeAndGetHandle(), which can be used to read data locally.
    +	 */
    +	public static class CachedCheckpointStateOutputStream extends CheckpointStateOutputStream {
    +
    +		private CachedOutputStream cacheOut = null;
    +		private CheckpointStateOutputStream remoteOut = null;
    +
    +		public CachedCheckpointStateOutputStream(CachedOutputStream cacheOut, CheckpointStateOutputStream remoteOut) {
    +			this.cacheOut = cacheOut;
    +			this.remoteOut = remoteOut;
    +		}
    +
    +		@Override
    +		public StreamStateHandle closeAndGetHandle() throws IOException {
    +			if (cacheOut != null) {
    +				// finalize cache data
    +				StateHandleID cacheId = cacheOut.getCacheID();
    +				cacheOut.end();
    +
    +				StreamStateHandle remoteHandle;
    +				if (remoteOut != null) {
    +					remoteHandle = remoteOut.closeAndGetHandle();
    +				} else {
    +					remoteHandle = new PlaceholderStreamStateHandle(cacheId);
    +				}
    +				return new CachedStreamStateHandle(cacheId, remoteHandle);
    +			} else {
    +				if (remoteOut != null) {
    +					return remoteOut.closeAndGetHandle();
    +				} else {
    +					return null;
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public long getPos() throws IOException {
    +			return remoteOut != null ? remoteOut.getPos() :-1L;
    +		}
    +
    +		@Override
    +		public void write(int b) throws IOException {
    +			// write to local
    +			if (cacheOut != null) {
    +				try {
    +					cacheOut.write(b);
    +				} catch (Exception e) {
    +					//discard
    +					cacheOut.discard();
    +				}
    +			}
    +
    +			// write to remote
    +			if (remoteOut != null) {
    +				remoteOut.write(b);
    +			}
    +		}
    +
    +		@Override
    +		public void write(byte[] b, int off, int len) throws IOException {
    +			// write to local
    +			if (cacheOut != null) {
    +				try {
    +					cacheOut.write(b, off, len);
    +				} catch (Exception e) {
    +					//discard
    +					cacheOut.discard();
    +				}
    +			}
    +
    +			// write to remote
    +			if (remoteOut != null) {
    +				remoteOut.write(b, off, len);
    +			}
    +		}
    +
    +		@Override
    +		public void flush() throws IOException {
    +			if (cacheOut != null) {
    +				cacheOut.flush();
    --- End diff --
    
    You're right. What I intended is what you described, but I missed it here. I should handle
it like the code above and wrap the cacheOut operation in a try-catch block.
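The change the author agrees to make can be sketched outside Flink as follows. This is a simplified, hypothetical `HybridOutputStream` over plain `java.io.OutputStream`, not the PR's code: it applies the same best-effort rule to `write` and `flush` alike, so a failing local operation discards the local copy without interrupting the remote side. The `LocalOp` interface and the `tryLocal` and `isLocalValid` names are assumptions for illustration only.

```java
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical, simplified stand-in for CachedCheckpointStateOutputStream:
 * the remote stream is authoritative, the local copy is best effort.
 * Any local failure discards the local copy but never fails the remote write.
 */
class HybridOutputStream extends OutputStream {

    private final OutputStream remote;
    private OutputStream local; // becomes null once the local copy is discarded

    HybridOutputStream(OutputStream local, OutputStream remote) {
        this.local = local;
        this.remote = remote;
    }

    /** True while the local copy is still complete and usable. */
    boolean isLocalValid() {
        return local != null;
    }

    /** Runs one local operation; on failure, drops the local copy instead of propagating. */
    private void tryLocal(LocalOp op) {
        if (local == null) {
            return;
        }
        try {
            op.run(local);
        } catch (IOException | RuntimeException e) {
            discardLocal();
        }
    }

    private void discardLocal() {
        try {
            local.close();
        } catch (IOException ignored) {
            // best effort: the local copy is being abandoned anyway
        }
        local = null;
    }

    @Override
    public void write(int b) throws IOException {
        tryLocal(out -> out.write(b));
        remote.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        tryLocal(out -> out.write(b, off, len));
        remote.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        // Same treatment as write(): a failed local flush must not block the remote flush.
        tryLocal(OutputStream::flush);
        remote.flush();
    }

    @Override
    public void close() throws IOException {
        // Closing the local side is also best effort; if it succeeds, the local copy stays valid.
        tryLocal(OutputStream::close);
        remote.close();
    }

    /** A local stream operation that may throw. */
    @FunctionalInterface
    interface LocalOp {
        void run(OutputStream out) throws IOException;
    }
}
```

Unlike the diff, the sketch also drops its reference after discarding, so later writes skip the local side entirely; whether `CachedOutputStream.discard()` already makes further writes harmless is not visible in this excerpt.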


> Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> Why I introduce this:
>     The current recovery strategy always reads checkpoint data from the remote FileStream
> (HDFS). This costs a lot of bandwidth when the state is very large (e.g. 1 TB). What's worse,
> if the job recovers again and again, it can eat up all the network bandwidth and do huge
> harm to the cluster. So I propose that we cache the checkpoint data locally and read
> checkpoint data from the local cache whenever we can, reading from remote only if the local
> read fails. The advantage is that if an execution is assigned to the same TaskManager as
> before, it saves a lot of bandwidth and recovers faster.
> Solution:
>     The TaskManager does the caching and manages the cached data itself. It simply uses a
> TTL-like method to dispose of cache entries: an entry is disposed of if it hasn't been touched
> for X time, and each time an entry is touched its TTL is reset. In this way, all the work is
> done by the TaskManager and is transparent to the JobManager. The only problem is that we may
> dispose of an entry that could still be useful; in that case we have to fall back to reading
> the remote data, but users can avoid this by setting a proper TTL value according to the
> checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~
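The TTL-like disposal policy and the local-first read described in the issue can be sketched as below. Everything here is hypothetical (`TtlCheckpointCache`, `readWithFallback`, the injected millisecond clock); the issue does not specify data structures, and this sketch simply falls back to the remote loader on a miss without re-populating the cache.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of the proposed TTL-like policy: an entry is disposed of
 * once it has not been touched for ttlMillis, and every touch resets its TTL.
 * Not thread-safe; the clock is injected so expiry can be exercised deterministically.
 */
class TtlCheckpointCache<K, V> {

    private static final class Entry<T> {
        final T value;
        long lastTouched;

        Entry(T value, long lastTouched) {
            this.value = value;
            this.lastTouched = lastTouched;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    TtlCheckpointCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Caches a value locally; the entry counts as touched now. */
    void put(K key, V value) {
        entries.put(key, new Entry<>(Objects.requireNonNull(value), clock.getAsLong()));
    }

    /** Returns the locally cached value, if present, and resets its TTL. */
    Optional<V> get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return Optional.empty();
        }
        e.lastTouched = clock.getAsLong();
        return Optional.of(e.value);
    }

    /** Disposes of every entry not touched within the TTL; returns how many were removed. */
    int expire() {
        long now = clock.getAsLong();
        int before = entries.size();
        entries.values().removeIf(e -> now - e.lastTouched > ttlMillis);
        return before - entries.size();
    }

    /** Reads from the local cache when possible; falls back to the remote loader on a miss. */
    V readWithFallback(K key, Function<K, V> remoteLoader) {
        return get(key).orElseGet(() -> remoteLoader.apply(key));
    }

    int size() {
        return entries.size();
    }
}
```

Injecting the clock as a `LongSupplier` keeps expiry deterministic in tests; a TaskManager-side implementation would more likely call `expire()` periodically from a background thread and would then need synchronization, which this sketch omits.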



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
