flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Date Mon, 15 Jan 2018 17:34:40 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161567331
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId,
0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		// The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
    +		assertEquals(1, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +
    +		// Flush the buffer to make the channel writable again and see the final results
    +		channel.flush();
    +		assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(false);
    +		when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(),
2, false));
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId,
0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +		queue.notifyReaderCreated(reader);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify available buffers to trigger enqueue the reader
    +		final int notifyNumBuffers = 5;
    +		for (int i = 0; i < notifyNumBuffers; i++) {
    +			reader.notifyBuffersAvailable(1);
    +		}
    +
    +		channel.runPendingTasks();
    +
    +		// the reader is not enqueued in the pipeline because no credits are available
    +		// -> it should still have the same number of pending buffers
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
    +		assertFalse(reader.isRegisteredAsAvailable());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +
    +		// Notify available credits to trigger enqueue the reader again
    +		final int notifyNumCredits = 3;
    +		for (int i = 1; i <= notifyNumCredits; i++) {
    +			queue.addCredit(receiverId, 1);
    +
    +			// the reader is enqueued in the pipeline because it has both available buffers and credits
    +			// since the channel is blocked though, we will not process anything and only enqueue the
    +			// reader once
    +			assertTrue(reader.isRegisteredAsAvailable());
    +			assertEquals(1, queue.getAvailableReaders().size());
    +			assertEquals(i, reader.getNumCreditsAvailable());
    +			assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
    +		}
    +
    +		// Flush the buffer to make the channel writable again and see the final results
    +		channel.flush();
    +		assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +		assertEquals(notifyNumBuffers - notifyNumCredits, reader.getNumBuffersAvailable());
    +		assertFalse(reader.isRegisteredAsAvailable());
    --- End diff --
    
    Let's end the test with `assertNull(channel.readOutbound());` so that it also verifies nothing further is left in the channel's outbound queue.


---

Mime
View raw message