cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r816453 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java CommitLog.java Table.java
Date Fri, 18 Sep 2009 03:20:58 GMT
Author: junrao
Date: Fri Sep 18 03:20:57 2009
New Revision: 816453

URL: http://svn.apache.org/viewvc?rev=816453&view=rev
Log:
commitlog may consider writes flushed, that are not yet; patched by junrao; reviewed by jbellis
for CASSANDRA-445

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=816453&r1=816452&r2=816453&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Sep 18 03:20:57 2009
@@ -322,11 +322,28 @@
                              columnFamily_, SSTable.TEMPFILE_MARKER, fileIndexGenerator_.incrementAndGet());
     }
 
-    void switchMemtable(Memtable oldMemtable, CommitLog.CommitLogContext ctx)
+    void switchMemtable(Memtable oldMemtable)
     {
-        memtableLock_.writeLock().lock();
+        CommitLog.CommitLogContext ctx = null;
+        /**
+         *  If we can get the writelock, that means no new updates can come in and 
+         *  all ongoing updates to memtables have completed. We can get the tail
+         *  of the log and use it as the starting position for log replay on recovery.
+         *  
+         *  By holding the flusherLock_, we don't need the memetableLock any more.
+         */
+        Table.flusherLock_.writeLock().lock();
         try
         {
+            try
+            {
+                ctx = CommitLog.open().getContext();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+
             if (oldMemtable.isFrozen())
             {
                 return;
@@ -336,17 +353,17 @@
             getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable); // it's ok for the MT to briefly be both active and pendingFlush
             submitFlush(oldMemtable, ctx);
             memtable_ = new Memtable(table_, columnFamily_);
+
+            if (memtableSwitchCount == Integer.MAX_VALUE)
+            {
+                memtableSwitchCount = 0;
+            }
+            memtableSwitchCount++;
         }
         finally
         {
-            memtableLock_.writeLock().unlock();
-        }
-
-        if (memtableSwitchCount == Integer.MAX_VALUE)
-        {
-            memtableSwitchCount = 0;
+            Table.flusherLock_.writeLock().unlock();
         }
-        memtableSwitchCount++;
     }
 
     void switchBinaryMemtable(String key, byte[] buffer) throws IOException
@@ -360,16 +377,7 @@
         if (memtable_.isClean())
             return;
 
-        CommitLog.CommitLogContext ctx = null;
-        try
-        {
-            ctx = CommitLog.open().getContext();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        switchMemtable(memtable_, ctx);
+        switchMemtable(memtable_);
     }
 
     void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
@@ -402,25 +410,30 @@
      * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
      */
-    void apply(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx)
+    Memtable apply(String key, ColumnFamily columnFamily)
             throws IOException
     {
         long start = System.currentTimeMillis();
         Memtable initialMemtable = getMemtableThreadSafe();
+        boolean isFlush = false;
+        
         if (initialMemtable.isThresholdViolated())
         {
-            switchMemtable(initialMemtable, cLogCtx);
+            isFlush = true;
         }
+        
         memtableLock_.writeLock().lock();
         try
         {
-            memtable_.put(key, columnFamily);
+            initialMemtable.put(key, columnFamily);
         }
         finally
         {
             memtableLock_.writeLock().unlock();
         }
         writeStats_.add(System.currentTimeMillis() - start);
+        
+        return isFlush ? initialMemtable : null;
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=816453&r1=816452&r2=816453&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Fri Sep 18 03:20:57 2009
@@ -268,10 +268,11 @@
         logWriter_.seek(currentPos);
     }
 
-    private static void writeCommitLogHeader(RandomAccessFile logWriter, byte[] bytes) throws IOException
+    private static void writeCommitLogHeader(BufferedRandomAccessFile logWriter, byte[] bytes) throws IOException
     {
         logWriter.writeLong(bytes.length);
         logWriter.write(bytes);
+        logWriter.sync();
     }
 
     void recover(File[] clogs) throws IOException
@@ -515,7 +516,7 @@
                 }
                 else
                 {
-                    RandomAccessFile logWriter = CommitLog.createWriter(oldFile);
+                    BufferedRandomAccessFile logWriter = CommitLog.createWriter(oldFile);
                     writeCommitLogHeader(logWriter, oldCommitLogHeader.toByteArray());
                     logWriter.close();
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=816453&r1=816452&r2=816453&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Sep 18 03:20:57 2009
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
@@ -48,6 +49,8 @@
 
     private static Logger logger_ = Logger.getLogger(Table.class);
     private static final String SNAPSHOT_SUBDIR_NAME = "snapshots";
+    /* we use this lock to drain updaters before calling a flush. */
+    static final ReentrantReadWriteLock flusherLock_ = new ReentrantReadWriteLock(true);
 
     /*
      * This class represents the metadata of this Table. The metadata
@@ -590,12 +593,30 @@
     */
     void apply(Row row) throws IOException
     {
-        CommitLog.CommitLogContext cLogCtx = CommitLog.open().add(row);
+        HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>();
 
-        for (ColumnFamily columnFamily : row.getColumnFamilies())
+        flusherLock_.readLock().lock();
+        try
         {
-            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
-            cfStore.apply(row.key(), columnFamily, cLogCtx);
+            CommitLog.open().add(row);
+        
+            for (ColumnFamily columnFamily : row.getColumnFamilies())
+            {
+                Memtable memtableToFlush;
+                ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+                if ( (memtableToFlush=cfStore.apply(row.key(), columnFamily)) != null)
+                    memtablesToFlush.put(cfStore, memtableToFlush);
+            }
+        }
+        finally
+        {
+            flusherLock_.readLock().unlock();
+        }
+        
+        if (memtablesToFlush.size() > 0)
+        {
+            for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet())
+                entry.getKey().switchMemtable(entry.getValue());
         }
     }
 



Mime
View raw message