flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
Date Tue, 21 Nov 2017 14:50:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260826#comment-16260826 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152290197
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
    @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception {
     	}
     
     	/**
    -	 * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is
    -	 * recycled to available buffers directly and it triggers notify of announced credit.
    +	 * Tests to verify that the input channel requests floating buffers from buffer pool
    +	 * in order to maintain backlog + initialCredit buffers available once receiving the
    +	 * sender's backlog, and registers as listener if no floating buffers available.
     	 */
     	@Test
    -	public void testRecycleExclusiveBufferBeforeReleased() throws Exception {
    -		final SingleInputGate inputGate = mock(SingleInputGate.class);
    -		final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));
    +	public void testRequestFloatingBufferOnSenderBacklog() throws Exception {
    +		// Setup
    +		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP);
    +		final SingleInputGate inputGate = createSingleInputGate();
    +		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
    +		try {
    +			final int numFloatingBuffers = 10;
    +			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
    +			inputGate.setBufferPool(bufferPool);
    +
    +			// Assign exclusive segments to the channel
    +			final int numExclusiveBuffers = 2;
    +			inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
    +			inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
    +
    +			assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel",
    +				numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers());
     
    -		// Recycle exclusive segment
    -		inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel));
    +			// Receive the producer's backlog
    +			inputChannel.onSenderBacklog(8);
     
    -		assertEquals("There should be one buffer available after recycle.",
    -			1, inputChannel.getNumberOfAvailableBuffers());
    -		verify(inputChannel, times(1)).notifyCreditAvailable();
    +			// Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers
    +			verify(bufferPool, times(8)).requestBuffer();
    +			verify(bufferPool, times(0)).addBufferListener(inputChannel);
    +			assertEquals("There should be 10 buffers available in the channel",
    +				10, inputChannel.getNumberOfAvailableBuffers());
     
    -		inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel));
    +			inputChannel.onSenderBacklog(11);
     
    -		assertEquals("There should be two buffers available after recycle.",
    -			2, inputChannel.getNumberOfAvailableBuffers());
    -		// It should be called only once when increased from zero.
    -		verify(inputChannel, times(1)).notifyCreditAvailable();
    +			// Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result
    +			verify(bufferPool, times(11)).requestBuffer();
    +			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +			assertEquals("There should be 12 buffers available in the channel",
    +				12, inputChannel.getNumberOfAvailableBuffers());
    +
    +			inputChannel.onSenderBacklog(12);
    +
    +			// Already in the status of waiting for buffers and will not request any more
    +			verify(bufferPool, times(11)).requestBuffer();
    +			verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +
    --- End diff --
    
    - Also, could you add further checks that verify that we stick to `senderBacklog + initialCredit` buffers after releasing (some of the) floating buffers, e.g. by getting them via `RemoteInputChannel#requestBuffer` and then `recycle()` them?
      - What happens when the sender backlog decreases? (that should work and release buffers accordingly)
      - Are extra floating buffers released correctly when we return exclusive buffers and have too many buffers?
    - You could also try to release the exclusive buffers the same way and ensure that they are recycled back into the channel and that we always maintain `initialCredit` buffers.
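
For reference, the counts asserted in the test above (8, then 11 calls to `requestBuffer()`, and a single `addBufferListener()`) can be reproduced with a small standalone model of the `backlog + initialCredit - availableBuffers` rule. This is only a sketch under stated assumptions: `FloatingBufferSketch` and all of its fields are hypothetical names, the buffer pool is reduced to a counter, and the loop is a simplification rather than the actual `RemoteInputChannel` code.

```java
/** Simplified stand-in for a remote input channel; hypothetical, not Flink's implementation. */
final class FloatingBufferSketch {
    final int initialCredit;     // exclusive buffers owned by the channel
    int poolRemaining;           // floating buffers still left in the shared pool
    int available;               // buffers currently available in the channel
    boolean waitingForBuffers;   // set once the channel has registered as a pool listener
    int requestCalls;            // number of pool request attempts so far
    int listenerRegistrations;   // number of listener registrations so far

    FloatingBufferSketch(int initialCredit, int floatingPoolSize) {
        this.initialCredit = initialCredit;
        this.poolRemaining = floatingPoolSize;
        this.available = initialCredit; // exclusive buffers are available from the start
    }

    /** Tries to keep backlog + initialCredit buffers available in the channel. */
    void onSenderBacklog(int backlog) {
        int required = backlog + initialCredit;
        // Assumption: while waiting for a buffer, no further requests are issued.
        while (available < required && !waitingForBuffers) {
            requestCalls++;
            if (poolRemaining > 0) {
                poolRemaining--;
                available++;
            } else {
                // Pool exhausted: register as listener and stop requesting.
                waitingForBuffers = true;
                listenerRegistrations++;
            }
        }
    }

    public static void main(String[] args) {
        // Same numbers as the test: 2 exclusive buffers, 10 floating buffers in the pool.
        FloatingBufferSketch channel = new FloatingBufferSketch(2, 10);
        for (int backlog : new int[] {8, 11, 12}) {
            channel.onSenderBacklog(backlog);
            System.out.printf("backlog=%d requests=%d listeners=%d available=%d%n",
                backlog, channel.requestCalls, channel.listenerRegistrations, channel.available);
        }
    }
}
```

The model deliberately leaves out recycling and a decreasing backlog, which are exactly the cases the additional checks requested above would need to cover.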


> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} messages from the producer. It requests a buffer from the {{BufferPool}} to hold each message. If no buffer is available, the message is staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}



