Author: jbellis
Date: Tue Oct 19 17:20:45 2010
New Revision: 1024327
URL: http://svn.apache.org/viewvc?rev=1024327&view=rev
Log:
avoid deserializing QUORUM responses twice. patch by jbellis; tested by Wayne for CASSANDRA-1622
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
Tue Oct 19 17:20:45 2010
@@ -37,4 +37,5 @@ public interface IResponseResolver<T> {
public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
public boolean isDataPresent(Collection<Message> responses);
+ public void preprocess(Message message);
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Tue Oct 19 17:20:45 2010
@@ -89,6 +89,7 @@ public class QuorumResponseHandler<T> im
public void response(Message message)
{
responses.add(message);
+ responseResolver.preprocess(message);
if (responseResolver.isDataPresent(responses))
{
condition.signal();
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Tue Oct 19 17:20:45 2010
@@ -108,6 +108,10 @@ public class RangeSliceResponseResolver
return resolvedRows;
}
+ public void preprocess(Message message)
+ {
+ }
+
public boolean isDataPresent(Collection<Message> responses)
{
return responses.size() >= sources.size();
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Oct 19 17:20:45 2010
@@ -22,10 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ReadResponse;
@@ -37,6 +34,7 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.log4j.Logger;
@@ -49,6 +47,7 @@ public class ReadResponseResolver implem
private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
private final String table;
private final int responseCount;
+ private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message,
ReadResponse>();
public ReadResponseResolver(String table, int responseCount)
{
@@ -84,11 +83,11 @@ public class ReadResponseResolver implem
* query exists then we need to compare the digest with
* the digest of the data that is received.
*/
- for (Message response : responses)
- {
- byte[] body = response.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ for (Message message : responses)
+ {
+ ReadResponse result = results.get(message);
+ if (result == null)
+ continue; // arrived after quorum already achieved
if (result.isDigestQuery())
{
digest = result.digest();
@@ -97,14 +96,11 @@ public class ReadResponseResolver implem
else
{
versions.add(result.row().cf);
- endPoints.add(response.getFrom());
+ endPoints.add(message.getFrom());
key = result.row().key;
}
}
- if (logger_.isDebugEnabled())
- logger_.debug("responses deserialized");
-
// If there was a digest query compare it with all the data digests
// If there is a mismatch then throw an exception so that read repair can happen.
if (isDigestQuery)
@@ -190,30 +186,36 @@ public class ReadResponseResolver implem
return resolved;
}
- public boolean isDataPresent(Collection<Message> responses)
- {
- if (responses.size() < responseCount)
- return false;
-
- boolean isDataPresent = false;
- for (Message response : responses)
+ public void preprocess(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ try
{
- byte[] body = response.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- try
- {
- ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (!result.isDigestQuery())
- {
- isDataPresent = true;
- }
- bufIn.close();
- }
- catch (IOException ex)
- {
- throw new RuntimeException(ex);
- }
+ ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ results.put(message, result);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
}
- return isDataPresent;
}
+
+ public boolean isDataPresent(Collection<Message> responses)
+ {
+ int digests = 0;
+ int data = 0;
+ for (Message message : responses)
+ {
+ ReadResponse result = results.get(message);
+ if (result == null)
+ continue; // arrived concurrently
+ if (result.isDigestQuery())
+ digests++;
+ else
+ data++;
+ }
+ return data > 0 && (data + digests >= responseCount);
+ }
+
}
|