flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Date Mon, 15 Jan 2018 17:35:00 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161559926
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = spy(new PartitionRequestQueue());
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
    +		// The reader is enqueued in the pipeline because the next buffer is event, even though no available credits
    +		verify(queue, times(1)).enqueueAvailableReader(reader);
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(false);
    +		when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    --- End diff --
    
    let's remove that mock ...


---

Mime
View raw message