flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
Date Thu, 23 Nov 2017 16:02:15 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264539#comment-16264539 ]

ASF GitHub Bot commented on FLINK-8125:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5059#discussion_r152826230
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
---
    @@ -0,0 +1,1097 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.function.SupplierWithException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.HashSet;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * A file system that limits the number of concurrently open input streams,
    + * output streams, and total streams for a target file system.
    + *
    + * <p>This file system can wrap another existing file system in cases where
    + * the target file system cannot handle certain connection spikes and connections
    + * would fail in that case. This happens, for example, for very small HDFS clusters
    + * with few RPC handlers, when a large Flink job tries to build up many connections during
    + * a checkpoint.
    + *
    + * <p>The filesystem may track the progress of streams and close streams that have been
    + * inactive for too long, to avoid locked streams taking up the complete pool.
    + * Rather than having a dedicated reaper thread, the calls that try to open a new stream
    + * periodically check the currently open streams once the limit of open streams is reached.
    + */
    +@Internal
    +public class LimitedConnectionsFileSystem extends FileSystem {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
    +
    +	/** The original file system to which connections are limited. */
    +	private final FileSystem originalFs;
    +
    +	/** The lock that synchronizes connection bookkeeping. */
    +	private final ReentrantLock lock;
    +
    +	/** Condition for threads that are blocking on the availability of new connections. */
    +	private final Condition available;
    +
    +	/** The maximum number of concurrently open output streams. */
    +	private final int maxNumOpenOutputStreams;
    +
    +	/** The maximum number of concurrently open input streams. */
    +	private final int maxNumOpenInputStreams;
    +
    +	/** The maximum number of concurrently open streams (input + output). */
    +	private final int maxNumOpenStreamsTotal;
    +
    +	/** The nanoseconds that opening a stream may wait for availability. */
    +	private final long streamOpenTimeoutNanos;
    +
    +	/** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
    +	private final long streamInactivityTimeoutNanos;
    +
    +	/** The set of currently open output streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<OutStream> openOutputStreams;
    +
    +	/** The set of currently open input streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<InStream> openInputStreams;
    +
    +	/** The number of output streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedOutputStreams;
    +
    +	/** The number of input streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedInputStreams;
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a new output connection limiting file system.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +	 * then they are terminated as "inactive", to prevent the limited number of connections
    +	 * from getting stuck on blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 */
    +	public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
    +		this(originalFs, maxNumOpenStreamsTotal, 0, 0);
    +	}
    +
    +	/**
    +	 * Creates a new output connection limiting file system.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +	 * then they are terminated as "inactive", to prevent the limited number of connections
    +	 * from getting stuck on blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait
    +	 *                                when no more connections are currently permitted.
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +	 *                                bytes before it is closed as inactive.
    +	 */
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout) {
    +		this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout);
    +	}
    +
    +	/**
    +	 * Creates a new output connection limiting file system, limiting input and output streams
    +	 * with potentially different quotas.
    +	 *
    +	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +	 * then they are terminated as "inactive", to prevent the limited number of connections
    +	 * from getting stuck on blocked threads.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
    +	 * @param maxNumOpenInputStreams  The maximum number of concurrent open input streams (0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait
    +	 *                                when no more connections are currently permitted.
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +	 *                                bytes before it is closed as inactive.
    +	 */
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			int maxNumOpenOutputStreams,
    +			int maxNumOpenInputStreams,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout) {
    +
    +		checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
    +		checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
    +		checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
    +		checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)");
    +		checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");
    +
    +		this.originalFs = checkNotNull(originalFs, "originalFs");
    +		this.lock = new ReentrantLock(true);
    +		this.available = lock.newCondition();
    +		this.openOutputStreams = new HashSet<>();
    +		this.openInputStreams = new HashSet<>();
    +		this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
    +		this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
    +		this.maxNumOpenInputStreams = maxNumOpenInputStreams;
    +
    +		// assign nanos overflow aware
    +		final long openTimeoutNanos = streamOpenTimeout * 1_000_000;
    +		final long inactivityTimeoutNanos = streamInactivityTimeout * 1_000_000;
    +
    +		this.streamOpenTimeoutNanos =
    +				openTimeoutNanos >= streamOpenTimeout ? openTimeoutNanos : Long.MAX_VALUE;
    +
    +		this.streamInactivityTimeoutNanos =
    +				inactivityTimeoutNanos >= streamInactivityTimeout ? inactivityTimeoutNanos : Long.MAX_VALUE;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the maximum number of concurrently open output streams.
    +	 */
    +	public int getMaxNumOpenOutputStreams() {
    +		return maxNumOpenOutputStreams;
    +	}
    +
    +	/**
    +	 * Gets the maximum number of concurrently open input streams.
    +	 */
    +	public int getMaxNumOpenInputStreams() {
    +		return maxNumOpenInputStreams;
    +	}
    +
    +	/**
    +	 * Gets the maximum number of concurrently open streams (input + output).
    +	 */
    +	public int getMaxNumOpenStreamsTotal() {
    +		return maxNumOpenStreamsTotal;
    +	}
    +
    +	/**
    +	 * Gets the number of milliseconds that opening a stream may wait for availability in the
    +	 * connection pool.
    +	 */
    +	public long getStreamOpenTimeout() {
    +		return streamOpenTimeoutNanos / 1_000_000;
    +	}
    +
    +	/**
    +	 * Gets the milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
    +	 */
    +	public long getStreamInactivityTimeout() {
    +		return streamInactivityTimeoutNanos / 1_000_000;
    +	}
    +
    +	/**
    +	 * Gets the total number of open streams (input plus output).
    +	 */
    +	public int getTotalNumberOfOpenStreams() {
    +		lock.lock();
    +		try {
    +			return numReservedOutputStreams + numReservedInputStreams;
    +		} finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	/**
    +	 * Gets the number of currently open output streams.
    +	 */
    +	public int getNumberOfOpenOutputStreams() {
    +		lock.lock();
    +		try {
    +			return numReservedOutputStreams;
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	/**
    +	 * Gets the number of currently open input streams.
    +	 */
    +	public int getNumberOfOpenInputStreams() {
    +		lock.lock();
    +		try {
    +			return numReservedInputStreams;
    +		} finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  input & output stream opening methods
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
    +		return createOutputStream(() -> originalFs.create(f, overwriteMode));
    +	}
    +
    +	@Override
    +	@Deprecated
    +	@SuppressWarnings("deprecation")
    +	public FSDataOutputStream create(
    +			Path f,
    +			boolean overwrite,
    +			int bufferSize,
    +			short replication,
    +			long blockSize) throws IOException {
    +
    +		return createOutputStream(() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize));
    +	}
    +
    +	@Override
    +	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    +		return createInputStream(() -> originalFs.open(f, bufferSize));
    +	}
    +
    +	@Override
    +	public FSDataInputStream open(Path f) throws IOException {
    +		return createInputStream(() -> originalFs.open(f));
    +	}
    +
    +	private FSDataOutputStream createOutputStream(
    +			final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
    +
    +		final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
    +				() -> new OutStream(streamOpener.get(), this);
    +
    +		return createStream(wrappedStreamOpener, openOutputStreams, true);
    +	}
    +
    +	private FSDataInputStream createInputStream(
    +			final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
    +
    +		final SupplierWithException<InStream, IOException> wrappedStreamOpener =
    +				() -> new InStream(streamOpener.get(), this);
    +
    +		return createStream(wrappedStreamOpener, openInputStreams, false);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  other delegating file system methods
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public FileSystemKind getKind() {
    +		return originalFs.getKind();
    +	}
    +
    +	@Override
    +	public boolean isDistributedFS() {
    +		return originalFs.isDistributedFS();
    +	}
    +
    +	@Override
    +	public Path getWorkingDirectory() {
    +		return originalFs.getWorkingDirectory();
    +	}
    +
    +	@Override
    +	public Path getHomeDirectory() {
    +		return originalFs.getHomeDirectory();
    +	}
    +
    +	@Override
    +	public URI getUri() {
    +		return originalFs.getUri();
    +	}
    +
    +	@Override
    +	public FileStatus getFileStatus(Path f) throws IOException {
    +		return originalFs.getFileStatus(f);
    +	}
    +
    +	@Override
    +	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
    +		return originalFs.getFileBlockLocations(file, start, len);
    +	}
    +
    +	@Override
    +	public FileStatus[] listStatus(Path f) throws IOException {
    +		return originalFs.listStatus(f);
    +	}
    +
    +	@Override
    +	public boolean delete(Path f, boolean recursive) throws IOException {
    +		return originalFs.delete(f, recursive);
    +	}
    +
    +	@Override
    +	public boolean mkdirs(Path f) throws IOException {
    +		return originalFs.mkdirs(f);
    +	}
    +
    +	@Override
    +	public boolean rename(Path src, Path dst) throws IOException {
    +		return originalFs.rename(src, dst);
    +	}
    +
    +	@Override
    +	public boolean exists(Path f) throws IOException {
    +		return originalFs.exists(f);
    +	}
    +
    +	@Override
    +	@Deprecated
    +	@SuppressWarnings("deprecation")
    +	public long getDefaultBlockSize() {
    +		return originalFs.getDefaultBlockSize();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	private <T extends StreamWithTimeout> T createStream(
    +			final SupplierWithException<T, IOException> streamOpener,
    +			final HashSet<T> openStreams,
    +			final boolean output) throws IOException {
    +
    +		final int outputLimit = output && maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
    +		final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
    +		final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
    +		final int outputCredit = output ? 1 : 0;
    +		final int inputCredit = output ? 0 : 1;
    +
    +		// Because waiting for availability may take long, we need to be interruptible here
    +		// and handle interrupted exceptions as I/O errors.
    +		// Even though the code is written to make sure the lock is held for a short time only,
    +		// making the lock acquisition interruptible helps to guard against cases where
    +		// a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
    +		try {
    +			lock.lockInterruptibly();
    +			try {
    +				// some integrity checks
    +				assert openOutputStreams.size() <= numReservedOutputStreams;
    +				assert openInputStreams.size() <= numReservedInputStreams;
    +
    +				// wait until there are few enough streams so we can open another
    +				waitForAvailability(totalLimit, outputLimit, inputLimit);
    +
    +				// We do not open the stream here in the locked scope because opening a stream
    +				// could take a while. Holding the lock during that operation would block all concurrent
    +				// attempts to try and open a stream, effectively serializing all calls to open streams.
    +				numReservedOutputStreams += outputCredit;
    +				numReservedInputStreams += inputCredit;
    +			}
    +			finally {
    +				lock.unlock();
    +			}
    +		}
    +		catch (InterruptedException e) {
    +			// restore interruption flag
    +			Thread.currentThread().interrupt();
    +			throw new IOException("interrupted before opening stream");
    +		}
    +
    +		// open the stream outside the lock.
    +		boolean success = false;
    +		try {
    +			final T out = streamOpener.get();
    +
    +			// add the stream to the set, need to re-acquire the lock
    +			lock.lock();
    +			try {
    +				openStreams.add(out);
    +			} finally {
    +				lock.unlock();
    +			}
    +
    +			// good, can now return cleanly
    +			success = true;
    +			return out;
    +		}
    +		finally {
    +			if (!success) {
    +				// remove the reserved credit
    +				// we must open this non-interruptibly, because this must succeed!
    +				lock.lock();
    +				try {
    +					numReservedOutputStreams -= outputCredit;
    +					numReservedInputStreams -= inputCredit;
    +					available.signalAll();
    +				} finally {
    +					lock.unlock();
    +				}
    +			}
    +		}
    +	}
    +
    +	@GuardedBy("lock")
    +	private void waitForAvailability(
    +			int totalLimit,
    +			int outputLimit,
    +			int inputLimit) throws InterruptedException, IOException {
    +
    +		checkState(lock.isHeldByCurrentThread());
    +
    +		// compute the deadline of this operation
    +		final long deadline;
    +		if (streamOpenTimeoutNanos == 0) {
    +			deadline = Long.MAX_VALUE;
    +		} else {
    +			long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
    +			// check for overflow
    +			deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
    +		}
    +
    +		// wait for available connections
    +		long timeLeft;
    +
    +		if (streamInactivityTimeoutNanos == 0) {
    +			// simple case: just wait
    +			while ((timeLeft = (deadline - System.nanoTime())) > 0 &&
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				available.await(timeLeft, TimeUnit.NANOSECONDS);
    +			}
    +		}
    +		else {
    +			// complex case: chase down inactive streams
    +			final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;
    +
    +			long now;
    +			while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while still within timeout
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				// check all streams whether there is one that has been inactive for too long
    +				if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams, now))) {
    +					// only wait if we did not manage to close any stream.
    +					// otherwise eagerly check again if we have availability now (we should have!)
    +					long timeToWait = Math.min(checkIntervalNanos, timeLeft);
    +					available.await(timeToWait, TimeUnit.NANOSECONDS);
    +				}
    +			}
    +		}
    +
    +		// check for timeout
    +		// we check availability again to catch cases where the timeout expired while waiting
    +		// to re-acquire the lock
    +		if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +			throw new IOException(String.format(
    +					"Timeout while waiting for an available stream/connection. " +
    +					"limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms",
    +					maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
    +					numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
    +		}
    +	}
    +
    +	@GuardedBy("lock")
    +	private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
    +		return numReservedOutputStreams < outputLimit &&
    +				numReservedInputStreams < inputLimit &&
    +				numReservedOutputStreams + numReservedInputStreams < totalLimit;
    +	}
    +
    +	@GuardedBy("lock")
    +	private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
    +		for (StreamWithTimeout stream : streams) {
    +			try {
    +				// If the stream is closed already, it will be removed anyways, so we
    +				// do not classify it as inactive. We also skip the check if another check happened too recently.
    +				if (stream.isClosed() || nowNanos < stream.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) {
    +					// interval since last check not yet over
    +					return false;
    +				}
    +				else if (!stream.checkNewBytesAndMark(nowNanos)) {
    +					stream.closeDueToTimeout();
    +					return true;
    +				}
    +			}
    +			catch (StreamTimeoutException ignored) {
    +				// may happen due to races
    +			}
    +			catch (IOException e) {
    +				// only log on debug level here, to avoid log spamming
    +				LOG.debug("Could not check for stream progress to determine inactivity", e);
    --- End diff --
    
    I wonder what to do with a stream that throws exceptions on `getPos` and thereby avoids the
    inactivity check, but is also not closed. Such a stream looks like a candidate that is in a bad
    state. While it will probably sooner or later also cause exceptions somewhere else, I think it
    could theoretically dodge closing forever and block attempts to create new streams. Might it be
    appropriate to close such a stream proactively?
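
    A minimal sketch of the proactive close suggested here, under assumed names (`ProgressStream`,
    `closeIfInactiveOrBroken` are hypothetical stand-ins for the PR's `StreamWithTimeout` machinery,
    not APIs from this change): if the progress check itself throws, treat the stream as broken and
    close it so it cannot hold a connection slot forever.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical sketch of the proactive-close idea from the review comment above.
public class ProactiveClose {

    /** Assumed stand-in for the PR's StreamWithTimeout. */
    interface ProgressStream extends Closeable {
        boolean isClosed();

        /** May throw, e.g. when the underlying getPos() fails. */
        boolean madeProgressSince(long nanos) throws IOException;
    }

    /** Returns true if the stream was closed, either as inactive or as broken. */
    static boolean closeIfInactiveOrBroken(ProgressStream stream, long nowNanos) {
        try {
            if (stream.isClosed() || stream.madeProgressSince(nowNanos)) {
                // closed streams are cleaned up elsewhere; progressing streams stay open
                return false;
            }
            stream.close(); // inactive for too long
            return true;
        } catch (IOException e) {
            // The progress check itself failed: the stream is likely in a bad state and
            // could otherwise dodge the inactivity check forever while holding a slot.
            // Close it proactively, as the comment suggests.
            try {
                stream.close();
            } catch (IOException ignored) {
                // best effort
            }
            return true;
        }
    }
}
```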


> Support limiting the number of open FileSystem connections
> ----------------------------------------------------------
>
>                 Key: FLINK-8125
>                 URL: https://issues.apache.org/jira/browse/FLINK-8125
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.5.0, 1.4.1
>
>
> We need a way to limit the number of streams that Flink FileSystems concurrently open.
> For example, for very small HDFS clusters with few RPC handlers, a large Flink job trying to build up many connections during a checkpoint causes failures due to rejected connections.

> I propose to add a file system that can wrap another existing file system. The wrapping file system may track the progress of streams and close streams that have been inactive for too long, to avoid locked streams taking up the complete pool.
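
The core limiting idea in the proposal above can be sketched with a fair semaphore. This is a
deliberate simplification with hypothetical names (`StreamLimiter`, `acquireSlot`, `releaseSlot`):
the actual PR uses a `ReentrantLock` with explicit bookkeeping so it can also reap inactive streams.

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical simplification: bound the number of concurrently open streams,
// failing with an IOException when no slot frees up within the timeout.
public class StreamLimiter {
    private final Semaphore permits;
    private final long openTimeoutMillis;

    public StreamLimiter(int maxOpenStreams, long openTimeoutMillis) {
        this.permits = new Semaphore(maxOpenStreams, true); // fair, FIFO hand-off
        this.openTimeoutMillis = openTimeoutMillis;
    }

    /** Called before opening a stream on the wrapped file system. */
    public void acquireSlot() throws IOException {
        try {
            if (!permits.tryAcquire(openTimeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IOException("Timeout while waiting for an available stream slot");
            }
        } catch (InterruptedException e) {
            // mirror the PR: restore the flag and surface interruption as an I/O error
            Thread.currentThread().interrupt();
            throw new IOException("interrupted before opening stream", e);
        }
    }

    /** Called when a stream is closed. */
    public void releaseSlot() {
        permits.release();
    }
}
```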



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
