flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From uce <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-1320] Add an off-heap variant of the ma...
Date Thu, 08 Jan 2015 11:40:54 GMT
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/290#discussion_r22647181
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
---
    @@ -0,0 +1,560 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.memory;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.nio.BufferOverflowException;
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +
    +/**
    + * This class uses in parts code from Java's direct byte buffer API.
    + * 
    + * The use in this class two crucial additions:
    + *  - It uses collapsed checks for range check and memory segment disposal.
    + *  - It offers absolute positioning methods for byte array put/get methods, to guarantee
thread safe use.
    + *  
    + * In addition, the code that uses this class should make sure that only one implementation
class is ever loaded -
    + * Either the {@link HeapMemorySegment}, or this DirectMemorySegment. That way, all the
abstract methods in the
    + * MemorySegment base class have only one loaded actual implementation. This is easy
for the JIT to recognize through
    + * class hierarchy analysis, or by identifying that the invocations are monomorphic (all
go to the same concrete
    + * method implementation). Under this precondition, the JIT can perfectly inline methods.
    + * 
    + * This is harder to do and control with byte buffers, where different code paths use
different versions of the class
    + * (heap, direct, mapped) and thus virtual method invocations are polymorphic and are
not as easily inlined.
    + */
    +public class DirectMemorySegment extends MemorySegment {
    +	
    +	/** The direct byte buffer that allocated the memory */
    +	protected final ByteBuffer buffer;
    +	
    +	/** The address to the off-heap data */
    +	private long address;
    +	
    +	/** The address one byte after the last addressable byte.
    +	 *  This is address + size while the segment is not disposed */
    +	private final long addressLimit;
    +
    +	/** The size in bytes of the memory segment */
    +	private final int size;
    +	
    +	// -------------------------------------------------------------------------
    +	//                             Constructors
    +	// -------------------------------------------------------------------------
    +
    +	public DirectMemorySegment(int size) {
    +		this(ByteBuffer.allocateDirect(size));
    +	}
    +
    +	public DirectMemorySegment(ByteBuffer buffer) {
    +		if (buffer == null || !buffer.isDirect()) {
    +			throw new IllegalArgumentException();
    +		}
    +		
    +		this.buffer = buffer;
    +		this.size = buffer.capacity();
    +		this.address = getAddress(buffer);
    +		this.addressLimit = this.address + size;
    +		
    +		if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) {
    +			throw new RuntimeException("Segment initialized with too large address: " + address);
    +		}
    +	}
    +
    +	// -------------------------------------------------------------------------
    +	//                        MemorySegment Accessors
    +	// -------------------------------------------------------------------------
    +	
    +
    +	@Override
    +	public final boolean isFreed() {
    +		return this.address > this.addressLimit;
    +	}
    +
    +	public final void free() {
    +		// this ensures we can place no more data and trigger
    +		// the checks for the freed segment
    +		this.address = this.addressLimit + 1;
    +	}
    +	
    +	@Override
    +	public final int size() {
    +		return this.size;
    +	}
    +
    +	@Override
    +	public ByteBuffer wrap(int offset, int length) {
    +		if (offset < 0 || offset > this.size || offset > this.size - length) {
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		this.buffer.limit(offset + length);
    +		this.buffer.position(offset);
    +		
    +		return this.buffer;
    +	}
    +
    +
    +	// ------------------------------------------------------------------------
    +	//                    Random Access get() and put() methods
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final byte get(int index) {
    +		
    +		final long pos = address + index;
    +		if (index >= 0 && pos < addressLimit) {
    +			return UNSAFE.getByte(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void put(int index, byte b) {
    +		
    +		final long pos = address + index;
    +		if (index >= 0 && pos < addressLimit) {
    +			UNSAFE.putByte(pos, b);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	public final void get(int index, byte[] dst) {
    +		get(index, dst, 0, dst.length);
    +	}
    +
    +	@Override
    +	public final void put(int index, byte[] src) {
    +		put(index, src, 0, src.length);
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void get(int index, byte[] dst, int offset, int length) {
    +		
    +		// check the byte array offset and length
    +		if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0)
{
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		long pos = address + index;
    +		
    +		if (index >= 0 && pos <= addressLimit - length) {
    +			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
    +			
    +			// the copy must proceed in batches not too large, because the JVM may
    +			// poll for points that are safe for GC (moving the array and changing its address)
    +			while (length > 0) {
    +				long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length;
    +				UNSAFE.copyMemory(null, pos, dst, arrayAddress, toCopy);
    +				length -= toCopy;
    +				pos += toCopy;
    +				arrayAddress += toCopy;
    +			}
    +		}
    +		else if (address <= 0) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void put(int index, byte[] src, int offset, int length) {
    +		// check the byte array offset and length
    +		if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0)
{
    +			throw new IndexOutOfBoundsException();
    +		}
    +		
    +		long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - length) {
    +		
    +			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
    +			while (length > 0) {
    +				long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length;
    +				UNSAFE.copyMemory(src, arrayAddress, null, pos, toCopy);
    +				length -= toCopy;
    +				pos += toCopy;
    +				arrayAddress += toCopy;
    +			}
    +		}
    +		else if (address <= 0) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	public final boolean getBoolean(int index) {
    +		return get(index) != 0;
    +	}
    +
    +	@Override
    +	public final void putBoolean(int index, boolean value) {
    +		put(index, (byte) (value ? 1 : 0));
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final char getChar(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			return UNSAFE.getChar(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putChar(int index, char value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			UNSAFE.putChar(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final short getShort(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			return UNSAFE.getShort(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putShort(int index, short value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 2) {
    +			UNSAFE.putShort(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final int getInt(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 4) {
    +			return UNSAFE.getInt(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putInt(int index, int value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 4) {
    +			UNSAFE.putInt(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final long getLong(int index) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 8) {
    +			return UNSAFE.getLong(pos);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("restriction")
    +	public final void putLong(int index, long value) {
    +		final long pos = address + index;
    +		if (index >= 0 && pos <= addressLimit - 8) {
    +			UNSAFE.putLong(pos, value);
    +		}
    +		else if (address > addressLimit) {
    +			throw new IllegalStateException("disposed");
    +		}
    +		else {
    +			// index is in fact invalid
    +			throw new IndexOutOfBoundsException();
    +		}
    +	}
    +	
    +	// -------------------------------------------------------------------------
    +	//                     Bulk Read and Write Methods
    +	// -------------------------------------------------------------------------
    +	
    +	@Override
    +	public final void get(DataOutput out, int offset, int length) throws IOException {
    +		throw new UnsupportedOperationException("not implemented");
    --- End diff --
    
    Did you forget these? For example the `AdaptiveSpanningRecordDeserializer` uses this method
and therefore wouldn't work with direct memory segments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message