flink-issues mailing list archives

From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Date Thu, 14 Sep 2017 13:47:39 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138879343
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
     
     			this.numTotalRequiredBuffers += numRequiredBuffers;
     
    -			final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    -			for (int i = 0 ; i < numRequiredBuffers ; i++) {
    -				segments.add(availableMemorySegments.poll());
    -			}
    +			redistributeBuffers();
    +		}
     
    +		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    +		for (int i = 0 ; i < numRequiredBuffers ; i++) {
     			try {
    -				redistributeBuffers();
    -			} catch (IOException e) {
    -				if (segments.size() > 0) {
    -					recycleMemorySegments(segments);
    -				}
    -
    +				segments.add(availableMemorySegments.take());
    --- End diff --
    
    I know I was the one who suggested it, but after thinking about the blocking `take()` a bit
    more, and with some more background acquired over the last weeks, I have the feeling that we
    should do the request similarly to `LocalBufferPool#requestBuffer()`, so that if (for some
    reason) we end up waiting forever, we can at least be stopped by `destroy()` being called.
    What do you think? I'm thinking about something like this:
    ```
    		final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    		try {
    			while (segments.size() < numRequiredBuffers) {
    				if (isDestroyed) {
    					throw new IllegalStateException("Buffer pool is destroyed.");
    				}
    
    				final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
    				if (segment != null) {
    					segments.add(segment);
    				}
    			}
    		} catch (Throwable e) {
    			recycleMemorySegments(segments);
    			ExceptionUtils.rethrowIOException(e);
    		}
    ```
    (using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)
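
To make the pattern easier to try in isolation, here is a rough standalone sketch of the same idea outside of Flink. Everything in it is made up for illustration: `AbortablePool`, `requestSegments`, and `getNumAvailable` are hypothetical names, `byte[]` stands in for `MemorySegment`, and the poll timeout is a parameter rather than the fixed 2s. It only shows the timed `poll()` loop that re-checks a destroyed flag and hands back partially collected segments on failure; it is not the actual `NetworkBufferPool` code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a buffer pool; NOT Flink's NetworkBufferPool. */
final class AbortablePool {
    private final BlockingQueue<byte[]> available = new LinkedBlockingQueue<>();
    private volatile boolean isDestroyed;

    AbortablePool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    /**
     * Collects n segments, polling with a timeout so that a concurrent
     * destroy() is noticed within roughly one timeout period.
     */
    List<byte[]> requestSegments(int n, long pollTimeoutMillis) throws IOException {
        final List<byte[]> segments = new ArrayList<>(n);
        try {
            while (segments.size() < n) {
                if (isDestroyed) {
                    throw new IllegalStateException("Buffer pool is destroyed.");
                }
                // Timed poll instead of a blocking take(): returns null on timeout.
                final byte[] segment = available.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
        } catch (InterruptedException e) {
            available.addAll(segments);          // hand back what we already took
            Thread.currentThread().interrupt();  // preserve the interrupt status
            throw new IOException(e);
        } catch (RuntimeException e) {
            available.addAll(segments);          // hand back what we already took
            throw e;
        }
        return segments;
    }

    void recycle(byte[] segment) {
        available.add(segment);
    }

    void destroy() {
        isDestroyed = true;
    }

    int getNumAvailable() {
        return available.size();
    }
}
```

A request for more segments than are available keeps looping until either enough segments are recycled or `destroy()` is called; in the latter case the caller sees the `IllegalStateException` and any segments it had already collected are back in the queue.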
    
    The following test (for `NetworkBufferPoolTest`) could verify this behaviour:
    ```
    	@Rule
    	public ExpectedException expectedException = ExpectedException.none();
    
    	/**
    	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted
    	 * in case of a concurrent {@link NetworkBufferPool#destroy()} call.
    	 */
    	@Test
    	public void testRequestMemorySegmentsInterruptable() throws Exception {
    		final int numBuffers = 10;
    
    		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    		MemorySegment segment = globalPool.requestMemorySegment();
    		assertNotNull(segment);
    
    		final OneShotLatch isRunning = new OneShotLatch();
    		CheckedThread asyncRequest = new CheckedThread() {
    			@Override
    			public void go() throws Exception {
    				isRunning.trigger();
    				globalPool.requestMemorySegments(10);
    			}
    		};
    		asyncRequest.start();
    
    		// We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
    		// call above. We cannot guarantee this though but make it highly probable:
    		isRunning.await();
    		Thread.sleep(10);
    		globalPool.destroy();
    
    		segment.free();
    
    		expectedException.expect(IllegalStateException.class);
    		expectedException.expectMessage("destroyed");
    		asyncRequest.sync();
    	}
    ```

