flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
Date Thu, 23 Nov 2017 16:02:14 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264537#comment-16264537 ]

ASF GitHub Bot commented on FLINK-8125:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5059#discussion_r152823934
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
    @@ -0,0 +1,1097 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.function.SupplierWithException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.HashSet;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * A file system that limits the number of concurrently open input streams,
    + * output streams, and total streams for a target file system.
    + *
    + * <p>This file system can wrap another existing file system in cases where
    + * the target file system cannot handle certain connection spikes and connections
    + * would fail in that case. This happens, for example, for very small HDFS clusters
    + * with few RPC handlers, when a large Flink job tries to build up many connections during
    + * a checkpoint.
    + *
    + * <p>The file system may track the progress of streams and close streams that have been
    + * inactive for too long, to prevent locked streams from taking up the complete pool.
    + * Rather than having a dedicated reaper thread, the calls that try to open a new stream
    + * periodically check the currently open streams once the limit of open streams is reached.
    + */
    +@Internal
    +public class LimitedConnectionsFileSystem extends FileSystem {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
    +
    +	/** The original file system to which connections are limited. */
    +	private final FileSystem originalFs;
    +
    +	/** The lock that synchronizes connection bookkeeping. */
    +	private final ReentrantLock lock;
    +
    +	/** Condition for threads that are blocking on the availability of new connections. */
    +	private final Condition available;
    +
    +	/** The maximum number of concurrently open output streams. */
    +	private final int maxNumOpenOutputStreams;
    +
    +	/** The maximum number of concurrently open input streams. */
    +	private final int maxNumOpenInputStreams;
    +
    +	/** The maximum number of concurrently open streams (input + output). */
    +	private final int maxNumOpenStreamsTotal;
    +
    +	/** The nanoseconds that opening a stream may wait for availability. */
    +	private final long streamOpenTimeoutNanos;
    +
    +	/** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
    +	private final long streamInactivityTimeoutNanos;
    +
    +	/** The set of currently open output streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<OutStream> openOutputStreams;
    +
    +	/** The set of currently open input streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<InStream> openInputStreams;
    +
    +	/** The number of output streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedOutputStreams;
    +
    +	/** The number of input streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedInputStreams;
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a new output connection limiting file system.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +	 * then they are terminated as "inactive", to prevent the limited number of connections
    +	 * from getting stuck on only blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 */
    +	public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
    +		this(originalFs, maxNumOpenStreamsTotal, 0, 0);
    +	}
    +
    +	/**
    +	 * Creates a new output connection limiting file system.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +	 * then they are terminated as "inactive", to prevent the limited number of connections
    +	 * from getting stuck on only blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
    +	 *                                no more connections are currently permitted.
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +	 *                                bytes before it is closed as inactive.
    +	 */
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout) {
    +		this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout);
    +	}
    +
    +	/**
    +	 * Creates a new output connection limiting file system, limiting input and output streams with
    +	 * potentially different quotas.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +	 * then they are terminated as "inactive", to prevent the limited number of connections
    +	 * from getting stuck on only blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
    +	 * @param maxNumOpenInputStreams  The maximum number of concurrent open input streams (0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
    +	 *                                no more connections are currently permitted.
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +	 *                                bytes before it is closed as inactive.
    +	 */
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			int maxNumOpenOutputStreams,
    +			int maxNumOpenInputStreams,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout) {
    +
    +		checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
    +		checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
    +		checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
    +		checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)");
    +		checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");
    +
    +		this.originalFs = checkNotNull(originalFs, "originalFs");
    +		this.lock = new ReentrantLock(true);
    +		this.available = lock.newCondition();
    +		this.openOutputStreams = new HashSet<>();
    +		this.openInputStreams = new HashSet<>();
    +		this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
    +		this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
    +		this.maxNumOpenInputStreams = maxNumOpenInputStreams;
    +
    +		// convert milliseconds to nanoseconds; TimeUnit.toNanos saturates at Long.MAX_VALUE
    +		// on overflow, whereas comparing a wrapped product against the input can miss overflows
    +		this.streamOpenTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(streamOpenTimeout);
    +		this.streamInactivityTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(streamInactivityTimeout);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the maximum number of concurrently open output streams.
    +	 */
    +	public int getMaxNumOpenOutputStreams() {
    +		return maxNumOpenOutputStreams;
    +	}
    +
    +	/**
    +	 * Gets the maximum number of concurrently open input streams.
    +	 */
    +	public int getMaxNumOpenInputStreams() {
    +		return maxNumOpenInputStreams;
    +	}
    +
    +	/**
    +	 * Gets the maximum number of concurrently open streams (input + output).
    +	 */
    +	public int getMaxNumOpenStreamsTotal() {
    +		return maxNumOpenStreamsTotal;
    +	}
    +
    +	/**
    +	 * Gets the number of milliseconds that opening a stream may wait for availability in the
    +	 * connection pool.
    +	 */
    +	public long getStreamOpenTimeout() {
    +		return streamOpenTimeoutNanos / 1_000_000;
    +	}
    +
    +	/**
    +	 * Gets the milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
    +	 */
    +	public long getStreamInactivityTimeout() {
    +		return streamInactivityTimeoutNanos / 1_000_000;
    +	}
    +
    +	/**
    +	 * Gets the total number of open streams (input plus output).
    +	 */
    +	public int getTotalNumberOfOpenStreams() {
    +		lock.lock();
    +		try {
    +			return numReservedOutputStreams + numReservedInputStreams;
    +		} finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	/**
    +	 * Gets the number of currently open output streams.
    +	 */
    +	public int getNumberOfOpenOutputStreams() {
    +		lock.lock();
    +		try {
    +			return numReservedOutputStreams;
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	/**
    +	 * Gets the number of currently open input streams.
    +	 */
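    +	public int getNumberOfOpenInputStreams() {
    +		// the counter is guarded by the lock, like in the other getters
    +		lock.lock();
    +		try {
    +			return numReservedInputStreams;
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +	}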
    +
    +	// ------------------------------------------------------------------------
    +	//  input & output stream opening methods
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
    +		return createOutputStream(() -> originalFs.create(f, overwriteMode));
    +	}
    +
    +	@Override
    +	@Deprecated
    +	@SuppressWarnings("deprecation")
    +	public FSDataOutputStream create(
    +			Path f,
    +			boolean overwrite,
    +			int bufferSize,
    +			short replication,
    +			long blockSize) throws IOException {
    +
    +		return createOutputStream(() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize));
    +	}
    +
    +	@Override
    +	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    +		return createInputStream(() -> originalFs.open(f, bufferSize));
    +	}
    +
    +	@Override
    +	public FSDataInputStream open(Path f) throws IOException {
    +		return createInputStream(() -> originalFs.open(f));
    +	}
    +
    +	private FSDataOutputStream createOutputStream(
    +			final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
    +
    +		final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
    +				() -> new OutStream(streamOpener.get(), this);
    +
    +		return createStream(wrappedStreamOpener, openOutputStreams, true);
    +	}
    +
    +	private FSDataInputStream createInputStream(
    +			final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
    +
    +		final SupplierWithException<InStream, IOException> wrappedStreamOpener =
    +				() -> new InStream(streamOpener.get(), this);
    +
    +		return createStream(wrappedStreamOpener, openInputStreams, false);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  other delegating file system methods
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public FileSystemKind getKind() {
    +		return originalFs.getKind();
    +	}
    +
    +	@Override
    +	public boolean isDistributedFS() {
    +		return originalFs.isDistributedFS();
    +	}
    +
    +	@Override
    +	public Path getWorkingDirectory() {
    +		return originalFs.getWorkingDirectory();
    +	}
    +
    +	@Override
    +	public Path getHomeDirectory() {
    +		return originalFs.getHomeDirectory();
    +	}
    +
    +	@Override
    +	public URI getUri() {
    +		return originalFs.getUri();
    +	}
    +
    +	@Override
    +	public FileStatus getFileStatus(Path f) throws IOException {
    +		return originalFs.getFileStatus(f);
    +	}
    +
    +	@Override
    +	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
    +		return originalFs.getFileBlockLocations(file, start, len);
    +	}
    +
    +	@Override
    +	public FileStatus[] listStatus(Path f) throws IOException {
    +		return originalFs.listStatus(f);
    +	}
    +
    +	@Override
    +	public boolean delete(Path f, boolean recursive) throws IOException {
    +		return originalFs.delete(f, recursive);
    +	}
    +
    +	@Override
    +	public boolean mkdirs(Path f) throws IOException {
    +		return originalFs.mkdirs(f);
    +	}
    +
    +	@Override
    +	public boolean rename(Path src, Path dst) throws IOException {
    +		return originalFs.rename(src, dst);
    +	}
    +
    +	@Override
    +	public boolean exists(Path f) throws IOException {
    +		return originalFs.exists(f);
    +	}
    +
    +	@Override
    +	@Deprecated
    +	@SuppressWarnings("deprecation")
    +	public long getDefaultBlockSize() {
    +		return originalFs.getDefaultBlockSize();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	private <T extends StreamWithTimeout> T createStream(
    +			final SupplierWithException<T, IOException> streamOpener,
    +			final HashSet<T> openStreams,
    +			final boolean output) throws IOException {
    +
    +		final int outputLimit = output && maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
    +		final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
    +		final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
    +		final int outputCredit = output ? 1 : 0;
    +		final int inputCredit = output ? 0 : 1;
    +
    +		// because waiting for availability may take long, we need to be interruptible here
    +		// and handle interrupted exceptions as I/O errors
    +		// even though the code is written to make sure the lock is held for a short time only,
    +		// making the lock acquisition interruptible helps to guard against the cases where
    +		// a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
    +		try {
    +			lock.lockInterruptibly();
    +			try {
    +				// some integrity checks
    +				assert openOutputStreams.size() <= numReservedOutputStreams;
    +				assert openInputStreams.size() <= numReservedInputStreams;
    +
    +				// wait until there are few enough streams so we can open another
    +				waitForAvailability(totalLimit, outputLimit, inputLimit);
    +
    +				// We do not open the stream here in the locked scope because opening a stream
    +				// could take a while. Holding the lock during that operation would block all concurrent
    +				// attempts to try and open a stream, effectively serializing all calls to open the streams.
    +				numReservedOutputStreams += outputCredit;
    +				numReservedInputStreams += inputCredit;
    +			}
    +			finally {
    +				lock.unlock();
    +			}
    +		}
    +		catch (InterruptedException e) {
    +			// restore interruption flag
    +			Thread.currentThread().interrupt();
    +			throw new IOException("interrupted before opening stream");
    +		}
    +
    +		// open the stream outside the lock.
    +		boolean success = false;
    +		try {
    +			final T out = streamOpener.get();
    +
    +			// add the stream to the set, need to re-acquire the lock
    +			lock.lock();
    +			try {
    +				openStreams.add(out);
    +			} finally {
    +				lock.unlock();
    +			}
    +
    +			// good, can now return cleanly
    +			success = true;
    +			return out;
    +		}
    +		finally {
    +			if (!success) {
    +				// remove the reserved credit
    +				// we must acquire the lock non-interruptibly, because this must succeed!
    +				lock.lock();
    +				try {
    +					numReservedOutputStreams -= outputCredit;
    +					numReservedInputStreams -= inputCredit;
    +					available.signalAll();
    +				} finally {
    +					lock.unlock();
    +				}
    +			}
    +		}
    +	}
    +
    +	@GuardedBy("lock")
    +	private void waitForAvailability(
    +			int totalLimit,
    +			int outputLimit,
    +			int inputLimit) throws InterruptedException, IOException {
    +
    +		checkState(lock.isHeldByCurrentThread());
    +
    +		// compute the deadline of this operation
    +		final long deadline;
    +		if (streamOpenTimeoutNanos == 0) {
    +			deadline = Long.MAX_VALUE;
    +		} else {
    +			long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
    +			// check for overflow
    +			deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
    +		}
    +
    +		// wait for available connections
    +		long timeLeft;
    +
    +		if (streamInactivityTimeoutNanos == 0) {
    +			// simple case: just wait
    +			while ((timeLeft = (deadline - System.nanoTime())) > 0 &&
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				available.await(timeLeft, TimeUnit.NANOSECONDS);
    +			}
    +		}
    +		else {
    +			// complex case: chase down inactive streams
    +			final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;
    +
    +			long now;
    +			while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while still within timeout
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				// check all streams whether there is one that has been inactive for too long
    +				if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams, now))) {
    +					// only wait if we did not manage to close any stream.
    +					// otherwise eagerly check again if we have availability now (we should have!)
    +					long timeToWait = Math.min(checkIntervalNanos, timeLeft);
    +					available.await(timeToWait, TimeUnit.NANOSECONDS);
    +				}
    +			}
    +		}
    +
    +		// check for timeout
    +		// we check availability again to catch cases where the timeout expired while waiting
    +		// to re-acquire the lock
    +		if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +			throw new IOException(String.format(
    +					"Timeout while waiting for an available stream/connection. " +
    +					"limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms",
    +					maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
    +					numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
    +		}
    +	}
    +
    +	@GuardedBy("lock")
    +	private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
    +		return numReservedOutputStreams < outputLimit &&
    +				numReservedInputStreams < inputLimit &&
    +				numReservedOutputStreams + numReservedInputStreams < totalLimit;
    +	}
    +
    +	@GuardedBy("lock")
    +	private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
    +		for (StreamWithTimeout stream : streams) {
    +			try {
    +				// If the stream is closed already, it will be removed anyways, so we
    +				// do not classify it as inactive. We also skip the check if another check happened too recently.
    +				if (stream.isClosed() || nowNanos < stream.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) {
    +					// interval since last check not yet over
    +					return false;
    +				}
    +				else if (!stream.checkNewBytesAndMark(nowNanos)) {
    +					stream.closeDueToTimeout();
    +					return true;
    +				}
    +			}
    +			catch (StreamTimeoutException ignored) {
    +				// may happen due to races
    +			}
    +			catch (IOException e) {
    +				// only log on debug level here, to avoid log spamming
    +				LOG.debug("Could not check for stream progress to determine inactivity", e);
    +			}
    +		}
    +
    +		return false;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Atomically removes the given output stream from the set of currently open output streams,
    +	 * and signals that a new stream can now be opened.
    +	 */
    +	void unregisterOutputStream(OutStream stream) {
    +		lock.lock();
    +		try {
    +			// only decrement if we actually remove the stream
    +			if (openOutputStreams.remove(stream)) {
    +				numReservedOutputStreams--;
    +				available.signalAll();
    +			}
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	/**
    +	 * Atomically removes the given input stream from the set of currently open input streams,
    +	 * and signals that a new stream can now be opened.
    +	 */
    +	void unregisterInputStream(InStream stream) {
    +		lock.lock();
    +		try {
    +			// only decrement if we actually remove the stream
    +			if (openInputStreams.remove(stream)) {
    +				numReservedInputStreams--;
    +				available.signalAll();
    +			}
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * A special IOException, indicating a timeout in the data output stream.
    +	 */
    +	public static final class StreamTimeoutException extends IOException {
    +
    +		private static final long serialVersionUID = -8790922066795901928L;
    +
    +		public StreamTimeoutException() {
    +			super("Stream closed due to inactivity timeout. " +
    +					"This is done to prevent inactive streams from blocking the full " +
    +					"pool of limited connections");
    +		}
    +
    +		public StreamTimeoutException(StreamTimeoutException other) {
    +			super(other.getMessage(), other);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Interface for streams that can be checked for inactivity.
    +	 */
    +	private interface StreamWithTimeout extends Closeable {
    +
    +		/**
    +		 * Gets the timestamp when the last inactivity evaluation was made.
    +		 */
    +		long getLastCheckTimestampNanos();
    +
    +		/**
    +		 * Checks whether there were new bytes since the last time this method was invoked.
    +		 * This also sets the given timestamp, to be read via {@link #getLastCheckTimestampNanos()}.
    +		 *
    +		 * @return True, if there were new bytes, false if not.
    +		 */
    +		boolean checkNewBytesAndMark(long timestamp) throws IOException;
    +
    +		/**
    +		 * Closes the stream asynchronously with a special exception that indicates closing due
    +		 * to lack of progress.
    +		 */
    +		void closeDueToTimeout() throws IOException;
    +
    +		/**
    +		 * Checks whether the stream was closed already.
    +		 */
    +		boolean isClosed();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * A data output stream that wraps a given data output stream and un-registers
    +	 * from a given connection-limiting file system
    +	 * (via {@link LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)})
    +	 * upon closing.
    +	 */
    +	private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout {
    +
    +		/** The original data output stream to write to. */
    +		private final FSDataOutputStream originalStream;
    +
    +		/** The connection-limiting file system to un-register from. */
    +		private final LimitedConnectionsFileSystem fs;
    +
    +		/** An exception with which the stream has been externally closed. */
    +		private volatile StreamTimeoutException timeoutException;
    +
    +		/** The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)}
    +		 * method was called. It is important to initialize this with {@code -1} so that the
    +		 * first check (0 bytes) always appears to have made progress. */
    +		private volatile long lastCheckBytes = -1;
    +
    +		/** The timestamp when the last inactivity evaluation was made. */
    +		private volatile long lastCheckTimestampNanos;
    +
    +		/** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
    +		private AtomicBoolean closed = new AtomicBoolean();
    +
    +		OutStream(
    +				FSDataOutputStream originalStream,
    +				LimitedConnectionsFileSystem fs) {
    +
    +			this.originalStream = checkNotNull(originalStream);
    +			this.fs = checkNotNull(fs);
    +		}
    +
    +		// --- FSDataOutputStream API implementation
    +
    +		@Override
    +		public void write(int b) throws IOException {
    +			try {
    +				originalStream.write(b);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		@Override
    +		public void write(byte[] b, int off, int len) throws IOException {
    +			try {
    +				originalStream.write(b, off, len);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		@Override
    +		public long getPos() throws IOException {
    +			try {
    +				return originalStream.getPos();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return -1; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public void flush() throws IOException {
    +			try {
    +				originalStream.flush();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		@Override
    +		public void sync() throws IOException {
    +			try {
    +				originalStream.sync();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +			if (closed.compareAndSet(false, true)) {
    +				try {
    +					originalStream.close();
    +				}
    +				catch (IOException e) {
    +					handleIOException(e);
    +				}
    +				finally {
    +					fs.unregisterOutputStream(this);
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public void closeDueToTimeout() throws IOException {
    +			this.timeoutException = new StreamTimeoutException();
    +			close();
    +		}
    +
    +		@Override
    +		public boolean isClosed() {
    +			return closed.get();
    +		}
    +
    +		@Override
    +		public long getLastCheckTimestampNanos() {
    +			return lastCheckTimestampNanos;
    +		}
    +
    +		@Override
    +		public boolean checkNewBytesAndMark(long timestamp) throws IOException {
    +			// remember the time when checked
    +			lastCheckTimestampNanos = timestamp;
    +
    +			final long bytesNow = originalStream.getPos();
    +			if (bytesNow > lastCheckBytes) {
    +				lastCheckBytes = bytesNow;
    +				return true;
    +			}
    +			else {
    +				return false;
    +			}
    +		}
    +
    +		private void handleIOException(IOException exception) throws IOException {
    +			if (timeoutException == null) {
    +				throw exception;
    +			} else {
    +				// throw a new exception to capture this call's stack trace
    +				throw new StreamTimeoutException(timeoutException);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * A data input stream that wraps a given data input stream and un-registers
    +	 * from a given connection-limiting file system
    +	 * (via {@link LimitedConnectionsFileSystem#unregisterInputStream(InStream)})
    +	 * upon closing.
    +	 */
    +	private static final class InStream extends FSDataInputStream implements StreamWithTimeout {
    +
    +		/** The original data input stream to read from. */
    +		private final FSDataInputStream originalStream;
    +
    +		/** The connection-limiting file system to un-register from. */
    +		private final LimitedConnectionsFileSystem fs;
    +
    +		/** An exception with which the stream has been externally closed. */
    +		private volatile StreamTimeoutException timeoutException;
    +
    +		/** The number of bytes read the last time that the {@link #checkNewBytesAndMark(long)}
    +		 * method was called. It is important to initialize this with {@code -1} so that the
    +		 * first check (0 bytes) always appears to have made progress. */
    +		private volatile long lastCheckBytes = -1;
    +
    +		/** The timestamp when the last inactivity evaluation was made. */
    +		private volatile long lastCheckTimestampNanos;
    +
    +		/** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
    +		private AtomicBoolean closed = new AtomicBoolean();
    +
    +		InStream(
    +				FSDataInputStream originalStream,
    +				LimitedConnectionsFileSystem fs) {
    +
    +			this.originalStream = checkNotNull(originalStream);
    +			this.fs = checkNotNull(fs);
    +		}
    +
    +		// --- FSDataInputStream API implementation
    +
    +		@Override
    +		public int read() throws IOException {
    +			try {
    +				return originalStream.read();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public int read(byte[] b) throws IOException {
    +			try {
    +				return originalStream.read(b);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public int read(byte[] b, int off, int len) throws IOException {
    +			try {
    +				return originalStream.read(b, off, len);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public long skip(long n) throws IOException {
    +			try {
    +				return originalStream.skip(n);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0L; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public int available() throws IOException {
    +			try {
    +				return originalStream.available();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public void mark(int readlimit) {
    +			originalStream.mark(readlimit);
    +		}
    +
    +		@Override
    +		public void reset() throws IOException {
    +			originalStream.reset();
    --- End diff --
    
    Is there a special reason why this is not using `handleIOException(...)` like other methods?
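
For illustration, here is a minimal, self-contained sketch of what routing `reset()` through the same handler as the other methods might look like. `ResetSketch`, `GuardedStream`, `TimeoutException`, `NoResetStream`, `markTimedOut` and `resetOutcome` are hypothetical stand-ins and not names from the pull request; only the try/catch shape mirrors the methods in the diff above.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical, simplified stand-in for the InStream wrapper in the diff.
class ResetSketch {

    /** Stand-in for StreamTimeoutException. */
    static final class TimeoutException extends IOException {
        private static final long serialVersionUID = 1L;

        TimeoutException(Throwable cause) {
            super("Stream closed due to inactivity timeout.", cause);
        }
    }

    /** A source that keeps InputStream's default reset(), which throws an IOException. */
    static final class NoResetStream extends InputStream {
        @Override
        public int read() {
            return -1;
        }
    }

    static final class GuardedStream extends InputStream {
        private final InputStream original;
        private volatile TimeoutException timeoutException;

        GuardedStream(InputStream original) {
            this.original = original;
        }

        /** Simulates the file system closing this stream as inactive. */
        void markTimedOut() {
            timeoutException = new TimeoutException(null);
        }

        @Override
        public int read() throws IOException {
            try {
                return original.read();
            } catch (IOException e) {
                handleIOException(e);
                return -1; // silence the compiler
            }
        }

        @Override
        public void reset() throws IOException {
            // same pattern as read(): failures are translated by the handler
            try {
                original.reset();
            } catch (IOException e) {
                handleIOException(e);
            }
        }

        private void handleIOException(IOException e) throws IOException {
            if (timeoutException == null) {
                throw e;
            }
            // new exception so that the stack trace points at this call
            throw new TimeoutException(timeoutException);
        }
    }

    /** Reports which kind of failure reset() produced, if any. */
    static String resetOutcome(GuardedStream stream) {
        try {
            stream.reset();
            return "ok";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (IOException e) {
            return "io";
        }
    }

    public static void main(String[] args) {
        GuardedStream stream = new GuardedStream(new NoResetStream());
        System.out.println(resetOutcome(stream));
        stream.markTimedOut();
        System.out.println(resetOutcome(stream));
    }
}
```

With this shape, a caller that hits an error on `reset()` after an inactivity close would see the timeout exception rather than the underlying stream's error, which is the behavior the other wrapped methods already have.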


> Support limiting the number of open FileSystem connections
> ----------------------------------------------------------
>
>                 Key: FLINK-8125
>                 URL: https://issues.apache.org/jira/browse/FLINK-8125
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.5.0, 1.4.1
>
>
> We need a way to limit the number of streams that Flink FileSystems concurrently open.
> For example, for very small HDFS clusters with few RPC handlers, a large Flink job trying to build up many connections during a checkpoint causes failures due to rejected connections.
> I propose to add a file system that can wrap another existing file system. The file system may track the progress of streams and close streams that have been inactive for too long, to prevent locked streams from taking up the complete pool.
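
The core idea of the proposal, reserving a slot before a stream is opened and releasing it on close, can be sketched in isolation. This is a simplified illustration under stated assumptions, not the code from the pull request: `StreamLimiterSketch`, `tryReserve`, `release` and `numReserved` are hypothetical names, there is a single limit instead of separate input/output/total limits, and inactivity tracking is left out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a single counter guarded by a fair lock and a condition.
class StreamLimiterSketch {

    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition available = lock.newCondition();
    private final int limit;
    private final long openTimeoutNanos;
    private int numReserved;

    /** A limit of 0 means no limit; a timeout of 0 means wait indefinitely. */
    StreamLimiterSketch(int limit, long openTimeoutMillis) {
        this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
        // toNanos saturates at Long.MAX_VALUE instead of overflowing
        this.openTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(openTimeoutMillis);
    }

    /** Reserves one slot; returns false on timeout or interruption. */
    boolean tryReserve() {
        try {
            lock.lockInterruptibly();
            try {
                long remaining = openTimeoutNanos;
                while (numReserved >= limit) {
                    if (openTimeoutNanos == 0) {
                        available.await();
                    } else if (remaining <= 0) {
                        return false;
                    } else {
                        // awaitNanos returns an estimate of the time still left
                        remaining = available.awaitNanos(remaining);
                    }
                }
                numReserved++;
                return true;
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            // restore the interruption flag for the caller
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Releases one slot and wakes up waiting threads. */
    void release() {
        lock.lock();
        try {
            if (numReserved > 0) {
                numReserved--;
                available.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    int numReserved() {
        lock.lock();
        try {
            return numReserved;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        StreamLimiterSketch limiter = new StreamLimiterSketch(1, 50);
        System.out.println(limiter.tryReserve()); // the only slot is free
        System.out.println(limiter.tryReserve()); // limit reached, waits for the timeout
        limiter.release();
        System.out.println(limiter.tryReserve()); // the slot is free again
    }
}
```

In a wrapper file system, the actual stream would be opened only after a successful reservation and outside the lock, so that slow opens do not serialize other callers.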



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
