flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...
Date Thu, 23 Nov 2017 16:02:22 GMT
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5059#discussion_r152826230
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
---
    @@ -0,0 +1,1097 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.function.SupplierWithException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.HashSet;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * A file system that limits the number of concurrently open input streams,
    + * output streams, and total streams for a target file system.
    + *
    + * <p>This file system can wrap another existing file system in cases where
    + * the target file system cannot handle certain connection spikes and connections
    + * would fail in that case. This happens, for example, for very small HDFS clusters
    + * with few RPC handlers, when a large Flink job tries to build up many connections during
    + * a checkpoint.
    + *
     +	 * <p>The filesystem may track the progress of streams and close streams that have been
     +	 * inactive for too long, to prevent locked streams from taking up the complete pool.
    + * Rather than having a dedicated reaper thread, the calls that try to open a new stream
    + * periodically check the currently open streams once the limit of open streams is reached.
    + */
    +@Internal
    +public class LimitedConnectionsFileSystem extends FileSystem {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
    +
    +	/** The original file system to which connections are limited. */
    +	private final FileSystem originalFs;
    +
    +	/** The lock that synchronizes connection bookkeeping. */
    +	private final ReentrantLock lock;
    +
    +	/** Condition for threads that are blocking on the availability of new connections.
*/
    +	private final Condition available;
    +
    +	/** The maximum number of concurrently open output streams. */
    +	private final int maxNumOpenOutputStreams;
    +
    +	/** The maximum number of concurrently open input streams. */
    +	private final int maxNumOpenInputStreams;
    +
    +	/** The maximum number of concurrently open streams (input + output). */
    +	private final int maxNumOpenStreamsTotal;
    +
     +	/** The nanoseconds that opening a stream may wait for availability. */
    +	private final long streamOpenTimeoutNanos;
    +
    +	/** The nanoseconds that a stream may spend not writing any bytes before it is closed
as inactive. */
    +	private final long streamInactivityTimeoutNanos;
    +
    +	/** The set of currently open output streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<OutStream> openOutputStreams;
    +
    +	/** The set of currently open input streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<InStream> openInputStreams;
    +
    +	/** The number of output streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedOutputStreams;
    +
    +	/** The number of input streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedInputStreams;
    +
    +	// ------------------------------------------------------------------------
    +
	/**
	 * Creates a new output connection limiting file system, limiting only the total number
	 * of concurrently open streams.
	 *
	 * <p>This constructor passes 0 for both the stream-open timeout and the inactivity
	 * timeout, which (per the argument checks in the main constructor) means "infinite
	 * timeout": callers block indefinitely for a free slot and no inactive-stream
	 * reaping is performed.
	 *
	 * @param originalFs              The original file system to which connections are limited.
	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
	 */
	public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
		this(originalFs, maxNumOpenStreamsTotal, 0, 0);
	}
    +
	/**
	 * Creates a new output connection limiting file system, limiting the total number of
	 * concurrently open streams and applying the given timeouts.
	 *
	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
	 * then they are terminated as "inactive", to prevent that the limited number of connections
	 * gets stuck on only blocked threads.
	 *
	 * @param originalFs              The original file system to which connections are limited.
	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will
	 *                                wait when no more connections are currently permitted
	 *                                (0 means infinite timeout).
	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
	 *                                bytes before it is closed as inactive (0 means infinite
	 *                                timeout, i.e. no inactivity check).
	 */
	public LimitedConnectionsFileSystem(
			FileSystem originalFs,
			int maxNumOpenStreamsTotal,
			long streamOpenTimeout,
			long streamInactivityTimeout) {
		// no separate per-direction limits (0 = unlimited); only the total limit applies
		this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout);
	}
    +
    +	/**
    +	 * Creates a new output connection limiting file system, limiting input and output streams
with
    +	 * potentially different quotas.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the
given timeout,
    +	 * then they are terminated as "inactive", to prevent that the limited number of connections
gets
    +	 * stuck on only blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are
limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means
no limit).
    +	 * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams
(0 means no limit).
    +	 * @param maxNumOpenInputStreams  The maximum number of concurrent open input streams
(0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system
will wait when
    +	 *                                no more connections are currently permitted.
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing
any
    +	 *                                bytes before it is closed as inactive.
    +	 */
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			int maxNumOpenOutputStreams,
    +			int maxNumOpenInputStreams,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout) {
    +
    +		checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >=
0");
    +		checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >=
0");
    +		checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >=
0");
    +		checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0
means infinite timeout)");
    +		checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >=
0 (0 means infinite timeout)");
    +
    +		this.originalFs = checkNotNull(originalFs, "originalFs");
    +		this.lock = new ReentrantLock(true);
    +		this.available = lock.newCondition();
    +		this.openOutputStreams = new HashSet<>();
    +		this.openInputStreams = new HashSet<>();
    +		this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
    +		this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
    +		this.maxNumOpenInputStreams = maxNumOpenInputStreams;
    +
    +		// assign nanos overflow aware
    +		final long openTimeoutNanos = streamOpenTimeout * 1_000_000;
    +		final long inactivityTimeoutNanos = streamInactivityTimeout * 1_000_000;
    +
    +		this.streamOpenTimeoutNanos =
    +				openTimeoutNanos >= streamOpenTimeout ? openTimeoutNanos : Long.MAX_VALUE;
    +
    +		this.streamInactivityTimeoutNanos =
    +				inactivityTimeoutNanos >= streamInactivityTimeout ? inactivityTimeoutNanos : Long.MAX_VALUE;
    +	}
    +
	// ------------------------------------------------------------------------

	/**
	 * Gets the maximum number of concurrently open output streams (0 means no limit).
	 */
	public int getMaxNumOpenOutputStreams() {
		return maxNumOpenOutputStreams;
	}

	/**
	 * Gets the maximum number of concurrently open input streams (0 means no limit).
	 */
	public int getMaxNumOpenInputStreams() {
		return maxNumOpenInputStreams;
	}

	/**
	 * Gets the maximum number of concurrently open streams, input + output (0 means no limit).
	 */
	public int getMaxNumOpenStreamsTotal() {
		return maxNumOpenStreamsTotal;
	}

	/**
	 * Gets the number of milliseconds that opening a stream may wait for availability in the
	 * connection pool (0 means infinite timeout).
	 *
	 * <p>Note: if the configured timeout saturated during the nanosecond conversion in the
	 * constructor, the value returned here may differ from the one originally passed in.
	 */
	public long getStreamOpenTimeout() {
		return streamOpenTimeoutNanos / 1_000_000;
	}

	/**
	 * Gets the milliseconds that a stream may spend not writing any bytes before it is
	 * closed as inactive (0 means infinite timeout, i.e. no inactivity check).
	 */
	public long getStreamInactivityTimeout() {
		return streamInactivityTimeoutNanos / 1_000_000;
	}

	/**
	 * Gets the total number of open streams (input plus output).
	 *
	 * <p>The count includes reserved slots for streams that are currently in the process
	 * of being opened (see the reservation logic in {@code createStream}).
	 */
	public int getTotalNumberOfOpenStreams() {
		lock.lock();
		try {
			return numReservedOutputStreams + numReservedInputStreams;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Gets the number of currently open output streams, including reserved slots for
	 * streams currently being opened. Reads under the bookkeeping lock for consistency.
	 */
	public int getNumberOfOpenOutputStreams() {
		lock.lock();
		try {
			return numReservedOutputStreams;
		}
		finally {
			lock.unlock();
		}
	}
    +
    +	/**
    +	 * Gets the number of currently open input streams.
    +	 */
    +	public int getNumberOfOpenInputStreams() {
    +		return numReservedInputStreams;
    +	}
    +
	// ------------------------------------------------------------------------
	//  input & output stream opening methods
	// ------------------------------------------------------------------------

	@Override
	public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
		// defer the actual open into the limiter; the stream is opened only once a slot is free
		return createOutputStream(() -> originalFs.create(f, overwriteMode));
	}

	@Override
	@Deprecated
	@SuppressWarnings("deprecation")
	public FSDataOutputStream create(
			Path f,
			boolean overwrite,
			int bufferSize,
			short replication,
			long blockSize) throws IOException {

		return createOutputStream(() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize));
	}

	@Override
	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
		return createInputStream(() -> originalFs.open(f, bufferSize));
	}

	@Override
	public FSDataInputStream open(Path f) throws IOException {
		return createInputStream(() -> originalFs.open(f));
	}
    +
    +	private FSDataOutputStream createOutputStream(
    +			final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws
IOException {
    +
    +		final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
    +				() -> new OutStream(streamOpener.get(), this);
    +
    +		return createStream(wrappedStreamOpener, openOutputStreams, true);
    +	}
    +
    +	private FSDataInputStream createInputStream(
    +			final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws
IOException {
    +
    +		final SupplierWithException<InStream, IOException> wrappedStreamOpener =
    +				() -> new InStream(streamOpener.get(), this);
    +
    +		return createStream(wrappedStreamOpener, openInputStreams, false);
    +	}
    +
	// ------------------------------------------------------------------------
	//  other delegating file system methods
	// ------------------------------------------------------------------------

	// All methods below forward directly to the wrapped file system; only stream
	// creation (above) is subject to connection limiting.

	@Override
	public FileSystemKind getKind() {
		return originalFs.getKind();
	}

	@Override
	public boolean isDistributedFS() {
		return originalFs.isDistributedFS();
	}

	@Override
	public Path getWorkingDirectory() {
		return originalFs.getWorkingDirectory();
	}

	@Override
	public Path getHomeDirectory() {
		return originalFs.getHomeDirectory();
	}

	@Override
	public URI getUri() {
		return originalFs.getUri();
	}

	@Override
	public FileStatus getFileStatus(Path f) throws IOException {
		return originalFs.getFileStatus(f);
	}

	@Override
	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
		return originalFs.getFileBlockLocations(file, start, len);
	}

	@Override
	public FileStatus[] listStatus(Path f) throws IOException {
		return originalFs.listStatus(f);
	}

	@Override
	public boolean delete(Path f, boolean recursive) throws IOException {
		return originalFs.delete(f, recursive);
	}

	@Override
	public boolean mkdirs(Path f) throws IOException {
		return originalFs.mkdirs(f);
	}

	@Override
	public boolean rename(Path src, Path dst) throws IOException {
		return originalFs.rename(src, dst);
	}

	@Override
	public boolean exists(Path f) throws IOException {
		return originalFs.exists(f);
	}

	@Override
	@Deprecated
	@SuppressWarnings("deprecation")
	public long getDefaultBlockSize() {
		return originalFs.getDefaultBlockSize();
	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	private <T extends StreamWithTimeout> T createStream(
    +			final SupplierWithException<T, IOException> streamOpener,
    +			final HashSet<T> openStreams,
    +			final boolean output) throws IOException {
    +
    +		final int outputLimit = output && maxNumOpenInputStreams > 0 ? maxNumOpenOutputStreams
: Integer.MAX_VALUE;
    +		final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams
: Integer.MAX_VALUE;
    +		final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
    +		final int outputCredit = output ? 1 : 0;
    +		final int inputCredit = output ? 0 : 1;
    +
    +		// because waiting for availability may take long, we need to be interruptible here
    +		// and handle interrupted exceptions as I/O errors
    +		// even though the code is written to make sure the lock is held for a short time only,
    +		// making the lock acquisition interruptible helps to guard against the cases where
    +		// a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
    +		try {
    +			lock.lockInterruptibly();
    +			try {
    +				// some integrity checks
    +				assert openOutputStreams.size() <= numReservedOutputStreams;
    +				assert openInputStreams.size() <= numReservedInputStreams;
    +
    +				// wait until there are few enough streams so we can open another
    +				waitForAvailability(totalLimit, outputLimit, inputLimit);
    +
    +				// We do not open the stream here in the locked scope because opening a stream
    +				// could take a while. Holding the lock during that operation would block all concurrent
    +				// attempts to try and open a stream, effectively serializing all calls to open the
streams.
    +				numReservedOutputStreams += outputCredit;
    +				numReservedInputStreams += inputCredit;
    +			}
    +			finally {
    +				lock.unlock();
    +			}
    +		}
    +		catch (InterruptedException e) {
    +			// restore interruption flag
    +			Thread.currentThread().interrupt();
    +			throw new IOException("interrupted before opening stream");
    +		}
    +
    +		// open the stream outside the lock.
    +		boolean success = false;
    +		try {
    +			final T out = streamOpener.get();
    +
    +			// add the stream to the set, need to re-acquire the lock
    +			lock.lock();
    +			try {
    +				openStreams.add(out);
    +			} finally {
    +				lock.unlock();
    +			}
    +
    +			// good, can now return cleanly
    +			success = true;
    +			return out;
    +		}
    +		finally {
    +			if (!success) {
    +				// remove the reserved credit
    +				// we must open this non-interruptibly, because this must succeed!
    +				lock.lock();
    +				try {
    +					numReservedOutputStreams -= outputCredit;
    +					numReservedInputStreams -= inputCredit;
    +					available.signalAll();
    +				} finally {
    +					lock.unlock();
    +				}
    +			}
    +		}
    +	}
    +
    +	@GuardedBy("lock")
    +	private void waitForAvailability(
    +			int totalLimit,
    +			int outputLimit,
    +			int inputLimit) throws InterruptedException, IOException {
    +
    +		checkState(lock.isHeldByCurrentThread());
    +
    +		// compute the deadline of this operations
    +		final long deadline;
    +		if (streamOpenTimeoutNanos == 0) {
    +			deadline = Long.MAX_VALUE;
    +		} else {
    +			long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
    +			// check for overflow
    +			deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
    +		}
    +
    +		// wait for available connections
    +		long timeLeft;
    +
    +		if (streamInactivityTimeoutNanos == 0) {
    +			// simple case: just wait
    +			while ((timeLeft = (deadline - System.nanoTime())) > 0 &&
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				available.await(timeLeft, TimeUnit.NANOSECONDS);
    +			}
    +		}
    +		else {
    +			// complex case: chase down inactive streams
    +			final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;
    +
    +			long now;
    +			while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while
still within timeout
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				// check all streams whether there in one that has been inactive for too long
    +				if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams,
now))) {
    +					// only wait if we did not manage to close any stream.
    +					// otherwise eagerly check again if we have availability now (we should have!)
    +					long timeToWait = Math.min(checkIntervalNanos, timeLeft);
    +					available.await(timeToWait, TimeUnit.NANOSECONDS);
    +				}
    +			}
    +		}
    +
    +		// check for timeout
    +		// we check availability again to catch cases where the timeout expired while waiting
    +		// to re-acquire the lock
    +		if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit))
{
    +			throw new IOException(String.format(
    +					"Timeout while waiting for an available stream/connect. " +
    +					"limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d
ms",
    +					maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
    +					numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
    +		}
    +	}
    +
    +	@GuardedBy("lock")
    +	private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
    +		return numReservedOutputStreams < outputLimit &&
    +				numReservedInputStreams < inputLimit &&
    +				numReservedOutputStreams + numReservedInputStreams < totalLimit;
    +	}
    +
    +	@GuardedBy("lock")
    +	private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams,
long nowNanos) {
    +		for (StreamWithTimeout stream : streams) {
    +			try {
    +				// If the stream is closed already, it will be removed anyways, so we
    +				// do not classify it as inactive. We also skip the check if another check happened
too recently.
    +				if (stream.isClosed() || nowNanos < stream.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos)
{
    +					// interval since last check not yet over
    +					return false;
    +				}
    +				else if (!stream.checkNewBytesAndMark(nowNanos)) {
    +					stream.closeDueToTimeout();
    +					return true;
    +				}
    +			}
    +			catch (StreamTimeoutException ignored) {
    +				// may happen due to races
    +			}
    +			catch (IOException e) {
    +				// only log on debug level here, to avoid log spamming
    +				LOG.debug("Could not check for stream progress to determine inactivity", e);
    --- End diff --
    
    I wonder what to do with a stream that throws exceptions on `getPos` and thereby avoids the
check for inactivity but also is not closed. This looks like a candidate that is in a bad state.
While it will probably sooner or later also cause exceptions somewhere else, I think it could
theoretically dodge closing forever and block attempts to create new streams. It might be appropriate
to close such a stream proactively.


---

Mime
View raw message