flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Date Fri, 24 Nov 2017 13:44:58 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152969860
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
    @@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() throws Exception {
     			// Prepare the exclusive and floating buffers to verify recycle logic later
     			Buffer exclusiveBuffer = inputChannel.requestBuffer();
     			assertNotNull(exclusiveBuffer);
    -			Buffer floatingBuffer1 = bufferPool.requestBuffer();
    -			assertNotNull(floatingBuffer1);
    -			Buffer floatingBuffer2 = bufferPool.requestBuffer();
    -			assertNotNull(floatingBuffer2);
    +
    +			final int numRecycleFloatingBuffers = 4;
    +			final ArrayDeque<Buffer> floatingBufferQueue = new ArrayDeque<>(numRecycleFloatingBuffers);
    +			for (int i = 0; i < numRecycleFloatingBuffers; i++) {
    +				Buffer floatingBuffer = bufferPool.requestBuffer();
    +				assertNotNull(floatingBuffer);
    +				floatingBufferQueue.add(floatingBuffer);
    +			}
     
     			// Receive the producer's backlog less than the number of available floating buffers
     			inputChannel.onSenderBacklog(8);
     
    -			// Request the floating buffers to maintain (backlog + initialCredit) available buffers
    -			verify(bufferPool, times(11)).requestBuffer();
    +			// Request the floating buffers to maintain (backlog + initialCredit) available buffers.
    +			// One exclusive buffer is taken before, so we should request 13 floating buffers.
    +			verify(bufferPool, times(13)).requestBuffer();
     			verify(bufferPool, times(0)).addBufferListener(inputChannel);
    -			assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers());
    -			assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers());
    +			assertEquals("There should be 10 buffers available in the channel",
    +				10, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 10 buffers required in the channel",
    +				10, inputChannel.getNumberOfRequiredBuffers());
     
     			// Increase the backlog to exceed the number of available floating buffers
     			inputChannel.onSenderBacklog(10);
     
     			// The channel does not get enough floating buffer and register as buffer listener
    -			verify(bufferPool, times(13)).requestBuffer();
    +			verify(bufferPool, times(15)).requestBuffer();
     			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    -			assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers());
    -			assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers());
    -			assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +			assertEquals("There should be 11 buffers available in the channel",
    +				11, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 12 buffers required in the channel",
    +				12, inputChannel.getNumberOfRequiredBuffers());
    +			assertEquals("There should be 0 buffer available in local pool",
    +				0, bufferPool.getNumberOfAvailableMemorySegments());
     
     			// Continue increasing the backlog
    -			inputChannel.onSenderBacklog(11);
    +			inputChannel.onSenderBacklog(12);
     
     			// The channel is already in the status of waiting for buffers and will not request any more
    -			verify(bufferPool, times(13)).requestBuffer();
    +			verify(bufferPool, times(15)).requestBuffer();
     			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    -			assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers());
    -			assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    -			assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +			assertEquals("There should be 11 buffers available in the channel",
    +				11, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 14 buffers required in the channel",
    +				14, inputChannel.getNumberOfRequiredBuffers());
    +			assertEquals("There should be 0 buffer available in local pool",
    +				0, bufferPool.getNumberOfAvailableMemorySegments());
     
    -			// Recycle the floating buffer and assign it to the buffer listener
    -			floatingBuffer1.recycle();
    +			// Recycle one floating buffer
    +			floatingBufferQueue.poll().recycle();
     
    -			// The channel is still waiting for one more floating buffer
    -			assertEquals("There should be 12 buffers available in the channel", 12, inputChannel.getNumberOfAvailableBuffers());
    -			assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    -			assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +			// Assign the floating buffer to the listener and the channel is still waiting for more floating buffers
    +			assertEquals("There should be 12 buffers available in the channel",
    --- End diff --
    
    We should also add
    ```
    			verify(bufferPool, times(15)).requestBuffer();
    			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    ```
    here, and in each of the check blocks below, to ensure that the behaviour is as expected.


---

Mime
View raw message