flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1320) Add an off-heap variant of the managed memory
Date Thu, 08 Jan 2015 11:36:34 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14269227#comment-14269227
] 

ASF GitHub Bot commented on FLINK-1320:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/290#discussion_r22647023
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
---
    @@ -0,0 +1,560 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.memory;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.nio.BufferOverflowException;
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +
    +/**
    + * This class uses in parts code from Java's direct byte buffer API.
    + * 
    + * The use in this class two crucial additions:
    + *  - It uses collapsed checks for range check and memory segment disposal.
    + *  - It offers absolute positioning methods for byte array put/get methods, to guarantee
thread safe use.
    + *  
    + * In addition, the code that uses this class should make sure that only one implementation
class is ever loaded -
    + * Either the {@link HeapMemorySegment}, or this DirectMemorySegment. That way, all the
abstract methods in the
    + * MemorySegment base class have only one loaded actual implementation. This is easy
for the JIT to recognize through
    + * class hierarchy analysis, or by identifying that the invocations are monomorphic (all
go to the same concrete
    + * method implementation). Under this precondition, the JIT can perfectly inline methods.
    + * 
    + * This is harder to do and control with byte buffers, where different code paths use
different versions of the class
    + * (heap, direct, mapped) and thus virtual method invocations are polymorphic and are
not as easily inlined.
    + */
    +public class DirectMemorySegment extends MemorySegment {
    +	
    +	/** The direct byte buffer that allocated the memory */
    +	protected final ByteBuffer buffer;
    +	
    +	/** The address to the off-heap data */
    +	private long address;
    +	
    +	/** The address one byte after the last addressable byte.
    +	 *  This is address + size while the segment is not disposed */
    +	private final long addressLimit;
    +
    +	/** The size in bytes of the memory segment */
    +	private final int size;
    +	
    +	// -------------------------------------------------------------------------
    +	//                             Constructors
    +	// -------------------------------------------------------------------------
    +
    +	public DirectMemorySegment(int size) {
    +		this(ByteBuffer.allocateDirect(size));
    +	}
    +
    +	public DirectMemorySegment(ByteBuffer buffer) {
    +		if (buffer == null || !buffer.isDirect()) {
    +			throw new IllegalArgumentException();
    +		}
    +		
    +		this.buffer = buffer;
    +		this.size = buffer.capacity();
    +		this.address = getAddress(buffer);
    +		this.addressLimit = this.address + size;
    +		
    +		if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) {
    +			throw new RuntimeException("Segment initialized with too large address: " + address);
    +		}
    +	}
    +
    +	// -------------------------------------------------------------------------
    +	//                        MemorySegment Accessors
    +	// -------------------------------------------------------------------------
    +	
    +
    +	@Override
    +	public final boolean isFreed() {
    +		return this.address > this.addressLimit;
    +	}
    +
    +	public final void free() {
    +		// this ensures we can place no more data and trigger
    +		// the checks for the freed segment
    +		this.address = this.addressLimit + 1;
    +	}
    +	
    +	@Override
    +	public final int size() {
    +		return this.size;
    +	}
    +
    +	@Override
    +	public ByteBuffer wrap(int offset, int length) {
    +		if (offset < 0 || offset > this.size || offset > this.size - length) {
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		this.buffer.limit(offset + length);
    +		this.buffer.position(offset);
    +		
    +		return this.buffer;
    +	}
    +
    +
    +	// ------------------------------------------------------------------------
    +	//                    Random Access get() and put() methods
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final byte get(int index) {
    +		
    +		final long pos = address + index;
    +		if (index >= 0 && pos < addressLimit) {
    +			return UNSAFE.getByte(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void put(int index, byte b) {
    +		
    +		final long pos = address + index;
    +		if (index >= 0 && pos < addressLimit) {
    +			UNSAFE.putByte(pos, b);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	public final void get(int index, byte[] dst) {
    +		get(index, dst, 0, dst.length);
    +	}
    +
    +	@Override
    +	public final void put(int index, byte[] src) {
    +		put(index, src, 0, src.length);
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void get(int index, byte[] dst, int offset, int length) {
    +		
    +		// check the byte array offset and length
    +		if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0)
{
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		long pos = address + index;
    +		
    +		if (index >= 0 && pos <= addressLimit - length) {
    +			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
    +			
    +			// the copy must proceed in batches not too large, because the JVM may
    +			// poll for points that are safe for GC (moving the array and changing its address)
    +			while (length > 0) {
    +				long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length;
    +				UNSAFE.copyMemory(null, pos, dst, arrayAddress, toCopy);
    +				length -= toCopy;
    +				pos += toCopy;
    +				arrayAddress += toCopy;
    +			}
    +		}
    +		else if (address <= 0) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void put(int index, byte[] src, int offset, int length) {
    +		// check the byte array offset and length
    +		if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0)
{
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - length) {
    +		
    +			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
    +			while (length > 0) {
    +				long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length;
    +				UNSAFE.copyMemory(src, arrayAddress, null, pos, toCopy);
    +				length -= toCopy;
    +				pos += toCopy;
    +				arrayAddress += toCopy;
    +			}
    +		}
    +		else if (address <= 0) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	public final boolean getBoolean(int index) {
    +		return get(index) != 0;
    +	}
    +
    +	@Override
    +	public final void putBoolean(int index, boolean value) {
    +		put(index, (byte) (value ? 1 : 0));
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final char getChar(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			return UNSAFE.getChar(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putChar(int index, char value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			UNSAFE.putChar(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final short getShort(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			return UNSAFE.getShort(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putShort(int index, short value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			UNSAFE.putShort(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final int getInt(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 4) {
    +			return UNSAFE.getInt(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putInt(int index, int value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 4) {
    +			UNSAFE.putInt(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final long getLong(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 8) {
    +			return UNSAFE.getLong(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putLong(int index, long value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 8) {
    +			UNSAFE.putLong(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	// -------------------------------------------------------------------------
    +	//                     Bulk Read and Write Methods
    +	// -------------------------------------------------------------------------
    +	
    +	@Override
    +	public final void get(DataOutput out, int offset, int length) throws IOException {
    +		throw new UnsupportedOperationException("not implemented");
    +	}
    +
    +	@Override
    +	public final void put(DataInput in, int offset, int length) throws IOException {
    +		throw new UnsupportedOperationException("not implemented");
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void get(int offset, ByteBuffer target, int numBytes) {
    +		
    +		// check the byte array offset and length
    +		if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0)
{
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		final int targetOffset = target.position();
    +		final int remaining = target.remaining();
    +		
    +		if (remaining < numBytes) {
    +			throw new BufferOverflowException();
    +		}
    +		
    +		if (target.isDirect()) {
    +			// copy to the target memory directly
    +			final long targetPointer = getAddress(target) + targetOffset;
    +			final long sourcePointer = address + offset;
    +			
    +			if (sourcePointer <= addressLimit - numBytes) {
    +				UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes);
    +			}
    +			else if (address > addressLimit) {
    +				throw new IllegalStateException("disposed");
    +			}
    +			else {
    +				throw new IndexOutOfBoundsException();
    +			}
    +		}
    +		else if (target.hasArray()) {
    +			// move directly into the byte array
    +			get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);
    +			
    +			// this must be after the get() call to ensue that the byte buffer is not
    +			// modified in case the call fails
    +			target.position(targetOffset + numBytes);
    +		}
    +		else {
    +			// neither heap buffer nor direct buffer
    +			while (target.hasRemaining()) {
    +				target.put(get(offset++));
    +			}
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void put(int offset, ByteBuffer source, int numBytes) {
    +		
    +		// check the byte array offset and length
    +		if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0)
{
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		final int sourceOffset = source.position();
    +		final int remaining = source.remaining();
    +		
    +		if (remaining < numBytes) {
    +			throw new BufferUnderflowException();
    +		}
    +		
    +		if (source.isDirect()) {
    +			// copy to the target memory directly
    +			final long sourcePointer = getAddress(source) + sourceOffset;
    +			final long targetPointer = address + offset;
    +			
    +			if (sourcePointer <= addressLimit - numBytes) {
    +				UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes);
    +			}
    +			else if (address > addressLimit) {
    +				throw new IllegalStateException("disposed");
    +			}
    +			else {
    +				throw new IndexOutOfBoundsException();
    +			}
    +		}
    +		else if (source.hasArray()) {
    +			// move directly into the byte array
    +			put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);
    +			
    +			// this must be after the get() call to ensue that the byte buffer is not
    +			// modified in case the call fails
    +			source.position(sourceOffset + numBytes);
    +		}
    +		else {
    +			// neither heap buffer nor direct buffer
    +			while (source.hasRemaining()) {
    +				put(offset++, source.get());
    +			}
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes)
{
    +		if (target.getClass() == DirectMemorySegment.class) {
    +			DirectMemorySegment directOther = (DirectMemorySegment) target;
    +			
    +			final long thisPointer = address + offset;
    +			final long otherPointer = directOther.address + targetOffset;
    +			
    +			if (numBytes >= 0 && thisPointer <= addressLimit - numBytes &&
otherPointer <= directOther.addressLimit - numBytes) {
    +				UNSAFE.copyMemory(thisPointer, otherPointer, numBytes);
    +			}
    +			else if (address > addressLimit || directOther.address > directOther.addressLimit)
{
    +				throw new IllegalStateException("disposed");
    +			}
    +			else {
    +				throw new IndexOutOfBoundsException();
    +			}
    +		}
    +		else {
    +			throw new IllegalArgumentException("Can only copy to other direct memory segments");
    +		}
    +	}
    +
    +	// -------------------------------------------------------------------------
    +	//                      Comparisons & Swapping
    +	// -------------------------------------------------------------------------
    +
    +	@Override
    +	public int compare(MemorySegment seg2, int offset1, int offset2, int len) {
    +		ByteBuffer b1 = this.buffer;
    +		ByteBuffer b2 = ((DirectMemorySegment)seg2).buffer;
    +		return b1.compareTo(b2);
    +	}
    +
    +	@Override
    +	public void swapBytes(MemorySegment seg2, int offset1, int offset2, int len) {
    +		while(len >= 8) {
    +			long tmp = this.getLong(offset1);
    +			this.putLong(offset1, seg2.getLong(offset2));
    +			seg2.putLong(offset2, tmp);
    +			offset1+=8;
    +			offset2+=8;
    +			len-=8;
    +		}
    +		while(len > 0) {
    +			byte tmp = this.get(offset1);
    +			this.put(offset1, seg2.get(offset2));
    +			seg2.put(offset2, tmp);
    +			offset1++;
    +			offset2++;
    +			len--;
    +		}
    +
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//                     Utilities for native memory accesses and checks
    +	// --------------------------------------------------------------------------------------------
    +	
    +	@SuppressWarnings("restriction")
    +	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
    +	
    +	@SuppressWarnings("restriction")
    +	private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
    +	
    +	private static final long COPY_PER_BATCH = 1024 * 1024;
    +	
    +	private static final Field ADDRESS_FIELD;
    +	
    +	static {
    +		try {
    +			ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
    +			ADDRESS_FIELD.setAccessible(true);
    +		}
    +		catch (Throwable t) {
    +			throw new RuntimeException("Cannot initialize DirectMemorySegment - direct memory
not supported by Flink");
    --- End diff --
    
    I would suggest to add a hint to the error message suggesting to turn off the direct memory
allocation.
    
    (I actually find the last part of the message confusing, because it is actually not really
"not supported by Flink", but by the system?)


> Add an off-heap variant of the managed memory
> ---------------------------------------------
>
>                 Key: FLINK-1320
>                 URL: https://issues.apache.org/jira/browse/FLINK-1320
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>            Reporter: Stephan Ewen
>            Priority: Minor
>
> For (nearly) all memory that Flink accumulates (in the form of sort buffers, hash tables,
caching), we use a special way of representing data serialized across a set of memory pages.
The big work lies in the way the algorithms are implemented to operate on pages, rather than
on objects.
> The core class for the memory is the {{MemorySegment}}, which has all methods to set
and get primitives values efficiently. It is a somewhat simpler (and faster) variant of a
HeapByteBuffer.
> As such, it should be straightforward to create a version where the memory segment is
not backed by a heap byte[], but by memory allocated outside the JVM, in a similar way as
the NIO DirectByteBuffers, or the Netty direct buffers do it.
> This may have multiple advantages:
>   - We reduce the size of the JVM heap (garbage collected) and the number and size of
long living alive objects. For large JVM sizes, this may improve performance quite a bit.
Utilmately, we would in many cases reduce JVM size to 1/3 to 1/2 and keep the remaining memory
outside the JVM.
>   - We save copies when we move memory pages to disk (spilling) or through the network
(shuffling / broadcasting / forward piping)
> The changes required to implement this are
>   - Add a {{UnmanagedMemorySegment}} that only stores the memory adress as a long, and
the segment size. It is initialized from a DirectByteBuffer.
>   - Allow the MemoryManager to allocate these MemorySegments, instead of the current
ones.
>   - Make sure that the startup script pick up the mode and configure the heap size and
the max direct memory properly.
> Since the MemorySegment is probably the most performance critical class in Flink, we
must take care that we do this right. The following are critical considerations:
>   - If we want both solutions (heap and off-heap) to exist side-by-side (configurable),
we must make the base MemorySegment abstract and implement two versions (heap and off-heap).
>   - To get the best performance, we need to make sure that only one class gets loaded
(or at least ever used), to ensure optimal JIT de-virtualization and inlining.
>   - We should carefully measure the performance of both variants. From previous micro
benchmarks, I remember that individual byte accesses in DirectByteBuffers (off-heap) were
slightly slower than on-heap, any larger accesses were equally good or slightly better.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message