flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Date Tue, 21 Nov 2017 14:49:50 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152280350
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
    @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception {
     	}
     
     	/**
    -	 * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive
segment is
    -	 * recycled to available buffers directly and it triggers notify of announced credit.
    +	 * Tests to verify that the input channel requests floating buffers from buffer pool
    +	 * in order to maintain backlog + initialCredit buffers available once receiving the
    +	 * sender's backlog, and registers as listener if no floating buffers available.
     	 */
     	@Test
    -	public void testRecycleExclusiveBufferBeforeReleased() throws Exception {
    -		final SingleInputGate inputGate = mock(SingleInputGate.class);
    -		final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));
    +	public void testRequestFloatingBufferOnSenderBacklog() throws Exception {
    +		// Setup
    +		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP);
    +		final SingleInputGate inputGate = createSingleInputGate();
    +		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
    +		try {
    +			final int numFloatingBuffers = 10;
    +			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers,
numFloatingBuffers));
    +			inputGate.setBufferPool(bufferPool);
    +
    +			// Assign exclusive segments to the channel
    +			final int numExclusiveBuffers = 2;
    +			inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
    +			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
    +
    +			assertEquals("There should be " + numExclusiveBuffers + " buffers available in the
channel",
    +				numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers());
     
    -		// Recycle exclusive segment
    -		inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel));
    +			// Receive the producer's backlog
    +			inputChannel.onSenderBacklog(8);
     
    -		assertEquals("There should be one buffer available after recycle.",
    -			1, inputChannel.getNumberOfAvailableBuffers());
    -		verify(inputChannel, times(1)).notifyCreditAvailable();
    +			// Request the number of floating buffers by the formula of backlog + initialCredit
- availableBuffers
    +			verify(bufferPool, times(8)).requestBuffer();
    +			verify(bufferPool, times(0)).addBufferListener(inputChannel);
    +			assertEquals("There should be 10 buffers available in the channel",
    +				10, inputChannel.getNumberOfAvailableBuffers());
     
    -		inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel));
    +			inputChannel.onSenderBacklog(11);
     
    -		assertEquals("There should be two buffers available after recycle.",
    -			2, inputChannel.getNumberOfAvailableBuffers());
    -		// It should be called only once when increased from zero.
    -		verify(inputChannel, times(1)).notifyCreditAvailable();
    +			// Need extra three floating buffers, but only two buffers available in buffer pool,
register as listener as a result
    +			verify(bufferPool, times(11)).requestBuffer();
    +			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +			assertEquals("There should be 12 buffers available in the channel",
    +				12, inputChannel.getNumberOfAvailableBuffers());
    +
    +			inputChannel.onSenderBacklog(12);
    +
    +			// Already in the status of waiting for buffers and will not request any more
    +			verify(bufferPool, times(11)).requestBuffer();
    +			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +
    --- End diff --
    
    Can you also verify the behaviour when the buffers become available? That is, create the
`bufferPool` with `numFloatingBuffers+1` buffers initially, take one of them right after the
creation, then continue as is and here `recycle()` this buffer so that the channel should
get it via the buffer listener.
    Actually, it might even be useful to not only wait for 1 missing buffer, but to change
the number so that is should receive 2 of them so that we test that the listener is actually
getting not only one.


---

Mime
View raw message