Author: jbellis
Date: Wed Jun 30 05:48:24 2010
New Revision: 959197
URL: http://svn.apache.org/viewvc?rev=959197&view=rev
Log:
merge from 0.6
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/.rat-excludes
cassandra/trunk/CHANGES.txt
cassandra/trunk/build.xml
cassandra/trunk/debian/changelog
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-958101
+/cassandra/branches/cassandra-0.6:922689-958811
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5:888872-915439
Modified: cassandra/trunk/.rat-excludes
URL: http://svn.apache.org/viewvc/cassandra/trunk/.rat-excludes?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/.rat-excludes (original)
+++ cassandra/trunk/.rat-excludes Wed Jun 30 05:48:24 2010
@@ -12,3 +12,5 @@ src/gen-java/**
build/**
lib/licenses/*.txt
.settings/**
+contrib/pig/example-script.pig
+contrib/redhat/cassandra
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jun 30 05:48:24 2010
@@ -39,9 +39,13 @@ dev
* allow multiple repair sessions per node (CASSANDRA-1190)
+0.6.4
+ * avoid queuing multiple hint deliveries for the same endpoint
+ (CASSANDRA-1229)
+
+
0.6.3
* retry to make streaming connections up to 8 times. (CASSANDRA-1019)
- * fix potential for duplicate rows seen by Hadoop jobs (CASSANDRA-1042)
* reject describe_ring() calls on invalid keyspaces (CASSANDRA-1111)
* don't reject reads at CL.ALL (CASSANDRA-1152)
* reject deletions to supercolumns in CFs containing only standard
@@ -65,6 +69,8 @@ dev
* remove opportunistic repairs, when two machines with overlapping replica
responsibilities happen to finish major compactions of the same CF near
the same time. repairs are now fully manual (CASSANDRA-1190)
+ * add ability to lower compaction priority (default is no change from 0.6.2)
+ (CASSANDRA-1181)
0.6.2
Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Jun 30 05:48:24 2010
@@ -45,7 +45,7 @@
<property name="test.unit.src" value="${test.dir}/unit"/>
<property name="test.long.src" value="${test.dir}/long"/>
<property name="dist.dir" value="${build.dir}/dist"/>
- <property name="version" value="0.6.2"/>
+ <property name="version" value="0.6.3"/>
<property name="final.name" value="${ant.project.name}-${version}"/>
<property name="ivy.version" value="2.1.0" />
<property name="ivy.url"
Modified: cassandra/trunk/debian/changelog
URL: http://svn.apache.org/viewvc/cassandra/trunk/debian/changelog?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/debian/changelog (original)
+++ cassandra/trunk/debian/changelog Wed Jun 30 05:48:24 2010
@@ -1,3 +1,9 @@
+cassandra (0.6.3) unstable; urgency=low
+
+ * New stable point release.
+
+ -- Eric Evans <eevans@apache.org> Fri, 25 Jun 2010 17:18:54 -0500
+
cassandra (0.6.2) unstable; urgency=low
* New stable point release.
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-958811
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-958811
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-958811
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-958811
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 30 05:48:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-958101
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-958811
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502
Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Jun 30 05:48:24 2010
@@ -30,9 +30,9 @@ public class DebuggableThreadPoolExecuto
{
protected static Logger logger = LoggerFactory.getLogger(JMXEnabledThreadPoolExecutor.class);
- public DebuggableThreadPoolExecutor(String threadPoolName)
+ public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, priority));
}
public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java Wed Jun 30 05:48:24 2010
@@ -30,16 +30,26 @@ import java.util.concurrent.atomic.*;
public class NamedThreadFactory implements ThreadFactory
{
protected final String id;
+ private final int priority;
protected final AtomicInteger n = new AtomicInteger(1);
public NamedThreadFactory(String id)
{
+ this(id, Thread.NORM_PRIORITY);
+ }
+
+ public NamedThreadFactory(String id, int priority)
+ {
+
this.id = id;
+ this.priority = priority;
}
public Thread newThread(Runnable runnable)
{
String name = id + ":" + n.getAndIncrement();
- return new Thread(runnable, name);
+ Thread thread = new Thread(runnable, name);
+ thread.setPriority(priority);
+ return thread;
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Jun 30 05:48:24 2010
@@ -619,7 +619,9 @@ public class CompactionManager implement
public CompactionExecutor()
{
- super("CompactionExecutor");
+ super("CompactionExecutor", System.getProperty("cassandra.compaction.priority") == null
+ ? Thread.NORM_PRIORITY
+ : Integer.parseInt(System.getProperty("cassandra.compaction.priority")));
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jun 30 05:48:24 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import java.io.IOException;
@@ -45,6 +46,7 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.utils.FBUtilities.UTF8;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
/**
@@ -84,6 +86,8 @@ public class HintedHandOffManager
private static final int PAGE_SIZE = 10000;
private static final String SEPARATOR = "-";
+ private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
+
private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName,
byte[] key) throws IOException
@@ -170,9 +174,10 @@ public class HintedHandOffManager
}
- private static void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+ private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
{
logger_.info("Started hinted handoff for endpoint " + endpoint);
+ queuedDeliveries.remove(endpoint);
// 1. Get the key of the endpoint we need to handoff
// 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF
@@ -263,6 +268,9 @@ public class HintedHandOffManager
*/
public void deliverHints(final InetAddress to)
{
+ if (!queuedDeliveries.add(to))
+ return;
+
Runnable r = new WrappedRunnable()
{
public void runMayThrow() throws Exception
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.gms;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.gms;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.gms;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.IVerbHandler;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jun 30 05:48:24 2010
@@ -501,22 +501,24 @@ public class StorageProxy implements Sto
long startTime = System.nanoTime();
final String table = command.keyspace;
- List<Pair<AbstractBounds, List<InetAddress>>> ranges = getRestrictedRanges(command.range, command.keyspace);
+ List<AbstractBounds> ranges = getRestrictedRanges(command.range);
// now scan until we have enough results
List<Row> rows = new ArrayList<Row>(command.max_keys);
- for (Pair<AbstractBounds, List<InetAddress>> pair : getRangeIterator(ranges, command.range.left))
+ for (AbstractBounds range : getRangeIterator(ranges, command.range.left))
{
- AbstractBounds range = pair.left;
- List<InetAddress> endpoints = pair.right;
+ List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+ DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+
RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family,
command.super_column, command.predicate, range, command.max_keys);
Message message = c2.getMessage();
// collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, endpoints);
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver,
consistency_level, table);
- for (InetAddress endpoint : endpoints)
+ // TODO bail early if live endpoints can't satisfy requested consistency level
+ for (InetAddress endpoint : liveEndpoints)
{
MessagingService.instance.sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
@@ -621,30 +623,30 @@ public class StorageProxy implements Sto
/**
* returns an iterator that will return ranges in ring order, starting with the one that contains the start token
*/
- private static Iterable<Pair<AbstractBounds, List<InetAddress>>> getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges, Token start)
+ private static Iterable<AbstractBounds> getRangeIterator(final List<AbstractBounds> ranges, Token start)
{
// find the one to start with
int i;
for (i = 0; i < ranges.size(); i++)
{
- AbstractBounds range = ranges.get(i).left;
+ AbstractBounds range = ranges.get(i);
if (range.contains(start) || range.left.equals(start))
break;
}
- AbstractBounds range = ranges.get(i).left;
+ AbstractBounds range = ranges.get(i);
assert range.contains(start) || range.left.equals(start); // make sure the loop didn't just end b/c ranges were exhausted
// return an iterable that starts w/ the correct range and iterates the rest in ring order
final int begin = i;
- return new Iterable<Pair<AbstractBounds, List<InetAddress>>>()
+ return new Iterable<AbstractBounds>()
{
- public Iterator<Pair<AbstractBounds, List<InetAddress>>> iterator()
+ public Iterator<AbstractBounds> iterator()
{
- return new AbstractIterator<Pair<AbstractBounds, List<InetAddress>>>()
+ return new AbstractIterator<AbstractBounds>()
{
int n = 0;
- protected Pair<AbstractBounds, List<InetAddress>> computeNext()
+ protected AbstractBounds computeNext()
{
if (n == ranges.size())
return endOfData();
@@ -669,30 +671,38 @@ public class StorageProxy implements Sto
* D, but we don't want any other results from it until after the (D, T] range. Unwrapping so that
* the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
*/
- private static List<Pair<AbstractBounds, List<InetAddress>>> getRestrictedRanges(AbstractBounds queryRange, String keyspace)
- throws UnavailableException
+ private static List<AbstractBounds> getRestrictedRanges(AbstractBounds queryRange)
{
TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
- Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left);
- List<Pair<AbstractBounds, List<InetAddress>>> ranges = new ArrayList<Pair<AbstractBounds, List<InetAddress>>>();
- while (iter.hasNext())
+
+ List<AbstractBounds> ranges = new ArrayList<AbstractBounds>();
+ // for each node, compute its intersection with the query range, and add its unwrapped components to our list
+ for (Token nodeToken : tokenMetadata.sortedTokens())
{
- Token nodeToken = iter.next();
Range nodeRange = new Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
- List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
-
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
- Set<AbstractBounds> restrictedRanges = queryRange.restrictTo(nodeRange);
- for (AbstractBounds range : restrictedRanges)
+ for (AbstractBounds range : queryRange.restrictTo(nodeRange))
{
for (AbstractBounds unwrapped : range.unwrap())
{
if (logger.isDebugEnabled())
logger.debug("Adding to restricted ranges " + unwrapped + " for " + nodeRange);
- ranges.add(new Pair<AbstractBounds, List<InetAddress>>(unwrapped, endpoints));
+ ranges.add(unwrapped);
}
}
}
+
+ // re-sort ranges in ring order, post-unwrapping
+ Comparator<AbstractBounds> comparator = new Comparator<AbstractBounds>()
+ {
+ public int compare(AbstractBounds o1, AbstractBounds o2)
+ {
+ // no restricted ranges will overlap so we don't need to worry about inclusive vs exclusive left,
+ // just sort by raw token position.
+ return o1.left.compareTo(o2.left);
+ }
+ };
+ Collections.sort(ranges, comparator);
+
return ranges;
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java?rev=959197&r1=959196&r2=959197&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/BloomFilterTrackerTest.java Wed Jun 30 05:48:24 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.io;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.junit.Test;
|