Repository: cassandra
Updated Branches:
refs/heads/trunk f8358b8ea -> e8d8941cc
Add row size to sstable format for faster skipping
patch by slebresne; reviewed by benedict for CASSANDRA-10378
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6584331c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6584331c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6584331c
Branch: refs/heads/trunk
Commit: 6584331c881329c2cb9afbcef19997e8a2a612d9
Parents: 028c3f7
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Tue Sep 22 13:53:22 2015 -0700
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Oct 9 18:07:01 2015 +0200
----------------------------------------------------------------------
.../org/apache/cassandra/db/ColumnIndex.java | 10 +-
src/java/org/apache/cassandra/db/Memtable.java | 2 +-
.../cassandra/db/SerializationHeader.java | 26 ++-
.../cassandra/db/UnfilteredDeserializer.java | 8 +-
.../rows/UnfilteredRowIteratorSerializer.java | 10 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 174 +++++++++++--------
.../io/sstable/AbstractSSTableSimpleWriter.java | 2 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 4 +-
.../apache/cassandra/db/RowIndexEntryTest.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 3 +-
.../db/compaction/AntiCompactionTest.java | 2 +-
.../io/sstable/BigTableWriterTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 4 +-
.../cassandra/io/sstable/SSTableUtils.java | 2 +-
14 files changed, 155 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
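The gist of the change: when a SerializationHeader is flagged as being for an sstable, each serialized unfiltered (row or range tombstone marker) now carries, right after its clustering, an unsigned vint giving the size of everything that follows it in that unfiltered, then an unsigned vint with the serialized size of the previous unfiltered. A reader that only needs to get past a row can skip it wholesale instead of decoding every cell. The standalone sketch below illustrates the size-prefix layout on the write side; it uses a plain 7-bit varint and hypothetical names (SizePrefixedWriteSketch, writeRecord) rather than Cassandra's VIntCoding and DataOutputPlus, so it approximates the encoding rather than reproducing it.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Layout per record: [vint size][vint previousSize][body], where size covers everything
    // after itself (the previousSize vint plus the body), matching how the row body size is
    // written before the row body in the patch.
    public final class SizePrefixedWriteSketch
    {
        static void writeUnsignedVInt(DataOutputStream out, long value) throws IOException
        {
            while ((value & ~0x7FL) != 0)
            {
                out.writeByte((int) ((value & 0x7F) | 0x80));
                value >>>= 7;
            }
            out.writeByte((int) value);
        }

        static int vintLength(long value)
        {
            int length = 1;
            while ((value & ~0x7FL) != 0)
            {
                value >>>= 7;
                length++;
            }
            return length;
        }

        // Returns the record's total serialized length so the caller can pass it as
        // previousSize for the next record, much as ColumnIndex.add derives it from the
        // difference between consecutive start positions.
        static long writeRecord(DataOutputStream out, byte[] body, long previousSize) throws IOException
        {
            long before = out.size();
            writeUnsignedVInt(out, vintLength(previousSize) + body.length); // <size>
            writeUnsignedVInt(out, previousSize);                           // previous unfiltered size
            out.write(body);
            return out.size() - before;
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            long previous = 0;
            for (byte[] body : new byte[][]{ {1, 2, 3}, {4, 5}, {6, 7, 8, 9} })
                previous = writeRecord(out, body, previous);
            System.out.println("wrote " + bytes.size() + " bytes");
        }
    }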
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index add5fa7..ede3f79 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -76,6 +76,7 @@ public class ColumnIndex
private long startPosition = -1;
private int written;
+ private long previousRowStart;
private ClusteringPrefix firstClustering;
private ClusteringPrefix lastClustering;
@@ -99,7 +100,7 @@ public class ColumnIndex
ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
if (header.hasStatic())
- UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version);
+ UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version);
}
public ColumnIndex build() throws IOException
@@ -131,15 +132,18 @@ public class ColumnIndex
private void add(Unfiltered unfiltered) throws IOException
{
+ long pos = currentPosition();
+
if (firstClustering == null)
{
// Beginning of an index block. Remember the start and position
firstClustering = unfiltered.clustering();
- startPosition = currentPosition();
+ startPosition = pos;
}
- UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version);
+ UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
lastClustering = unfiltered.clustering();
+ previousRowStart = pos;
++written;
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 014467e..f47efe3 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -430,7 +430,7 @@ public class Memtable implements Comparable<Memtable>
(long)partitions.size(),
ActiveRepairService.UNREPAIRED_SSTABLE,
sstableMetadataCollector,
- new SerializationHeader(cfs.metadata, columns, stats),
+ new SerializationHeader(true, cfs.metadata, columns, stats),
txn));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index decac49..0706d06 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -45,6 +45,8 @@ public class SerializationHeader
{
public static final Serializer serializer = new Serializer();
+ private final boolean isForSSTable;
+
private final AbstractType<?> keyType;
private final List<AbstractType<?>> clusteringTypes;
@@ -53,12 +55,14 @@ public class SerializationHeader
private final Map<ByteBuffer, AbstractType<?>> typeMap;
- private SerializationHeader(AbstractType<?> keyType,
+ private SerializationHeader(boolean isForSSTable,
+ AbstractType<?> keyType,
List<AbstractType<?>> clusteringTypes,
PartitionColumns columns,
EncodingStats stats,
Map<ByteBuffer, AbstractType<?>> typeMap)
{
+ this.isForSSTable = isForSSTable;
this.keyType = keyType;
this.clusteringTypes = clusteringTypes;
this.columns = columns;
@@ -77,7 +81,8 @@ public class SerializationHeader
List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
for (int i = 0; i < size; i++)
clusteringTypes.add(BytesType.instance);
- return new SerializationHeader(BytesType.instance,
+ return new SerializationHeader(false,
+ BytesType.instance,
clusteringTypes,
PartitionColumns.NONE,
EncodingStats.NO_STATS,
@@ -108,14 +113,16 @@ public class SerializationHeader
else
columns.addAll(sstable.header.columns());
}
- return new SerializationHeader(metadata, columns.build(), stats.get());
+ return new SerializationHeader(true, metadata, columns.build(), stats.get());
}
- public SerializationHeader(CFMetaData metadata,
+ public SerializationHeader(boolean isForSSTable,
+ CFMetaData metadata,
PartitionColumns columns,
EncodingStats stats)
{
- this(metadata.getKeyValidator(),
+ this(isForSSTable,
+ metadata.getKeyValidator(),
typesOf(metadata.clusteringColumns()),
columns,
stats,
@@ -137,6 +144,11 @@ public class SerializationHeader
return !columns.statics.isEmpty();
}
+ public boolean isForSSTable()
+ {
+ return isForSSTable;
+ }
+
public EncodingStats stats()
{
return stats;
@@ -320,7 +332,7 @@ public class SerializationHeader
}
builder.add(column);
}
- return new SerializationHeader(keyType, clusteringTypes, builder.build(), stats, typeMap);
+ return new SerializationHeader(true, keyType, clusteringTypes, builder.build(), stats, typeMap);
}
@Override
@@ -390,7 +402,7 @@ public class SerializationHeader
regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in);
}
- return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
+ return new SerializationHeader(false, keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
}
public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection, boolean hasStatic)
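
The new leading boolean on SerializationHeader is what gates the size prefix: headers built for writing sstables (Memtable flush, the sstable writers, and the tests below) pass true, while headers built for the on-wire format in UnfilteredRowIteratorSerializer pass false and keep the old layout. A minimal sketch of the two call shapes follows; the helper class name is hypothetical and the import paths are my best guess for this branch, so treat it as an illustration rather than project code.

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.PartitionColumns;
    import org.apache.cassandra.db.SerializationHeader;
    import org.apache.cassandra.db.rows.EncodingStats;

    public final class HeaderUsageSketch
    {
        // sstable write path: the per-unfiltered size prefix is written, so readers can skip rows.
        static SerializationHeader forSSTable(CFMetaData metadata, PartitionColumns columns, EncodingStats stats)
        {
            return new SerializationHeader(true, metadata, columns, stats);
        }

        // messaging/on-wire path: no per-unfiltered size is written (the serializer asserts this).
        static SerializationHeader forMessaging(CFMetaData metadata, PartitionColumns columns, EncodingStats stats)
        {
            return new SerializationHeader(false, metadata, columns, stats);
        }
    }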
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 5c76c63..ef30289 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -147,7 +147,7 @@ public abstract class UnfilteredDeserializer
return;
}
- nextExtendedFlags = UnfilteredSerializer.isExtended(nextFlags) ? in.readUnsignedByte() : 0;
+ nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags);
clusteringDeserializer.prepare(nextFlags, nextExtendedFlags);
isReady = true;
@@ -195,14 +195,14 @@ public abstract class UnfilteredDeserializer
public void skipNext() throws IOException
{
isReady = false;
- ClusteringPrefix.Kind kind = clusteringDeserializer.skipNext();
+ clusteringDeserializer.skipNext();
if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
- UnfilteredSerializer.serializer.skipMarkerBody(in, header, kind.isBoundary());
+ UnfilteredSerializer.serializer.skipMarkerBody(in);
}
else
{
- UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags, nextExtendedFlags);
+ UnfilteredSerializer.serializer.skipRowBody(in);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index df006d7..3a0558e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -80,7 +80,8 @@ public class UnfilteredRowIteratorSerializer
// Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
{
- SerializationHeader header = new SerializationHeader(iterator.metadata(),
+ SerializationHeader header = new SerializationHeader(false,
+ iterator.metadata(),
iterator.columns(),
iterator.stats());
serialize(iterator, header, selection, out, version, rowEstimate);
@@ -89,6 +90,8 @@ public class UnfilteredRowIteratorSerializer
// Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
{
+ assert !header.isForSSTable();
+
ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);
int flags = 0;
@@ -134,7 +137,8 @@ public class UnfilteredRowIteratorSerializer
// recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate)
{
- SerializationHeader header = new SerializationHeader(iterator.metadata(),
+ SerializationHeader header = new SerializationHeader(false,
+ iterator.metadata(),
iterator.columns(),
iterator.stats());
@@ -175,7 +179,7 @@ public class UnfilteredRowIteratorSerializer
boolean isReversed = (flags & IS_REVERSED) != 0;
if ((flags & IS_EMPTY) != 0)
{
- SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
+ SerializationHeader sh = new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
return new Header(sh, key, isReversed, true, null, null, 0);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index b83ccf9..4efc5eb 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -35,10 +35,12 @@ import org.apache.cassandra.io.util.DataOutputPlus;
* flag is defined/explained below as the "Unfiltered flags" constants. One of those flags
* is an extension flag, and if present, trigger the rid of another byte that contains more
* flags. If the extension is not set, defaults are assumed for the flags of that 2nd byte.
- * <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
- * <clustering> is the row clustering as serialized by
- * {@code Clustering.serializer}. Note that static row are an exception and
- * don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
+ * <row> is <clustering><size>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
+ * <clustering> is the row clustering as serialized by {@code Clustering.serializer} (note
+ * that static row are an exception and don't have this).
+ * <size> is the size of the whole unfiltered on disk (it's only used for sstables and is
+ * used to efficiently skip rows).
+ * <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
* whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
* complex ones.
* The columns for the row are then serialized if they differ from those in the header,
@@ -90,22 +92,35 @@ public class UnfilteredSerializer
public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
throws IOException
{
+ assert !header.isForSSTable();
+ serialize(unfiltered, header, out, 0, version);
+ }
+
+ public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
+ throws IOException
+ {
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
- serialize((RangeTombstoneMarker) unfiltered, header, out, version);
+ serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize, version);
}
else
{
- serialize((Row) unfiltered, header, out, version);
+ serialize((Row) unfiltered, header, out, previousUnfilteredSize, version);
}
}
- public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version)
+ public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out, int version)
+ throws IOException
+ {
+ assert row.isStatic();
+ serialize(row, header, out, 0, version);
+ }
+
+ private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
throws IOException
{
int flags = 0;
int extendedFlags = 0;
- boolean hasExtendedFlags = false;
boolean isStatic = row.isStatic();
Columns headerColumns = header.columns(isStatic);
@@ -113,12 +128,10 @@ public class UnfilteredSerializer
Row.Deletion deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
boolean hasAllColumns = (row.size() == headerColumns.size());
+ boolean hasExtendedFlags = hasExtendedFlags(row);
if (isStatic)
- {
- hasExtendedFlags = true;
extendedFlags |= IS_STATIC;
- }
if (!pkLiveness.isEmpty())
flags |= HAS_TIMESTAMP;
@@ -128,10 +141,7 @@ public class UnfilteredSerializer
{
flags |= HAS_DELETION;
if (deletion.isShadowable())
- {
- hasExtendedFlags = true;
extendedFlags |= HAS_SHADOWABLE_DELETION;
- }
}
if (hasComplexDeletion)
flags |= HAS_COMPLEX_DELETION;
@@ -148,6 +158,12 @@ public class UnfilteredSerializer
if (!isStatic)
Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
+ if (header.isForSSTable())
+ {
+ out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version));
+ out.writeUnsignedVInt(previousUnfilteredSize);
+ }
+
if ((flags & HAS_TIMESTAMP) != 0)
header.writeTimestamp(pkLiveness.timestamp(), out);
if ((flags & HAS_TTL) != 0)
@@ -181,12 +197,18 @@ public class UnfilteredSerializer
Cell.serializer.serialize(cell, out, rowLiveness, header);
}
- public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
+ private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
throws IOException
{
out.writeByte((byte)IS_MARKER);
RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
+ if (header.isForSSTable())
+ {
+ out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, previousUnfilteredSize, version));
+ out.writeUnsignedVInt(previousUnfilteredSize);
+ }
+
if (marker.isBoundary())
{
RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -201,15 +223,37 @@ public class UnfilteredSerializer
public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version)
{
+ assert !header.isForSSTable();
+ return serializedSize(unfiltered, header, 0, version);
+ }
+
+ public long serializedSize(Unfiltered unfiltered, SerializationHeader header, long previousUnfilteredSize, int version)
+ {
return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER
- ? serializedSize((RangeTombstoneMarker) unfiltered, header, version)
- : serializedSize((Row) unfiltered, header, version);
+ ? serializedSize((RangeTombstoneMarker) unfiltered, header, previousUnfilteredSize, version)
+ : serializedSize((Row) unfiltered, header, previousUnfilteredSize, version);
}
- public long serializedSize(Row row, SerializationHeader header, int version)
+ private long serializedSize(Row row, SerializationHeader header, long previousUnfilteredSize, int version)
{
long size = 1; // flags
+ if (hasExtendedFlags(row))
+ size += 1; // extended flags
+
+ if (!row.isStatic())
+ size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
+
+ return size + serializedRowBodySize(row, header, previousUnfilteredSize, version);
+ }
+
+ private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize, int version)
+ {
+ long size = 0;
+
+ if (header.isForSSTable())
+ size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
boolean isStatic = row.isStatic();
Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
@@ -217,12 +261,6 @@ public class UnfilteredSerializer
boolean hasComplexDeletion = row.hasComplexDeletion();
boolean hasAllColumns = (row.size() == headerColumns.size());
- if (isStatic || deletion.isShadowable())
- size += 1; // extended flags
-
- if (!isStatic)
- size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
-
if (!pkLiveness.isEmpty())
size += header.timestampSerializedSize(pkLiveness.timestamp());
if (pkLiveness.isExpiring())
@@ -261,10 +299,19 @@ public class UnfilteredSerializer
return size;
}
- public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
+ private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version)
{
- long size = 1 // flags
- + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes());
+ assert !header.isForSSTable();
+ return 1 // flags
+ + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes())
+ + serializedMarkerBodySize(marker, header, previousUnfilteredSize, version);
+ }
+
+ private long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version)
+ {
+ long size = 0;
+ if (header.isForSSTable())
+ size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
if (marker.isBoundary())
{
@@ -299,7 +346,7 @@ public class UnfilteredSerializer
if (isEndOfPartition(flags))
return null;
- int extendedFlags = isExtended(flags) ? in.readUnsignedByte() : 0;
+ int extendedFlags = readExtendedFlags(in, flags);
if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
@@ -328,6 +375,12 @@ public class UnfilteredSerializer
public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound)
throws IOException
{
+ if (header.isForSSTable())
+ {
+ in.readUnsignedVInt(); // marker size
+ in.readUnsignedVInt(); // previous unfiltered size
+ }
+
if (bound.isBoundary())
return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in));
else
@@ -353,6 +406,12 @@ public class UnfilteredSerializer
boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
Columns headerColumns = header.columns(isStatic);
+ if (header.isForSSTable())
+ {
+ in.readUnsignedVInt(); // Skip row size
+ in.readUnsignedVInt(); // previous unfiltered size
+ }
+
LivenessInfo rowLiveness = LivenessInfo.EMPTY;
if (hasTimestamp)
{
@@ -430,36 +489,10 @@ public class UnfilteredSerializer
}
}
- public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags, int extendedFlags) throws IOException
+ public void skipRowBody(DataInputPlus in) throws IOException
{
- boolean isStatic = isStatic(extendedFlags);
- boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
- boolean hasTTL = (flags & HAS_TTL) != 0;
- boolean hasDeletion = (flags & HAS_DELETION) != 0;
- boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
- boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
- Columns headerColumns = header.columns(isStatic);
-
- // Note that we don't want want to use FileUtils.skipBytesFully for anything that may not have
- // the size we think due to VINT encoding
- if (hasTimestamp)
- header.skipTimestamp(in);
- if (hasTTL)
- {
- header.skipLocalDeletionTime(in);
- header.skipTTL(in);
- }
- if (hasDeletion)
- header.skipDeletionTime(in);
-
- Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
- for (ColumnDefinition column : columns)
- {
- if (column.isSimple())
- Cell.serializer.skip(in, column, header);
- else
- skipComplexColumn(in, column, header, hasComplexDeletion);
- }
+ int rowSize = (int)in.readUnsignedVInt();
+ in.skipBytesFully(rowSize);
}
public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException
@@ -468,20 +501,13 @@ public class UnfilteredSerializer
assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isExtended(flags) : "Flags is " + flags;
int extendedFlags = in.readUnsignedByte();
assert isStatic(extendedFlags);
- skipRowBody(in, header, flags, extendedFlags);
+ skipRowBody(in);
}
- public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary) throws IOException
+ public void skipMarkerBody(DataInputPlus in) throws IOException
{
- if (isBoundary)
- {
- header.skipDeletionTime(in);
- header.skipDeletionTime(in);
- }
- else
- {
- header.skipDeletionTime(in);
- }
+ int markerSize = (int)in.readUnsignedVInt();
+ in.skipBytesFully(markerSize);
}
private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, SerializationHeader header, boolean hasComplexDeletion)
@@ -510,8 +536,18 @@ public class UnfilteredSerializer
return (extendedFlags & IS_STATIC) != 0;
}
- public static boolean isExtended(int flags)
+ private static boolean isExtended(int flags)
{
return (flags & EXTENSION_FLAG) != 0;
}
+
+ public static int readExtendedFlags(DataInputPlus in, int flags) throws IOException
+ {
+ return isExtended(flags) ? in.readUnsignedByte() : 0;
+ }
+
+ public static boolean hasExtendedFlags(Row row)
+ {
+ return row.isStatic() || row.deletion().isShadowable();
+ }
}
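
On the read side, skipRowBody and skipMarkerBody above shrink to "read one vint, then skip that many bytes", which is the payoff of the format change. The standalone sketch below mirrors that shape with plain java.io types and the same simplified varint as the earlier write-side sketch; Cassandra's real code uses DataInputPlus.skipBytesFully and VIntCoding, so treat the names and encoding here as stand-ins rather than the project's API.

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public final class SizePrefixedSkipSketch
    {
        static long readUnsignedVInt(DataInputStream in) throws IOException
        {
            long value = 0;
            int shift = 0;
            while (true)
            {
                int b = in.readUnsignedByte();
                value |= (long) (b & 0x7F) << shift;
                if ((b & 0x80) == 0)
                    return value;
                shift += 7;
            }
        }

        // Mirrors the new skipRowBody/skipMarkerBody: the leading vint covers everything that
        // follows it (the previous-size vint plus the body), so skipping is a single jump.
        static void skipBody(DataInputStream in) throws IOException
        {
            long remaining = readUnsignedVInt(in);
            while (remaining > 0)
            {
                long skipped = in.skip(remaining);
                if (skipped <= 0)
                    throw new IOException("unexpected end of stream");
                remaining -= skipped;
            }
        }

        public static void main(String[] args) throws IOException
        {
            // One record laid out as in the write-side sketch: size = 4 covers the previous-size
            // vint (0, one byte) plus a 3-byte body; a trailing byte (9) follows the record.
            byte[] buffer = { 4, 0, 1, 2, 3, 9 };
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
            skipBody(in);
            System.out.println(in.readUnsignedByte()); // prints 9: we landed right after the record
        }
    }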
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index d94b219..62348ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -65,7 +65,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
0,
- new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+ new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS));
}
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index f4b9adf..6d3a714 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -63,7 +63,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
{
super(directory, metadata, columns);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
- this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
+ this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
diskWriter.start();
}
@@ -89,7 +89,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
// improve that. In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing
// to count since it will take a lot more space in memory and the bufferSize if first and foremost used to avoid OOM when
// using this writer.
- currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, formatType.info.getLatestVersion().correspondingMessagingVersion());
+ currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, 0, formatType.info.getLatestVersion().correspondingMessagingVersion());
}
private void maybeSync() throws SyncException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 25baa4e..62c88a0 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -60,7 +60,7 @@ public class RowIndexEntryTest extends CQLTester
DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
- SerializationHeader header = new SerializationHeader(cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
+ SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfMeta, BigFormat.latestVersion, header);
DataOutputBuffer dob = new DataOutputBuffer();
@@ -119,7 +119,7 @@ public class RowIndexEntryTest extends CQLTester
final RowIndexEntry simple = new RowIndexEntry(123);
DataOutputBuffer buffer = new DataOutputBuffer();
- SerializationHeader header = new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
+ SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
serializer.serialize(simple, buffer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 2fc8436..ab99750 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -325,7 +325,8 @@ public class ScrubTest
keys.size(),
0L,
0,
- new SerializationHeader(cfs.metadata,
+ new SerializationHeader(true,
+ cfs.metadata,
cfs.metadata.partitionColumns(),
EncodingStats.NO_STATS)))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index cd82b19..db07eb8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -166,7 +166,7 @@ public class AntiCompactionTest
File dir = cfs.getDirectories().getDirectoryForNewSSTables();
String filename = cfs.getSSTablePath(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
{
for (int i = 0; i < count; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index e1ab48f..78964f4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
private TestableBTW(String file)
{
- this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+ this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
}
private TestableBTW(String file, SSTableTxnWriter sw)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 093bffd..bd286e4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -943,7 +943,7 @@ public class SSTableRewriterTest extends SchemaLoader
File dir = cfs.getDirectories().getDirectoryForNewSSTables();
String filename = cfs.getSSTablePath(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
{
int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
for ( ; i < end ; i++)
@@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader
public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
{
String filename = cfs.getSSTablePath(directory);
- return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+ return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
}
public static ByteBuffer random(int i, int size)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index fcd2d71..5c7ff02 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -187,7 +187,7 @@ public class SSTableUtils
{
public SerializationHeader header()
{
- return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+ return new SerializationHeader(true, Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
}
@Override