flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
Date Fri, 01 Mar 2019 02:00:55 GMT
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce
an abstract set of data formats
URL: https://github.com/apache/flink/pull/7816#discussion_r261457344
 
 

 ##########
 File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 ##########
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.	See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	You may obtain a copy of the License at
+ *
+ *		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.util.SegmentsUtil;
+
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly
+ * reduce the serialization/deserialization of Java objects.
+ *
+ * <p>A row has two parts: a fixed-length part and a variable-length part.
+ *
+ * <p>The fixed-length part contains the null bit set and the field values. The null bit set is
+ * used for null tracking and is aligned to 8-byte word boundaries. `Field values` holds
+ * fixed-length primitive types and variable-length values which can be stored inside 8 bytes.
+ * If a variable-length value does not fit, the field stores the length and offset of its data
+ * in the variable-length part instead. The fixed-length part will certainly fall into a
+ * MemorySegment, which speeds up the reading and writing of fields.
+ *
+ * <p>Variable-length part may fall into multiple MemorySegments.
+ *
+ * <p>{@code BinaryRow} is influenced by Apache Spark's UnsafeRow in project Tungsten.
+ * The difference is that BinaryRow is placed on discontinuous memory, and variable-length
+ * types can also be placed in the fixed-length area (if they are short enough).
+ */
+public final class BinaryRow extends BinaryFormat<Object> implements BaseRow {
+
+	public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+
+	public static int calculateBitSetWidthInBytes(int arity) {
+		// add 8 bit header
+		return ((arity + 63 + 8) / 64) * 8;
+	}
+
+	private final int arity;
+	private final int nullBitsSizeInBytes;
+
+	public BinaryRow(int arity) {
+		checkArgument(arity >= 0);
+		this.arity = arity;
+		this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+	}
+
+	private int getFieldOffset(int pos) {
+		return offset + nullBitsSizeInBytes + pos * 8;
+	}
+
+	private void assertIndexIsValid(int index) {
+		assert index >= 0 : "index (" + index + ") should >= 0";
+		assert index < arity : "index (" + index + ") should < " + arity;
+	}
+
+	public int getFixedLengthPartSize() {
+		return nullBitsSizeInBytes + 8 * arity;
+	}
+
+	@Override
+	public int getArity() {
+		return arity;
+	}
+
+	@Override
+	public byte getHeader() {
+		// The header is the single byte at the row's base offset: the first 8 bits of the
+		// null-bits region are reserved for it (null bits for fields start at bit 8).
+		return segments[0].get(offset);
+	}
+
+	@Override
+	public void setHeader(byte header) {
+		segments[0].put(offset, header);
+	}
+
+	public void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+		this.segments = new MemorySegment[] {segment};
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+		this.segments = segments;
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	public void setTotalSize(int sizeInBytes) {
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	private void setNotNullAt(int i) {
+		assertIndexIsValid(i);
+		// need add header 8 bit.
+		SegmentsUtil.bitUnSet(segments[0], offset, i + 8);
+	}
+
+	@Override
+	public void setNullAt(int i) {
+		assertIndexIsValid(i);
+		// need add header 8 bit.
+		SegmentsUtil.bitSet(segments[0], offset, i + 8);
+		// We must set the fixed length part zero.
+		// 1.Only int/long/boolean...(Fix length type) will invoke this setNullAt.
+		// 2.Set to zero in order to equals and hash operation bytes calculation.
+		segments[0].putLong(getFieldOffset(i), 0);
+	}
+
+	@Override
+	public void setInt(int pos, int value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putInt(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setLong(int pos, long value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putLong(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setDouble(int pos, double value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putDouble(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setChar(int pos, char value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putChar(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setBoolean(int pos, boolean value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putBoolean(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setShort(int pos, short value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putShort(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setByte(int pos, byte value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].put(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setFloat(int pos, float value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		segments[0].putFloat(getFieldOffset(pos), value);
+	}
+
+	@Override
+	public boolean isNullAt(int pos) {
+		assertIndexIsValid(pos);
+		// need add header 8 bit.
+		return SegmentsUtil.bitGet(segments[0], offset, pos + 8);
+	}
+
+	@Override
+	public boolean getBoolean(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getBoolean(getFieldOffset(pos));
+	}
+
+	@Override
+	public byte getByte(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].get(getFieldOffset(pos));
+	}
+
+	@Override
+	public short getShort(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getShort(getFieldOffset(pos));
+	}
+
+	@Override
+	public int getInt(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getInt(getFieldOffset(pos));
+	}
+
+	@Override
+	public long getLong(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getLong(getFieldOffset(pos));
+	}
+
+	@Override
+	public float getFloat(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getFloat(getFieldOffset(pos));
+	}
+
+	@Override
+	public double getDouble(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getDouble(getFieldOffset(pos));
+	}
+
+	@Override
+	public char getChar(int pos) {
+		assertIndexIsValid(pos);
+		return segments[0].getChar(getFieldOffset(pos));
+	}
+
+	/**
+	 * Reads the string field at {@code pos}.
+	 *
+	 * <p>The 8-byte slot of the field is read from the first segment and handed, together with
+	 * the row's segments, base offset and the field's offset, to
+	 * {@link BinaryFormat#readBinaryStringFieldFromSegments}, which resolves the actual string.
+	 * NOTE(review): the slot presumably packs either the inlined bytes or the offset and size of
+	 * the data in the variable-length part -- confirm against BinaryFormat.
+	 *
+	 * @param pos index of the field, expected to be in {@code [0, arity)}
+	 * @return the string stored at {@code pos}
+	 */
+	@Override
+	public BinaryString getString(int pos) {
+		// Same index sanity check as every other getter; only active when assertions are enabled.
+		assertIndexIsValid(pos);
+		int fieldOffset = getFieldOffset(pos);
+		final long offsetAndSize = segments[0].getLong(fieldOffset);
+		return BinaryFormat.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndSize);
+	}
+
+	/**
+	 * The bit is 1 when the field is null. Default is 0.
+	 */
+	public boolean anyNull() {
+		for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
 
 Review comment:
   Add a test case for this

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message