flink-issues mailing list archives

From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Date Fri, 01 Dec 2017 13:13:36 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4533#discussion_r154321304
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
     	}
     
     	/**
    -	 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
    -	 * {@link AddCredit} message is sent to the producer.
    +	 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
    +	 * and verifies the behaviour of credit notification by triggering channel's writability changed.
     	 */
     	@Test
     	public void testNotifyCreditAvailable() throws Exception {
    +		final SingleInputGate inputGate = createSingleInputGate();
    +		final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
    +		final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
     		final CreditBasedClientHandler handler = new CreditBasedClientHandler();
    -		final EmbeddedChannel channel = new EmbeddedChannel(handler);
    +		final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
     
    -		final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
    +		// Increase the credits to enqueue the input channels
    +		inputChannel1.increaseCredit(1);
    +		inputChannel2.increaseCredit(1);
    +		handler.notifyCreditAvailable(inputChannel1);
    +		handler.notifyCreditAvailable(inputChannel2);
     
    -		// Enqueue the input channel
    -		handler.notifyCreditAvailable(inputChannel);
    +		channel.runPendingTasks();
    +
    +		// The two input channels should notify credits via writable channel
    +		assertTrue(channel.isWritable());
    +		assertEquals(channel.readOutbound().getClass(), AddCredit.class);
    +		verify(inputChannel1, times(1)).getAndResetCredit();
    +		verify(inputChannel2, times(1)).getAndResetCredit();
    +
    +		final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
    +		// Set the writer index to the high water mark to ensure that all bytes are written
    +		// to the wire although the buffer is "empty".
    +		channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
    +
    +		// Enqueue the input channel on the condition of un-writable channel
    +		inputChannel1.increaseCredit(1);
    +		handler.notifyCreditAvailable(inputChannel1);
     
     		channel.runPendingTasks();
     
    -		// Read the enqueued msg
    -		Object msg1 = channel.readOutbound();
    +		// The input channel will not notify credits via un-writable channel
    +		assertFalse(channel.isWritable());
    +		verify(inputChannel1, times(1)).getAndResetCredit();
    --- End diff --
    
    same as above

