flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
Date Fri, 24 Nov 2017 13:45:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265294#comment-16265294 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152972034
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
    @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception {
     	}
     
     	/**
    -	 * Tests to verify that the input channel requests floating buffers from buffer pool
    -	 * in order to maintain backlog + initialCredit buffers available once receiving the
    -	 * sender's backlog, and registers as listener if no floating buffers available.
    +	 * Tests to verify that the input channel requests floating buffers from buffer pool for
    +	 * maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog.
    +	 *
    +	 * <p>Verifies the logic of recycling floating buffer back into the input channel and the logic
    +	 * of returning extra floating buffer into the buffer pool during recycling exclusive buffer.
     	 */
     	@Test
    -	public void testRequestFloatingBufferOnSenderBacklog() throws Exception {
    +	public void testRequestAndReturnFloatingBuffer() throws Exception {
     		// Setup
    -		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP);
    +		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP);
    +		final int numExclusiveBuffers = 2;
    +		final int numFloatingBuffers = 12;
    +
     		final SingleInputGate inputGate = createSingleInputGate();
     		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
    +		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
     		try {
    -			final int numFloatingBuffers = 10;
     			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
     			inputGate.setBufferPool(bufferPool);
    -
    -			// Assign exclusive segments to the channel
    -			final int numExclusiveBuffers = 2;
    -			inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
     			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
     
    -			assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel",
    -				numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers());
    +			// Prepare the exclusive and floating buffers to verify recycle logic later
    +			Buffer exclusiveBuffer = inputChannel.requestBuffer();
    +			assertNotNull(exclusiveBuffer);
    +			Buffer floatingBuffer1 = bufferPool.requestBuffer();
    +			assertNotNull(floatingBuffer1);
    +			Buffer floatingBuffer2 = bufferPool.requestBuffer();
    +			assertNotNull(floatingBuffer2);
     
    -			// Receive the producer's backlog
    +			// Receive the producer's backlog less than the number of available floating buffers
     			inputChannel.onSenderBacklog(8);
     
    -			// Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers
    -			verify(bufferPool, times(8)).requestBuffer();
    +			// Request the floating buffers to maintain (backlog + initialCredit) available buffers
    +			verify(bufferPool, times(11)).requestBuffer();
     			verify(bufferPool, times(0)).addBufferListener(inputChannel);
    -			assertEquals("There should be 10 buffers available in the channel",
    -				10, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers());
     
    -			inputChannel.onSenderBacklog(11);
    +			// Increase the backlog to exceed the number of available floating buffers
    +			inputChannel.onSenderBacklog(10);
     
    -			// Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result
    -			verify(bufferPool, times(11)).requestBuffer();
    +			// The channel does not get enough floating buffer and register as buffer listener
    +			verify(bufferPool, times(13)).requestBuffer();
     			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    -			assertEquals("There should be 12 buffers available in the channel",
    -				12, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers());
    +			assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
     
    -			inputChannel.onSenderBacklog(12);
    +			// Continue increasing the backlog
    +			inputChannel.onSenderBacklog(11);
     
    -			// Already in the status of waiting for buffers and will not request any more
    -			verify(bufferPool, times(11)).requestBuffer();
    +			// The channel is already in the status of waiting for buffers and will not request any more
    +			verify(bufferPool, times(13)).requestBuffer();
     			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +			assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    +			assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +
    +			// Recycle the floating buffer and assign it to the buffer listener
    +			floatingBuffer1.recycle();
    +
    +			// The channel is still waiting for one more floating buffer
    +			assertEquals("There should be 12 buffers available in the channel", 12, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    +			assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +
    +			// Recycle one more floating buffer again
    +			floatingBuffer2.recycle();
    +
    +			// The channel already gets all the required buffers
    +			assertEquals("There should be 13 buffers available in the channel", 13, inputChannel.getNumberOfAvailableBuffers());
    +			assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    +			assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +
    --- End diff --
    
    Great. However, I'm not sure that you have now tested all three cases, i.e. getting from a state of `getNumberOfRequiredBuffers() > getNumberOfAvailableBuffers()` to a state of equality via all three means. Your last tests only check the behaviour when coming from `getNumberOfRequiredBuffers() <= getNumberOfAvailableBuffers()`. Also, `testRequestAndReturnFloatingBuffer()` is getting quite long and complex now.
    
    Should we have those as (three?) separate tests instead, and cover these two origins one after another there: starting with `getNumberOfRequiredBuffers() > getNumberOfAvailableBuffers()` first, getting to `getNumberOfRequiredBuffers() = getNumberOfAvailableBuffers()` via one of the three causes, and getting to `getNumberOfRequiredBuffers() <= getNumberOfAvailableBuffers()` via one of them, too?
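
To make the required/available states above concrete, here is a minimal toy model of the accounting that the asserted numbers in the diff imply. It is a sketch under assumptions, not Flink's `RemoteInputChannel`: the class `ToyCreditChannel` and all of its members are hypothetical, and the rule that a failed pool request is still counted and makes the channel register as a listener is inferred from the `times(13)` verification.

```java
// Toy model only: NOT Flink's RemoteInputChannel. All names are hypothetical.
final class ToyCreditChannel {
    private final int initialCredit; // number of exclusive buffers of the channel
    private int available;           // buffers currently held by the channel
    private int required;            // backlog + initialCredit
    private int poolFree;            // floating buffers left in the local pool
    private int poolRequests;        // pool requests issued by the channel
    private boolean waiting;         // registered as buffer listener on the pool

    ToyCreditChannel(int initialCredit, int available, int poolFree) {
        this.initialCredit = initialCredit;
        this.available = available;
        this.poolFree = poolFree;
    }

    /** The sender announced its backlog: try to hold (backlog + initialCredit) buffers. */
    void onSenderBacklog(int backlog) {
        required = backlog + initialCredit;
        // While already waiting as a listener, no further requests are issued.
        while (available < required && !waiting) {
            poolRequests++;
            if (poolFree > 0) {
                poolFree--;
                available++;
            } else {
                waiting = true; // assumption: a failed request registers the listener
            }
        }
    }

    /** A floating buffer held elsewhere is recycled into the pool. */
    void recycleFloatingBuffer() {
        if (waiting) {
            available++;                    // handed straight to the waiting channel
            waiting = available < required; // stay registered while still short
        } else {
            poolFree++;
        }
    }

    int getAvailable() { return available; }
    int getRequired() { return required; }
    int getPoolFree() { return poolFree; }
    int getPoolRequests() { return poolRequests; }
    boolean isWaiting() { return waiting; }

    public static void main(String[] args) {
        // One exclusive buffer left in the channel, ten floating buffers left in the pool.
        ToyCreditChannel ch = new ToyCreditChannel(2, 1, 10);
        int[] backlogs = {8, 10, 11};
        for (int backlog : backlogs) {
            ch.onSenderBacklog(backlog);
            System.out.println("backlog " + backlog + ": available=" + ch.getAvailable()
                + " required=" + ch.getRequired() + " waiting=" + ch.isWaiting());
        }
        ch.recycleFloatingBuffer();
        ch.recycleFloatingBuffer();
        System.out.println("after recycling: available=" + ch.getAvailable()
            + " required=" + ch.getRequired() + " waiting=" + ch.isWaiting());
    }
}
```

Starting from the diff's setup after the test has taken one exclusive and two floating buffers out, backlogs of 8, 10 and 11 followed by two recycles give available/required values of 10/10, 11/12, 11/13, 12/13 and 13/13, matching the assertions. The model counts 9 and then 11 pool requests; the diff's 11 and 13 are higher by the two buffers the test requests from the pool directly.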


> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} messages from the producer. It requests a buffer from the {{BufferPool}} to hold each message. If no buffer is available, the message is staged temporarily and {{autoread}} is set to false for the channel.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}
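
The difference between the two receive paths described in the issue can be sketched with a small toy model. This is only an illustration under assumptions: `ToyReceiver` and its members are hypothetical, no Netty or Flink classes are involved, and treating an empty channel in the credit-based path as an error is a choice of this sketch, not something the issue states.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: hypothetical names, no Netty or Flink classes involved.
final class ToyReceiver {
    private int poolFree;        // buffers left in the shared pool
    private int channelBuffers;  // buffers already held by the input channel
    private int backlog;         // last backlog announced by the producer
    private boolean autoRead = true; // stands in for the channel's autoread flag
    private final Deque<String> staged = new ArrayDeque<>(); // messages waiting for a buffer

    ToyReceiver(int poolFree, int channelBuffers) {
        this.poolFree = poolFree;
        this.channelBuffers = channelBuffers;
    }

    /** Current path: ask the pool; if it has no buffer, stage the message and stop reading. */
    boolean receivePoolBased(String message) {
        if (poolFree > 0) {
            poolFree--;
            return true;
        }
        staged.add(message);
        autoRead = false;
        return false;
    }

    /** Credit-based path: take a buffer from the channel and record the producer's backlog. */
    void receiveCreditBased(String message, int senderBacklog) {
        if (channelBuffers == 0) {
            // Assumption of this sketch: should not happen in credit-based mode.
            throw new IllegalStateException("no buffer in channel for " + message);
        }
        channelBuffers--;
        backlog = senderBacklog; // in the real design this may trigger floating buffer requests
    }

    boolean isAutoRead() { return autoRead; }
    int getStagedCount() { return staged.size(); }
    int getChannelBuffers() { return channelBuffers; }
    int getBacklog() { return backlog; }

    public static void main(String[] args) {
        ToyReceiver receiver = new ToyReceiver(1, 2);
        receiver.receivePoolBased("a"); // pool has a buffer
        receiver.receivePoolBased("b"); // pool is empty: staged, autoread off
        System.out.println("autoRead=" + receiver.isAutoRead()
            + " staged=" + receiver.getStagedCount());
        receiver.receiveCreditBased("c", 5);
        System.out.println("channelBuffers=" + receiver.getChannelBuffers()
            + " backlog=" + receiver.getBacklog());
    }
}
```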



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
