flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Date Fri, 24 Nov 2017 13:44:58 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152972414
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
    @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
     			initialAndMaxRequestBackoff._2(),
     			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
     	}
    +
    +	private Callable recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) {
    +		final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments);
    +		// Exhaust all the exclusive buffers
    +		for (int i = 0; i < numExclusiveSegments; i++) {
    +			Buffer buffer = inputChannel.requestBuffer();
    +			assertNotNull(buffer);
    +			exclusiveBuffers.add(buffer);
    +		}
    +
    +		return new Callable<Void>() {
    +			@Override
    +			public Void call() throws Exception {
    +				for (Buffer buffer : exclusiveBuffers) {
    +					buffer.recycle();
    +				}
    +
    +				return null;
    +			}
    +		};
    +	}
    +
    +	private Callable recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception {
    +		final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
    +		// Exhaust all the floating buffers
    +		for (int i = 0; i < numFloatingBuffers; i++) {
    +			Buffer buffer = bufferPool.requestBuffer();
    +			assertNotNull(buffer);
    +			floatingBuffers.add(buffer);
    +		}
    +
    +		return new Callable<Void>() {
    +			@Override
    +			public Void call() throws Exception {
    +				for (Buffer buffer : floatingBuffers) {
    +					buffer.recycle();
    +				}
    +
    +				return null;
    +			}
    +		};
    +	}
    +
    +	private void submitTasksAndWaitResults(ExecutorService executor, Callable[] tasks) throws Exception {
    --- End diff --
    
    Maybe also rename this method from `submitTasksAndWaitResults` to `submitTasksAndWaitForResults`.


---

Mime
View raw message