Post review modifications
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3c5245a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3c5245a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3c5245a
Branch: refs/heads/10360
Commit: e3c5245a0592958edee3ffcb673e0e02023fb38f
Parents: 228b802
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Oct 19 18:17:40 2015 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Tue Oct 20 12:30:05 2015 +0200
----------------------------------------------------------------------
.../cassandra/db/UnfilteredDeserializer.java | 312 ++++++++++++-------
1 file changed, 197 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3c5245a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 6be587b..a158047 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -18,8 +18,12 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.io.IOError;
import java.util.*;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.PeekingIterator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
@@ -219,18 +223,18 @@ public abstract class UnfilteredDeserializer
private final boolean readAllAsDynamic;
private boolean skipStatic;
- private boolean inputExhausted;
-
// The next Unfiltered to return, computed by hasNext()
private Unfiltered next;
+ // A temporary storage for an unfiltered that isn't returned next but should be looked
at just afterwards
+ private Unfiltered saved;
- // In some condition, we don't use the last atom read from the input for the current
iteration
- // (typically, because we need to close an open tombstone range first). In that case
we use
- // nextAtom to save the read but still unused atom. It is then pick up by the next
iteration before
- // reaching into the input.
- private LegacyLayout.LegacyAtom nextAtom;
+ private boolean isFirst = true;
- private final LegacyLayout.CellGrouper grouper;
+ // The Unfiltered as read from the old format input
+ private final UnfilteredIterator iterator;
+
+ // Tracks which tombstones are open at any given point of the deserialization. Note
that this
+ // is directly populated by UnfilteredIterator.
private final TombstoneTracker tombstoneTracker;
private OldFormatDeserializer(CFMetaData metadata,
@@ -240,9 +244,9 @@ public abstract class UnfilteredDeserializer
boolean readAllAsDynamic)
{
super(metadata, in, helper);
- this.readAllAsDynamic = readAllAsDynamic;
- this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
this.tombstoneTracker = new TombstoneTracker(partitionDeletion);
+ this.iterator = new UnfilteredIterator();
+ this.readAllAsDynamic = readAllAsDynamic;
}
public void setSkipStatic()
@@ -250,115 +254,61 @@ public abstract class UnfilteredDeserializer
this.skipStatic = true;
}
- public boolean hasNext() throws IOException
+ private boolean isStatic(Unfiltered unfiltered)
{
- LegacyLayout.LegacyAtom atom = null;
+ return unfiltered.isRow() && ((Row)unfiltered).isStatic();
+ }
- while (next == null)
+ public boolean hasNext() throws IOException
+ {
+ try
{
- // Deserialize the next Unfiltered to return, updating the 'next' field.
- if (atom == null)
- {
- if (nextAtom == null)
- {
- atom = readNextAtom();
- if (atom == null)
- return false;
- }
- else
- {
- atom = nextAtom;
- nextAtom = null;
- }
- }
-
- // If a range tombstone closes strictly before this atom, we need to return
that close (or boundary) marker first.
- if (tombstoneTracker.hasClosingMarkerBefore(atom))
- {
- nextAtom = atom;
- next = tombstoneTracker.popClosingMarker();
- return true;
- }
-
- if (isRow(atom))
+ while (next == null)
{
- // Read the full row
- LegacyLayout.CellGrouper grouper = atom.isStatic()
- ? LegacyLayout.CellGrouper.staticGrouper(metadata,
helper)
- : this.grouper;
-
- grouper.reset();
- grouper.addAtom(atom);
- while ((atom = readNextAtom()) != null)
+ if (saved == null && !iterator.hasNext())
{
- // Add the atom to the grouper. If it's not an atom belonging to
the built row, we're done for this
- // row and should the current atom for the next iteration
- if (!grouper.addAtom(atom))
+ if (tombstoneTracker.hasOpenTombstones())
{
- assert nextAtom == null;
- nextAtom = atom;
- break;
+ next = tombstoneTracker.popClosingMarker();
+ return true;
}
+
+ return false;
}
- atom = null;
- Row row = grouper.getRow();
- // When reading old tables, we sometimes want to skip static data (due
to how staticly defined column of compact tables are
- // handled). So if we're asked to, ignore if it's static.
- if (!(skipStatic && row.isStatic()))
- next = row;
- }
- else
- {
- LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
- atom = null; // It's possible this tombstone doesn't generate something
to return right away. That's the case if the
- // beginning of the new range is shadowed by the currently
open range for instance (but the end is not
- // shadowed so that the atom wasn't skipped by readNextAtom).
In that case, we'll want to clear atom for
- // the next iteration, so clear it now (we'll use the tombstone
variable in what follow anyway).
+ next = saved == null ? iterator.next() : saved;
+ saved = null;
// The sstable iterators assume that if there is one, the static row
is the first thing this deserializer will return.
// However, in the old format, a range tombstone with an empty start
would sort before any static cell. So we should
- // detect that case and invert the return order if necessary.
- if (tombstone.start.bound.size() == 0)
+ // detect that case and return the static parts first if necessary.
+ if (isFirst && iterator.hasNext() && isStatic(iterator.peek()))
{
- LegacyLayout.LegacyAtom followingAtom = readNextAtom();
- if (followingAtom != null && followingAtom.isStatic())
- {
- // We have a both a RT starting from the beginning of the partition,
and a static row. So set
- // atom to the static row so it's returned next, and save the
RT for after that.
- atom = followingAtom;
- nextAtom = tombstone;
- continue;
- }
- else
- {
- // The following atom isn't static, so return the RT normally
but save that following atom for after
- nextAtom = followingAtom;
- }
+ saved = next;
+ next = iterator.next();
+ }
+ // If a range tombstone closes strictly before the next row/RT, we need
to return that close (or boundary) marker first.
+ // Note that because the tombstone tracker is empty initially, we know
that this branch and the previous one are mutually exclusive.
+ else if (tombstoneTracker.hasClosingMarkerBefore(next))
+ {
+ saved = next;
+ next = tombstoneTracker.popClosingMarker();
}
+ isFirst = false;
- // We have an opening range. What we do depends on what already opened
range we have.
- next = tombstoneTracker.openNew(tombstone);
+ // When reading old tables, we sometimes want to skip static data (due
to how statically defined columns of compact
+ // tables are handled).
+ if (skipStatic && isStatic(next))
+ next = null;
}
+ return true;
}
- // We get here is next != null so we have something to return
- return true;
- }
-
- // Returns null if we're done
- private LegacyLayout.LegacyAtom readNextAtom() throws IOException
- {
- while (!inputExhausted)
+ catch (IOError e)
{
- LegacyLayout.LegacyAtom atom = LegacyLayout.readLegacyAtom(metadata, in,
readAllAsDynamic);
- // We don't want to return shadowed data because that would fail the contract
- // of UnfilteredRowIterator. However the old format could have shadowed data,
so filter it here.
- if (atom == null)
- inputExhausted = true;
- else if (!tombstoneTracker.isShadowed(atom))
- return atom;
+ if (e.getCause() != null && e.getCause() instanceof IOException)
+ throw (IOException)e.getCause();
+ throw e;
}
- return null;
}
private boolean isRow(LegacyLayout.LegacyAtom atom)
@@ -404,8 +354,143 @@ public abstract class UnfilteredDeserializer
public void clearState()
{
next = null;
- nextAtom = null;
- inputExhausted = false;
+ saved = null;
+ iterator.clearState();
+ tombstoneTracker.clearState();
+ }
+
+ // Groups atoms from the input into proper Unfiltered. This also populates the tombstoneTracker.
+ // Note: this could use guava AbstractIterator except that we want to be able to
clear
+ // the internal state of the iterator so it's cleaner to do it ourselves.
+ private class UnfilteredIterator implements PeekingIterator<Unfiltered>
+ {
+ private final AtomIterator atoms;
+ private final LegacyLayout.CellGrouper grouper;
+
+ private Unfiltered next;
+
+ private UnfilteredIterator()
+ {
+ this.atoms = new AtomIterator();
+ this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
+ }
+
+ public boolean hasNext()
+ {
+ while (next == null && atoms.hasNext())
+ {
+ LegacyLayout.LegacyAtom atom = atoms.next();
+ next = isRow(atom) ? readRow(atom) : tombstoneTracker.openNew(atom.asRangeTombstone());
+ }
+ return next != null;
+ }
+
+ private Unfiltered readRow(LegacyLayout.LegacyAtom first)
+ {
+ LegacyLayout.CellGrouper grouper = first.isStatic()
+ ? LegacyLayout.CellGrouper.staticGrouper(metadata,
helper)
+ : this.grouper;
+ grouper.reset();
+ grouper.addAtom(first);
+ // As long as atoms are part of the same row, consume them. Note that the
call to addAtom() uses
+ // atoms.peek() so that the atom is only consumed (by next) if it's part
of the row (addAtom returns true)
+ while (atoms.hasNext() && grouper.addAtom(atoms.peek()))
+ {
+ atoms.next();
+ }
+ return grouper.getRow();
+ }
+
+ public Unfiltered next()
+ {
+ assert hasNext();
+ Unfiltered toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ public Unfiltered peek()
+ {
+ assert hasNext();
+ return next;
+ }
+
+ public void clearState()
+ {
+ atoms.clearState();
+ next = null;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ // Wraps the input of the deserializer to provide an iterator (and skip shadowed
atoms).
+ // Note: this could use guava AbstractIterator except that we want to be able to
clear
+ // the internal state of the iterator so it's cleaner to do it ourselves.
+ private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
+ {
+ private boolean isDone;
+ private LegacyLayout.LegacyAtom next;
+
+ public boolean hasNext()
+ {
+ if (isDone)
+ return false;
+
+ while (next == null)
+ {
+ next = readAtom();
+ if (next == null)
+ {
+ isDone = true;
+ return false;
+ }
+
+ if (tombstoneTracker.isShadowed(next))
+ next = null;
+ }
+ return true;
+ }
+
+ private LegacyLayout.LegacyAtom readAtom()
+ {
+ try
+ {
+ return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public LegacyLayout.LegacyAtom next()
+ {
+ assert hasNext();
+ LegacyLayout.LegacyAtom toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ public LegacyLayout.LegacyAtom peek()
+ {
+ assert hasNext();
+ return next;
+ }
+
+ public void clearState()
+ {
+ this.next = null;
+ this.isDone = false;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
}
/**
@@ -436,25 +521,17 @@ public abstract class UnfilteredDeserializer
if (partitionDeletion.deletes(timestamp))
return true;
- for (LegacyLayout.LegacyRangeTombstone tombstone : openTombstones)
- {
- if (tombstone.deletionTime.deletes(timestamp))
- {
- // If it's a cell, then it's shadowed. If it's a RT, it's only shadowed
if it ends before the tombstone we test against
- return atom.isCell() || metadata.comparator.compare(atom.asRangeTombstone().stop.bound,
tombstone.stop.bound) <= 0;
- }
- }
-
- return false;
+ SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom)
? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
+ return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
}
/**
- * Whether the currently open marker closes stricly before the provided atom.
+ * Whether the currently open marker closes strictly before the provided row/RT.
*/
- public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
+ public boolean hasClosingMarkerBefore(Unfiltered unfiltered)
{
return !openTombstones.isEmpty()
- && metadata.comparator.compare(openTombstones.first().stop.bound,
atom.clustering()) < 0;
+ && metadata.comparator.compare(openTombstones.first().stop.bound,
unfiltered.clustering()) < 0;
}
/**
@@ -526,6 +603,11 @@ public abstract class UnfilteredDeserializer
{
return metadata.comparator.compare(close.stop.bound, open.start.bound) ==
0;
}
+
+ public void clearState()
+ {
+ openTombstones.clear();
+ }
}
}
}
|