flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnowojski <...@git.apache.org>
Subject [GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...
Date Thu, 23 Nov 2017 16:18:51 GMT
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4581#discussion_r152826839
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
---
    @@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume()
throws Exception
     		assertTrue(buffer.isRecycled());
     	}
     
    +	/**
    +	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
    +	 */
    +	@Test
    +	public void testAddOnFinishedSpillablePartition() throws Exception {
    +		SpillableSubpartition partition = createSubpartition();
    +		partition.finish();
    +
    +		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
    +			FreeingBufferRecycler.INSTANCE);
    +		try {
    +			partition.add(buffer);
    +		} finally {
    +			if (!buffer.isRecycled()) {
    +				Assert.fail("buffer not recycled");
    +				buffer.recycle();
    +			}
    +			// finish adds an EndOfPartitionEvent
    +			assertEquals(1, partition.getTotalNumberOfBuffers());
    +			assertEquals(4, partition.getTotalNumberOfBytes());
    +		}
    +	}
    +
    +	/**
    +	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
    +	 */
    +	@Test
    +	public void testAddOnFinishedSpilledPartition() throws Exception {
    +		SpillableSubpartition partition = createSubpartition();
    +		assertEquals(0, partition.releaseMemory());
    +		partition.finish();
    +
    +		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
    +			FreeingBufferRecycler.INSTANCE);
    +		try {
    +			partition.add(buffer);
    +		} finally {
    +			if (!buffer.isRecycled()) {
    +				Assert.fail("buffer not recycled");
    +				buffer.recycle();
    +			}
    +			// finish adds an EndOfPartitionEvent
    +			assertEquals(1, partition.getTotalNumberOfBuffers());
    +			assertEquals(4, partition.getTotalNumberOfBytes());
    +		}
    +	}
    +
    +	/**
    +	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
    +	 */
    +	@Test
    +	public void testAddOnReleasedSpillablePartition() throws Exception {
    +		SpillableSubpartition partition = createSubpartition();
    +		partition.release();
    +
    +		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
    +			FreeingBufferRecycler.INSTANCE);
    +		try {
    +			partition.add(buffer);
    +		} finally {
    --- End diff --
    
    ditto (and below)


---

Mime
View raw message