cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r997498 - in /cassandra/trunk/src/java/org/apache/cassandra: dht/ service/ streaming/
Date Wed, 15 Sep 2010 20:48:40 GMT
Author: jbellis
Date: Wed Sep 15 20:48:40 2010
New Revision: 997498

URL: http://svn.apache.org/viewvc?rev=997498&view=rev
Log:
Add callbacks to streaming and replace one-off bootstrap-finished notification w/ streamin
callback

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Sep 15 20:48:40
2010
@@ -18,31 +18,33 @@
 
 package org.apache.cassandra.dht;
 
- import java.util.*;
- import java.util.concurrent.locks.Condition;
  import java.io.IOException;
  import java.io.UnsupportedEncodingException;
  import java.net.InetAddress;
+ import java.util.*;
+ import java.util.concurrent.locks.Condition;
 
+ import com.google.common.collect.ArrayListMultimap;
+ import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.Multimap;
+ import org.apache.commons.lang.ArrayUtils;
  import org.apache.commons.lang.StringUtils;
  import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+ import org.slf4j.LoggerFactory;
 
- import org.apache.commons.lang.ArrayUtils;
-
- import org.apache.cassandra.concurrent.Stage;
- import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.IFailureDetector;
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
- import org.apache.cassandra.net.*;
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.net.IAsyncCallback;
+ import org.apache.cassandra.net.IVerbHandler;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.streaming.StreamIn;
- import org.apache.cassandra.utils.SimpleCondition;
  import org.apache.cassandra.utils.FBUtilities;
- import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.gms.FailureDetector;
- import org.apache.cassandra.gms.IFailureDetector;
- import com.google.common.collect.Multimap;
- import com.google.common.collect.ArrayListMultimap;
+ import org.apache.cassandra.utils.SimpleCondition;
 
 
 public class BootStrapper
@@ -64,22 +66,49 @@ public class BootStrapper
         this.token = token;
         tokenMetadata = tmd;
     }
-    
+
     public void startBootstrap() throws IOException
     {
         if (logger.isDebugEnabled())
             logger.debug("Beginning bootstrap process");
+
+        final Multimap<InetAddress, String> bootstrapNodes = HashMultimap.create();
+        final Multimap<String, Map.Entry<InetAddress, Collection<Range>>>
rangesToFetch = HashMultimap.create();
+
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table);
+            Map<InetAddress, Collection<Range>> workMap = getWorkMap(getRangesWithSources(table)).asMap();
+            for (Map.Entry<InetAddress, Collection<Range>> entry : workMap.entrySet())
+            {
+                bootstrapNodes.put(entry.getKey(), table);
+                rangesToFetch.put(table, entry);
+            }
+        }
+
+        for (final String table : rangesToFetch.keySet())
+        {
             /* Send messages to respective folks to stream data over to me */
-            for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
+            for (Map.Entry<InetAddress, Collection<Range>> entry : rangesToFetch.get(table))
             {
-                InetAddress source = entry.getKey();
-                StorageService.instance.addBootstrapSource(source, table);
+                final InetAddress source = entry.getKey();
+                final Runnable callback = new Runnable()
+                {
+                    public void run()
+                    {
+                        synchronized (bootstrapNodes)
+                        {
+                            bootstrapNodes.remove(source, table);
+                            if (logger.isDebugEnabled())
+                                logger.debug(String.format("Removed %s/%s as a bootstrap
source; remaining is [%s]",
+                                                           source, table, StringUtils.join(bootstrapNodes.keySet(),
", ")));
+                            if (bootstrapNodes.isEmpty())
+                                StorageService.instance.finishBootstrapping();
+                        }
+                    }
+                };
                 if (logger.isDebugEnabled())
-                    logger.debug("Requesting from " + source + " ranges " + StringUtils.join(entry.getValue(),
", "));
-                StreamIn.requestRanges(source, table, entry.getValue());
+                    logger.debug("Bootstrapping from " + source + " ranges " + StringUtils.join(entry.getValue(),
", "));
+                StreamIn.requestRanges(source, table, entry.getValue(), callback);
             }
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Sep 15 20:48:40
2010
@@ -176,36 +176,13 @@ public class StorageService implements I
 
     /* Are we starting this node in bootstrap mode? */
     private boolean isBootstrapMode;
-    private Multimap<InetAddress, String> bootstrapSet;
     /* when intialized as a client, we shouldn't write to the system table. */
     private boolean isClientMode;
     private boolean initialized;
     private String operationMode;
     private MigrationManager migrationManager = new MigrationManager();
 
-    public void addBootstrapSource(InetAddress s, String table)
-    {
-        if (logger_.isDebugEnabled())
-            logger_.debug(String.format("Added %s/%s as a bootstrap source", s, table));
-        bootstrapSet.put(s, table);
-    }
-
-    public void removeBootstrapSource(InetAddress s, String table)
-    {
-        if (table == null)
-            bootstrapSet.removeAll(s);
-        else
-            bootstrapSet.remove(s, table);
-        if (logger_.isDebugEnabled())
-            logger_.debug(String.format("Removed %s/%s as a bootstrap source; remaining is
[%s]", s, table == null ? "<ALL>" : table, StringUtils.join(bootstrapSet.keySet(), ",
")));
-
-        if (bootstrapSet.isEmpty())
-        {
-            finishBootstrapping();
-        }
-    }
-
-    private void finishBootstrapping()
+    public void finishBootstrapping()
     {
         isBootstrapMode = false;
         SystemTable.setBootstrapped(true);
@@ -236,8 +213,6 @@ public class StorageService implements I
             throw new RuntimeException(e);
         }
 
-        bootstrapSet = Multimaps.synchronizedSetMultimap(HashMultimap.<InetAddress, String>create());
-
         /* register the verb handlers */
         MessagingService.instance.registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.MUTATION, new RowMutationVerbHandler());
@@ -1513,7 +1488,7 @@ public class StorageService implements I
             }
 
             setMode("Leaving: streaming data to other nodes", true);
-            final Set<Map.Entry<Range, InetAddress>> pending = Collections.synchronizedSet(new
HashSet<Map.Entry<Range, InetAddress>>(rangesMM.entries()));
+            final Set<Map.Entry<Range, InetAddress>> pending = new HashSet<Map.Entry<Range,
InetAddress>>(rangesMM.entries());
             for (final Map.Entry<Range, InetAddress> entry : rangesMM.entries())
             {
                 final Range range = entry.getKey();
@@ -1522,9 +1497,12 @@ public class StorageService implements I
                 {
                     public void run()
                     {
-                        pending.remove(entry);
-                        if (pending.isEmpty())
-                            latch.countDown();
+                        synchronized(pending)
+                        {
+                            pending.remove(entry);
+                            if (pending.isEmpty())
+                                latch.countDown();
+                        }
                     }
                 };
                 StageManager.getStage(Stage.STREAM).execute(new Runnable()

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Wed Sep 15 20:48:40
2010
@@ -47,11 +47,16 @@ public class StreamIn
      */
     public static void requestRanges(InetAddress source, String tableName, Collection<Range>
ranges)
     {
+        requestRanges(source, tableName, ranges, null);
+    }
+
+    public static void requestRanges(InetAddress source, String tableName, Collection<Range>
ranges, Runnable callback)
+    {
         assert ranges.size() > 0;
 
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges,
", "));
-        StreamInSession session = StreamInSession.create(source);
+        StreamInSession session = StreamInSession.create(source, callback);
         Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges,
tableName, session.getSessionId()).makeMessage();
         MessagingService.instance.sendOneWay(message, source);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Sep 15
20:48:40 2010
@@ -42,16 +42,23 @@ public class StreamInSession
 
     private final List<PendingFile> pendingFiles = new ArrayList<PendingFile>();
     private final Pair<InetAddress, Long> context;
+    private final Runnable callback;
 
-    private StreamInSession(Pair<InetAddress, Long> context)
+    private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
     {
         this.context = context;
+        this.callback = callback;
     }
 
     public static StreamInSession create(InetAddress host)
     {
+        return create(host, null);
+    }
+
+    public static StreamInSession create(InetAddress host, Runnable callback)
+    {
         Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
-        StreamInSession session = new StreamInSession(context);
+        StreamInSession session = new StreamInSession(context, callback);
         sessions.put(context, session);
         return session;
     }
@@ -63,7 +70,7 @@ public class StreamInSession
         StreamInSession session = sessions.get(context);
         if (session == null)
         {
-            StreamInSession possibleNew = new StreamInSession(context);
+            StreamInSession possibleNew = new StreamInSession(context, null);
             if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
             {
                 session = possibleNew;
@@ -104,15 +111,15 @@ public class StreamInSession
             requestFile(pendingFiles.get(0));
         else
         {
-            if (StorageService.instance.isBootstrapMode())
-                StorageService.instance.removeBootstrapSource(getHost(), lastFile.desc.ksname);
-            remove();
+            close();
         }
     }
-    
-    public void remove()
+
+    public void close()
     {
         sessions.remove(context);
+        if (callback != null)
+            callback.run();
     }
 
     public void requestFile(PendingFile file)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed Sep 15 20:48:40
2010
@@ -67,7 +67,7 @@ public class StreamOut
         assert ranges.size() > 0;
         
         // this is so that this target shows up as a destination while anticompaction is
happening.
-        StreamOutSession session = StreamOutSession.create(target);
+        StreamOutSession session = StreamOutSession.create(target, callback);
 
         logger.info("Beginning transfer process to {} for ranges {}", target, StringUtils.join(ranges,
", "));
 
@@ -85,8 +85,6 @@ public class StreamOut
         {
             session.close();
         }
-        if (callback != null)
-            callback.run();
     }
 
     /**
@@ -119,7 +117,7 @@ public class StreamOut
     /**
      * Split out files for all tables on disk locally for each range and then stream them
to the target endpoint.
     */
-    public static void transferRangesForRequest(StreamOutSession session, String tableName,
Collection<Range> ranges, Runnable callback)
+    public static void transferRangesForRequest(StreamOutSession session, String tableName,
Collection<Range> ranges)
     {
         assert ranges.size() > 0;
 
@@ -135,9 +133,6 @@ public class StreamOut
         {
             throw new IOError(e);
         }
-
-        if (callback != null)
-            callback.run();
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Wed Sep
15 20:48:40 2010
@@ -35,21 +35,36 @@ import org.cliffc.high_scale_lib.NonBloc
  * This class manages the streaming of multiple files one after the other.
 */
 public class StreamOutSession
-{   
+{
     private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class
);
-        
+
     // one host may have multiple stream sessions.
     private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession>
streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
 
+    private final Map<String, PendingFile> files = new LinkedHashMap<String, PendingFile>();
+    private final Pair<InetAddress, Long> context;
+    private final Runnable callback;
+    private final SimpleCondition condition = new SimpleCondition();
+
     public static StreamOutSession create(InetAddress host)
     {
-        return create(host, System.nanoTime());
+        return create(host, System.nanoTime(), null);
+    }
+
+    public static StreamOutSession create(InetAddress host, Runnable callback)
+    {
+        return create(host, System.nanoTime(), callback);
     }
 
     public static StreamOutSession create(InetAddress host, long sessionId)
     {
+        return create(host, sessionId, null);
+    }
+
+    public static StreamOutSession create(InetAddress host, long sessionId, Runnable callback)
+    {
         Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
-        StreamOutSession session = new StreamOutSession(context);
+        StreamOutSession session = new StreamOutSession(context, callback);
         streams.put(context, session);
         return session;
     }
@@ -62,16 +77,14 @@ public class StreamOutSession
     public void close()
     {
         streams.remove(context);
+        if(callback != null) 
+            callback.run();
     }
 
-    private final Map<String, PendingFile> files = new LinkedHashMap<String, PendingFile>();
-    
-    private final Pair<InetAddress, Long> context;
-    private final SimpleCondition condition = new SimpleCondition();
-    
-    private StreamOutSession(Pair<InetAddress, Long> context)
+    private StreamOutSession(Pair<InetAddress, Long> context, Runnable cb)
     {
         this.context = context;
+        this.callback = cb;
     }
 
     public InetAddress getHost()

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
Wed Sep 15 20:48:40 2010
@@ -63,7 +63,7 @@ public class StreamRequestVerbHandler im
             {
                 // range request.
                 StreamOutSession session = StreamOutSession.create(message.getFrom(), srm.sessionId);
-                StreamOut.transferRangesForRequest(session, srm.table, srm.ranges, null);
+                StreamOut.transferRangesForRequest(session, srm.table, srm.ranges);
             }
         }
         catch (IOException ex)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java?rev=997498&r1=997497&r2=997498&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java Wed
Sep 15 20:48:40 2010
@@ -58,9 +58,7 @@ public class StreamStatusVerbHandler imp
                     break;
                 case EMPTY:
                     logger.error("Did not find matching ranges on {}", message.getFrom());
-                    StreamInSession.get(message.getFrom(), streamStatus.getSessionId()).remove();
-                    if (StorageService.instance.isBootstrapMode())
-                        StorageService.instance.removeBootstrapSource(message.getFrom(),
new String(message.getHeader(StreamOut.TABLE_NAME)));
+                    StreamInSession.get(message.getFrom(), streamStatus.getSessionId()).close();
                     break;
                 default:
                     throw new RuntimeException("Cannot handle FileStatus.Action: " + streamStatus.getAction());



Mime
View raw message