kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: KUDU-2055 [part 3]: Refactor BlockCreationTransaction and BlockDeletionTransaction
Date Thu, 12 Oct 2017 00:02:18 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 3e8f1af30 -> 0174c2678


KUDU-2055 [part 3]: Refactor BlockCreationTransaction and BlockDeletionTransaction

This patch refactors both BlockCreationTransaction and
BlockDeletionTransaction to be created by the block manager, so that
they can be extended for specific implementations. Moreover,
we may be able to completely remove BlockManager's CreateBlock() and
DeleteBlock() to simplify the code paths in the future.

Change-Id: I60c7d437061f98ad27b9aecda5fa89e909fb2ec6
Reviewed-on: http://gerrit.cloudera.org:8080/8144
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0174c267
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0174c267
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0174c267

Branch: refs/heads/master
Commit: 0174c2678c632809778daaeaf5566b1d91650059
Parents: 3e8f1af
Author: hahao <hao.hao@cloudera.com>
Authored: Tue Oct 10 15:31:50 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Thu Oct 12 00:01:57 2017 +0000

----------------------------------------------------------------------
 src/kudu/cfile/bloomfile.cc                 |   8 +-
 src/kudu/cfile/cfile-test.cc                |  11 +-
 src/kudu/cfile/cfile_writer.cc              |   8 +-
 src/kudu/fs/block_manager-stress-test.cc    |  11 +-
 src/kudu/fs/block_manager-test.cc           |  37 ++++---
 src/kudu/fs/block_manager.h                 |  85 +++++----------
 src/kudu/fs/file_block_manager.cc           | 121 ++++++++++++++++++---
 src/kudu/fs/file_block_manager.h            |   4 +-
 src/kudu/fs/log_block_manager-test.cc       |  28 ++---
 src/kudu/fs/log_block_manager.cc            | 131 +++++++++++++++++++----
 src/kudu/fs/log_block_manager.h             |   4 +-
 src/kudu/tablet/deltafile.cc                |   8 +-
 src/kudu/tablet/diskrowset.cc               |  23 ++--
 src/kudu/tablet/diskrowset.h                |   7 +-
 src/kudu/tablet/multi_column_writer.cc      |   8 +-
 src/kudu/tserver/tablet_copy_client-test.cc |   6 +-
 src/kudu/tserver/tablet_copy_client.cc      |  10 +-
 src/kudu/tserver/tablet_copy_client.h       |   7 +-
 18 files changed, 340 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/cfile/bloomfile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index cb08687..fdbaad7 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -58,6 +58,7 @@ namespace kudu {
 namespace cfile {
 
 using fs::BlockCreationTransaction;
+using fs::BlockManager;
 using fs::ReadableBlock;
 using fs::WritableBlock;
 
@@ -113,9 +114,10 @@ Status BloomFileWriter::Start() {
 }
 
 Status BloomFileWriter::Finish() {
-  BlockCreationTransaction transaction(writer_->block()->block_manager());
-  RETURN_NOT_OK(FinishAndReleaseBlock(&transaction));
-  return transaction.CommitCreatedBlocks();
+  BlockManager* bm = writer_->block()->block_manager();
+  unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
+  RETURN_NOT_OK(FinishAndReleaseBlock(transaction.get()));
+  return transaction->CommitCreatedBlocks();
 }
 
 Status BloomFileWriter::FinishAndReleaseBlock(BlockCreationTransaction* transaction) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index d30787d..364fb93 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -94,6 +94,7 @@ using strings::Substitute;
 namespace kudu {
 namespace cfile {
 
+using fs::BlockManager;
 using fs::CountingReadableBlock;
 using fs::ReadableBlock;
 using fs::WritableBlock;
@@ -934,12 +935,10 @@ TEST_P(TestCFileBothCacheTypes, TestReleaseBlock) {
   WriterOptions opts;
   CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
   ASSERT_OK(w.Start());
-  fs::BlockCreationTransaction transaction(fs_manager_->block_manager());
-  ASSERT_OK(w.FinishAndReleaseBlock(&transaction));
-  ASSERT_EQ(1, transaction.created_blocks().size());
-  ASSERT_EQ(WritableBlock::FINALIZED, transaction.created_blocks()[0]->state());
-  ASSERT_OK(transaction.CommitCreatedBlocks());
-  ASSERT_EQ(0, transaction.created_blocks().size());
+  BlockManager* bm = fs_manager_->block_manager();
+  unique_ptr<fs::BlockCreationTransaction> transaction = bm->NewCreationTransaction();
+  ASSERT_OK(w.FinishAndReleaseBlock(transaction.get()));
+  ASSERT_OK(transaction->CommitCreatedBlocks());
 }
 
 TEST_P(TestCFileBothCacheTypes, TestLazyInit) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/cfile/cfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 9dd30ac..496317b 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -60,6 +60,7 @@ TAG_FLAG(cfile_write_checksums, evolving);
 
 using google::protobuf::RepeatedPtrField;
 using kudu::fs::BlockCreationTransaction;
+using kudu::fs::BlockManager;
 using kudu::fs::WritableBlock;
 using std::accumulate;
 using std::pair;
@@ -203,9 +204,10 @@ Status CFileWriter::Start() {
 
 Status CFileWriter::Finish() {
   TRACE_EVENT0("cfile", "CFileWriter::Finish");
-  BlockCreationTransaction transaction(block_->block_manager());
-  RETURN_NOT_OK(FinishAndReleaseBlock(&transaction));
-  return transaction.CommitCreatedBlocks();
+  BlockManager* bm = block_->block_manager();
+  unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
+  RETURN_NOT_OK(FinishAndReleaseBlock(transaction.get()));
+  return transaction->CommitCreatedBlocks();
 }
 
 Status CFileWriter::FinishAndReleaseBlock(BlockCreationTransaction* transaction) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 80d4f2f..53c10d7 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -303,7 +303,9 @@ void BlockManagerStressTest<T>::WriterThread() {
   size_t num_bytes_written = 0;
   MonoDelta tight_loop(MonoDelta::FromSeconds(0));
   while (!ShouldStop(tight_loop)) {
-    vector<unique_ptr<WritableBlock>> all_dirty_blocks;
+    unique_ptr<BlockCreationTransaction> creation_transaction =
+        bm_->NewCreationTransaction();
+    vector<BlockId> all_dirty_blocks;
     for (int i = 0; i < FLAGS_block_group_number; i++) {
       vector<unique_ptr<WritableBlock>> dirty_blocks;
       vector<Random> dirty_block_rands;
@@ -351,17 +353,18 @@ void BlockManagerStressTest<T>::WriterThread() {
       num_bytes_written += total_dirty_bytes;
 
       for (auto& dirty_block : dirty_blocks) {
-        all_dirty_blocks.emplace_back(std::move(dirty_block));
+        all_dirty_blocks.emplace_back(dirty_block->id());
+        creation_transaction->AddCreatedBlock(std::move(dirty_block));
       }
     }
 
     // Close all dirty blocks.
-    CHECK_OK(bm_->CloseBlocks(all_dirty_blocks));
+    CHECK_OK(creation_transaction->CommitCreatedBlocks());
     // Publish the now sync'ed blocks to readers and deleters.
     {
       std::lock_guard<simple_spinlock> l(lock_);
       for (const auto& block : all_dirty_blocks) {
-        InsertOrDie(&written_blocks_, block->id(), 0);
+        InsertOrDie(&written_blocks_, block, 0);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 34a102c..552a533 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -132,14 +132,14 @@ class BlockManagerTest : public KuduTest {
     int num_blocks = num_dirs * num_blocks_per_dir;
 
     // Write 'num_blocks' blocks to this data dir group.
-    BlockCreationTransaction transaction(bm_.get());
+    unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
     for (int i = 0; i < num_blocks; i++) {
       unique_ptr<WritableBlock> written_block;
       ASSERT_OK(bm_->CreateBlock(opts, &written_block));
       ASSERT_OK(written_block->Append(kTestData));
-      transaction.AddCreatedBlock(std::move(written_block));
+      transaction->AddCreatedBlock(std::move(written_block));
     }
-    ASSERT_OK(transaction.CommitCreatedBlocks());
+    ASSERT_OK(transaction->CommitCreatedBlocks());
   }
 
  protected:
@@ -349,15 +349,15 @@ void BlockManagerTest<LogBlockManager>::RunMultipathTest(const vector<string>& p
   ASSERT_OK(dd_manager_->CreateDataDirGroup("multipath_test"));
 
   const char* kTestData = "test data";
-  BlockCreationTransaction transaction(bm_.get());
+  unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
   // Creates (numPaths * 2) containers.
   for (int j = 0; j < paths.size() * 2; j++) {
     unique_ptr<WritableBlock> block;
     ASSERT_OK(bm_->CreateBlock(opts, &block));
     ASSERT_OK(block->Append(kTestData));
-    transaction.AddCreatedBlock(std::move(block));
+    transaction->AddCreatedBlock(std::move(block));
   }
-  ASSERT_OK(transaction.CommitCreatedBlocks());
+  ASSERT_OK(transaction->CommitCreatedBlocks());
 
   // Verify the results. (numPaths * 2) containers were created, each
   // consisting of 2 files. Thus, there should be a total of
@@ -529,7 +529,8 @@ TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
   }
 
   Random rand(SeedRandom());
-  vector<unique_ptr<WritableBlock>> dirty_blocks;
+  unique_ptr<BlockCreationTransaction> creation_transaction =
+      this->bm_->NewCreationTransaction();
   LOG_TIMING(INFO, Substitute("creating $0 blocks", kNumBlocks)) {
     for (int i = 0; i < kNumBlocks; i++) {
       // Create a block.
@@ -543,12 +544,12 @@ TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
       }
       written_block->Append(Slice(data, sizeof(data)));
       written_block->Finalize();
-      dirty_blocks.emplace_back(std::move(written_block));
+      creation_transaction->AddCreatedBlock(std::move(written_block));
     }
   }
 
   LOG_TIMING(INFO, Substitute("closing $0 blocks", kNumBlocks)) {
-    ASSERT_OK(this->bm_->CloseBlocks(dirty_blocks));
+    ASSERT_OK(creation_transaction->CommitCreatedBlocks());
   }
 }
 
@@ -1065,7 +1066,8 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
   // Create a BlockCreationTransaction. In this transaction,
   // create some blocks and commit the writes all together.
   const string kTestData = "test data";
-  BlockCreationTransaction creation_transaction(this->bm_.get());
+  unique_ptr<BlockCreationTransaction> creation_transaction =
+      this->bm_->NewCreationTransaction();
   vector<BlockId> created_blocks;
   for (int i = 0; i < 20; i++) {
     unique_ptr<WritableBlock> writer;
@@ -1075,9 +1077,9 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
     ASSERT_OK(writer->Append(kTestData));
     ASSERT_OK(writer->Finalize());
     created_blocks.emplace_back(writer->id());
-    creation_transaction.AddCreatedBlock(std::move(writer));
+    creation_transaction->AddCreatedBlock(std::move(writer));
   }
-  ASSERT_OK(creation_transaction.CommitCreatedBlocks());
+  ASSERT_OK(creation_transaction->CommitCreatedBlocks());
 
   // Read the blocks and verify the content.
   for (const auto& block : created_blocks) {
@@ -1094,12 +1096,13 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
 
   // Create a BlockDeletionTransaction. In this transaction,
   // randomly delete almost half of the created blocks.
-  BlockDeletionTransaction deletion_transaction(this->bm_.get());
+  shared_ptr<BlockDeletionTransaction> deletion_transaction =
+      this->bm_->NewDeletionTransaction();
   for (const auto& block : created_blocks) {
-    if (rand() % 2) deletion_transaction.AddDeletedBlock(block);
+    if (rand() % 2) deletion_transaction->AddDeletedBlock(block);
   }
   vector<BlockId> deleted_blocks;
-  ASSERT_OK(deletion_transaction.CommitDeletedBlocks(&deleted_blocks));
+  ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted_blocks));
   for (const auto& block : deleted_blocks) {
     created_blocks.erase(std::remove(created_blocks.begin(), created_blocks.end(), block),
                          created_blocks.end());
@@ -1111,10 +1114,10 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
   FLAGS_crash_on_eio = false;
   FLAGS_env_inject_eio = 1.0;
   for (const auto& block : created_blocks) {
-    deletion_transaction.AddDeletedBlock(block);
+    deletion_transaction->AddDeletedBlock(block);
   }
   deleted_blocks.clear();
-  Status s = deletion_transaction.CommitDeletedBlocks(&deleted_blocks);
+  Status s = deletion_transaction->CommitDeletedBlocks(&deleted_blocks);
   ASSERT_TRUE(s.IsIOError());
   ASSERT_STR_CONTAINS(s.ToString(), Substitute("only deleted $0 blocks, "
                                                "first failure",

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 991b4ed..641479e 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -43,6 +43,8 @@ class ArrayView;
 
 namespace fs {
 
+class BlockCreationTransaction;
+class BlockDeletionTransaction;
 class BlockManager;
 class FsErrorManager;
 struct FsReport;
@@ -253,12 +255,14 @@ class BlockManager {
   // writer is closed.
   virtual Status DeleteBlock(const BlockId& block_id) = 0;
 
-  // Closes (and fully synchronizes) the given blocks. Effectively like
-  // Close() for each block but may be optimized for groups of blocks.
-  //
-  // On success, guarantees that outstanding data is durable.
-  virtual Status CloseBlocks(
-      const std::vector<std::unique_ptr<WritableBlock>>& blocks) = 0;
+  // Constructs a block creation transaction to group a set of block creation
+  // operations and closes the registered blocks together.
+  virtual std::unique_ptr<BlockCreationTransaction> NewCreationTransaction() = 0;
+
+  // Constructs a block deletion transaction to group a set of block deletion
+  // operations. Similar to 'DeleteBlock', the actual deletion will take place
+  // after the last open reader or writer is closed.
+  virtual std::shared_ptr<BlockDeletionTransaction> NewDeletionTransaction() = 0;
 
   // Retrieves the IDs of all blocks under management by this block manager.
   // These include ReadableBlocks as well as WritableBlocks.
@@ -278,36 +282,19 @@ class BlockManager {
 //  1) the underlying block manager can optimize synchronization for
 //     a batch of blocks if possible to achieve better performance.
 //  2) to be able to track all blocks created in one logical operation.
+// This class is not thread-safe. It is not recommended to share a transaction
+// between threads. If necessary, use external synchronization to guarantee
+// thread safety.
 class BlockCreationTransaction {
  public:
-  explicit BlockCreationTransaction(BlockManager* bm)
-      : bm_(bm) {
-  }
+  virtual ~BlockCreationTransaction() = default;
 
-  void AddCreatedBlock(std::unique_ptr<WritableBlock> block) {
-    created_blocks_.emplace_back(std::move(block));
-  }
+  // Add a block to the creation transaction.
+  virtual void AddCreatedBlock(std::unique_ptr<WritableBlock> block) = 0;
 
   // Commit all the created blocks and close them together.
   // On success, guarantees that outstanding data is durable.
-  Status CommitCreatedBlocks() {
-    if (created_blocks_.empty()) {
-      return Status::OK();
-    }
-
-    Status s = bm_->CloseBlocks(created_blocks_);
-    if (s.ok()) created_blocks_.clear();
-    return s;
-  }
-
-  const std::vector<std::unique_ptr<WritableBlock>>& created_blocks() const {
-    return created_blocks_;
-  }
-
- private:
-  // The owning BlockManager. Must outlive the BlockCreationTransaction.
-  BlockManager* bm_;
-  std::vector<std::unique_ptr<WritableBlock>> created_blocks_;
+  virtual Status CommitCreatedBlocks() = 0;
 };
 
 // Group a set of block deletions together in a transaction. Similar to
@@ -315,15 +302,15 @@ class BlockCreationTransaction {
 //  1) the underlying block manager can optimize deletions for a batch
 //     of blocks if possible to achieve better performance.
 //  2) to be able to track all blocks deleted in one logical operation.
+// This class is not thread-safe. It is not recommended to share a transaction
+// between threads. If necessary, use external synchronization to guarantee
+// thread safety.
 class BlockDeletionTransaction {
  public:
-  explicit BlockDeletionTransaction(BlockManager* bm)
-      : bm_(bm) {
-  }
+  virtual ~BlockDeletionTransaction() = default;
 
-  void AddDeletedBlock(BlockId block) {
-    deleted_blocks_.emplace_back(block);
-  }
+  // Add a block to the deletion transaction.
+  virtual void AddDeletedBlock(BlockId block) = 0;
 
   // Deletes a group of blocks given the block IDs, the actual deletion will take
   // place after the last open reader or writer is closed for each block that needs
@@ -332,31 +319,7 @@ class BlockDeletionTransaction {
   // is OK or error.
   //
   // Returns the first deletion failure that was seen, if any.
-  Status CommitDeletedBlocks(std::vector<BlockId>* deleted) {
-    Status first_failure;
-    for (BlockId block : deleted_blocks_) {
-      Status s = bm_->DeleteBlock(block);
-      // If we get NotFound, then the block was already deleted.
-      if (!s.ok() && !s.IsNotFound()) {
-        if (first_failure.ok()) first_failure = s;
-      } else {
-        deleted->emplace_back(block);
-      }
-    }
-
-    if (!first_failure.ok()) {
-    first_failure = first_failure.CloneAndPrepend(strings::Substitute("only deleted $0 blocks, "
-                                                                        "first failure",
-                                                                        deleted->size()));
-    }
-    deleted_blocks_.clear();
-    return first_failure;
-  }
-
- private:
-  // The owning BlockManager. Must outlive the BlockDeletionTransaction.
-  BlockManager* bm_;
-  std::vector<BlockId> deleted_blocks_;
+  virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) = 0;
 };
 
 // Compute an upper bound for a file cache embedded within a block manager

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index fdb9cb0..b3e6dbe 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -536,6 +536,102 @@ size_t FileReadableBlock::memory_footprint() const {
   return kudu_malloc_usable_size(this) + reader_->memory_footprint();
 }
 
+////////////////////////////////////////////////////////////
+// FileBlockCreationTransaction
+////////////////////////////////////////////////////////////
+
+class FileBlockCreationTransaction : public BlockCreationTransaction {
+ public:
+  FileBlockCreationTransaction() = default;
+
+  virtual ~FileBlockCreationTransaction() = default;
+
+  virtual void AddCreatedBlock(std::unique_ptr<WritableBlock> block) override;
+
+  virtual Status CommitCreatedBlocks() override;
+
+ private:
+  std::vector<std::unique_ptr<FileWritableBlock>> created_blocks_;
+};
+
+void FileBlockCreationTransaction::AddCreatedBlock(
+    std::unique_ptr<WritableBlock> block) {
+  FileWritableBlock* fwb =
+      down_cast<FileWritableBlock*>(block.release());
+  created_blocks_.emplace_back(unique_ptr<FileWritableBlock>(fwb));
+}
+
+Status FileBlockCreationTransaction::CommitCreatedBlocks() {
+  if (created_blocks_.empty()) {
+    return Status::OK();
+  }
+
+  VLOG(3) << "Closing " << created_blocks_.size() << " blocks";
+  if (FLAGS_block_manager_preflush_control == "close") {
+    // Ask the kernel to begin writing out each block's dirty data. This is
+    // done up-front to give the kernel opportunities to coalesce contiguous
+    // dirty pages.
+    for (const auto& block : created_blocks_) {
+      RETURN_NOT_OK(block->FlushDataAsync());
+    }
+  }
+
+  // Now close each block, waiting for each to become durable.
+  for (const auto& block : created_blocks_) {
+    RETURN_NOT_OK(block->Close());
+  }
+  created_blocks_.clear();
+  return Status::OK();
+}
+
+////////////////////////////////////////////////////////////
+// FileBlockDeletionTransaction
+////////////////////////////////////////////////////////////
+
+class FileBlockDeletionTransaction : public BlockDeletionTransaction {
+ public:
+  explicit FileBlockDeletionTransaction(FileBlockManager* fbm)
+      : fbm_(fbm) {
+  }
+
+  virtual ~FileBlockDeletionTransaction() = default;
+
+  virtual void AddDeletedBlock(BlockId block) override;
+
+  virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) override;
+
+ private:
+  // The owning FileBlockManager. Must outlive the FileBlockDeletionTransaction.
+  FileBlockManager* fbm_;
+  std::vector<BlockId> deleted_blocks_;
+  DISALLOW_COPY_AND_ASSIGN(FileBlockDeletionTransaction);
+};
+
+void FileBlockDeletionTransaction::AddDeletedBlock(BlockId block) {
+  deleted_blocks_.emplace_back(block);
+}
+
+Status FileBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* deleted) {
+  Status first_failure;
+  for (BlockId block : deleted_blocks_) {
+    Status s = fbm_->DeleteBlock(block);
+    // If we get NotFound, then the block was already deleted.
+    if (!s.ok() && !s.IsNotFound()) {
+      if (first_failure.ok()) first_failure = s;
+    } else {
+      deleted->emplace_back(block);
+    }
+  }
+
+  if (!first_failure.ok()) {
+    first_failure = first_failure.CloneAndPrepend(strings::Substitute("only deleted $0 blocks, "
+                                                                      "first failure",
+                                                                      deleted->size()));
+  }
+  deleted_blocks_.clear();
+  return first_failure;
+}
+
 } // namespace internal
 
 ////////////////////////////////////////////////////////////
@@ -751,24 +847,15 @@ Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
   return Status::OK();
 }
 
-Status FileBlockManager::CloseBlocks(const vector<unique_ptr<WritableBlock>>& blocks) {
-  VLOG(3) << "Closing " << blocks.size() << " blocks";
-  if (FLAGS_block_manager_preflush_control == "close") {
-    // Ask the kernel to begin writing out each block's dirty data. This is
-    // done up-front to give the kernel opportunities to coalesce contiguous
-    // dirty pages.
-    for (const auto& block : blocks) {
-      internal::FileWritableBlock* fwb =
-          down_cast<internal::FileWritableBlock*>(block.get());
-      RETURN_NOT_OK(fwb->FlushDataAsync());
-    }
-  }
+unique_ptr<BlockCreationTransaction> FileBlockManager::NewCreationTransaction() {
+  CHECK(!read_only_);
+  return unique_ptr<internal::FileBlockCreationTransaction>(
+      new internal::FileBlockCreationTransaction());
+}
 
-  // Now close each block, waiting for each to become durable.
-  for (const auto& block : blocks) {
-    RETURN_NOT_OK(block->Close());
-  }
-  return Status::OK();
+shared_ptr<BlockDeletionTransaction> FileBlockManager::NewDeletionTransaction() {
+  CHECK(!read_only_);
+  return std::make_shared<internal::FileBlockDeletionTransaction>(this);
 }
 
 namespace {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index fb79475..fd3535b 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -88,7 +88,9 @@ class FileBlockManager : public BlockManager {
 
   Status DeleteBlock(const BlockId& block_id) override;
 
-  Status CloseBlocks(const std::vector<std::unique_ptr<WritableBlock>>& blocks) override;
+  std::unique_ptr<BlockCreationTransaction> NewCreationTransaction() override;
+
+  std::shared_ptr<BlockDeletionTransaction> NewDeletionTransaction() override;
 
   Status GetAllBlockIds(std::vector<BlockId>* block_ids) override;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 634f7db..b3f76f7 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -290,7 +290,7 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
   BlockId saved_id;
   {
     Random rand(SeedRandom());
-    BlockCreationTransaction transaction(bm_.get());
+    unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
     for (int i = 0; i < 10; i++) {
       unique_ptr<WritableBlock> b;
       ASSERT_OK(bm_->CreateBlock(test_block_opts_, &b));
@@ -303,12 +303,12 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
       }
       b->Append(Slice(data, sizeof(data)));
       ASSERT_OK(b->Finalize());
-      transaction.AddCreatedBlock(std::move(b));
+      transaction->AddCreatedBlock(std::move(b));
     }
     // Metrics for full containers are updated after Finalize().
     ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 10, 10));
 
-    ASSERT_OK(transaction.CommitCreatedBlocks());
+    ASSERT_OK(transaction->CommitCreatedBlocks());
     ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 10 * 1024, 11, 10, 10));
   }
 
@@ -377,14 +377,14 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
 
   // Create 4 containers, with the first four block IDs in the sequence.
   {
-    BlockCreationTransaction transaction(bm_.get());
+    unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
     for (int i = 0; i < 4; i++) {
       unique_ptr<WritableBlock> writer;
       ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
       block_ids.push_back(writer->id());
-      transaction.AddCreatedBlock(std::move(writer));
+      transaction->AddCreatedBlock(std::move(writer));
     }
-    ASSERT_OK(transaction.CommitCreatedBlocks());
+    ASSERT_OK(transaction->CommitCreatedBlocks());
   }
 
   // Create one more block, which should reuse the first container.
@@ -806,15 +806,15 @@ TEST_F(LogBlockManagerTest, StartupBenchmark) {
   const int kNumBlocks = AllowSlowTests() ? 1000000 : 1000;
   // Creates 'kNumBlocks' blocks with minimal data.
   {
-    BlockCreationTransaction transaction(bm_.get());
+    unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
     for (int i = 0; i < kNumBlocks; i++) {
       unique_ptr<WritableBlock> block;
       ASSERT_OK_FAST(bm_->CreateBlock(test_block_opts_, &block));
       ASSERT_OK_FAST(block->Append("x"));
       ASSERT_OK_FAST(block->Finalize());
-      transaction.AddCreatedBlock(std::move(block));
+      transaction->AddCreatedBlock(std::move(block));
     }
-    ASSERT_OK(transaction.CommitCreatedBlocks());
+    ASSERT_OK(transaction->CommitCreatedBlocks());
   }
   for (int i = 0; i < 10; i++) {
     LOG_TIMING(INFO, "reopening block manager") {
@@ -986,14 +986,14 @@ TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
 
   // Create several full containers.
   {
-    BlockCreationTransaction transaction(bm_.get());
+    unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
     for (int i = 0; i < kNumContainers; i++) {
       unique_ptr<WritableBlock> block;
       ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
       ASSERT_OK(block->Append("a"));
-      transaction.AddCreatedBlock(std::move(block));
+      transaction->AddCreatedBlock(std::move(block));
     }
-    ASSERT_OK(transaction.CommitCreatedBlocks());
+    ASSERT_OK(transaction->CommitCreatedBlocks());
   }
   vector<string> container_names;
   NO_FATALS(GetContainerNames(&container_names));
@@ -1179,12 +1179,12 @@ TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
 
   // Create some containers.
   {
-    BlockCreationTransaction transaction(bm_.get());
+    unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
     for (int i = 0; i < kNumContainers; i++) {
       unique_ptr<WritableBlock> block;
       ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
       ASSERT_OK(block->Append("a"));
-      transaction.AddCreatedBlock(std::move(block));
+      transaction->AddCreatedBlock(std::move(block));
     }
   }
   vector<string> container_names;

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index c2094be..dcb0e32 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -1128,6 +1128,106 @@ void LogBlockContainer::SetReadOnly() {
   read_only_.Store(true);
 }
 
+///////////////////////////////////////////////////////////
+// LogBlockCreationTransaction
+////////////////////////////////////////////////////////////
+
+class LogBlockCreationTransaction : public BlockCreationTransaction {
+ public:
+  LogBlockCreationTransaction() = default;
+
+  virtual ~LogBlockCreationTransaction() = default;
+
+  virtual void AddCreatedBlock(std::unique_ptr<WritableBlock> block) override;
+
+  virtual Status CommitCreatedBlocks() override;
+
+ private:
+  std::vector<std::unique_ptr<LogWritableBlock>> created_blocks_;
+};
+
+void LogBlockCreationTransaction::AddCreatedBlock(
+    std::unique_ptr<WritableBlock> block) {
+  LogWritableBlock* lwb = down_cast<LogWritableBlock*>(block.release());
+  created_blocks_.emplace_back(unique_ptr<LogWritableBlock>(lwb));
+}
+
+Status LogBlockCreationTransaction::CommitCreatedBlocks() {
+  if (created_blocks_.empty()) {
+    return Status::OK();
+  }
+
+  VLOG(3) << "Closing " << created_blocks_.size() << " blocks";
+  unordered_map<LogBlockContainer*, vector<LogWritableBlock*>> created_block_map;
+  for (const auto& block : created_blocks_) {
+    if (FLAGS_block_manager_preflush_control == "close") {
+      // Ask the kernel to begin writing out each block's dirty data. This is
+      // done up-front to give the kernel opportunities to coalesce contiguous
+      // dirty pages.
+      RETURN_NOT_OK(block->FlushDataAsync());
+    }
+    created_block_map[block->container()].emplace_back(block.get());
+  }
+
+  // Close all blocks and sync the blocks belonging to the same
+  // container together to reduce fsync() usage, waiting for them
+  // to become durable.
+  for (const auto& entry : created_block_map) {
+    RETURN_NOT_OK(entry.first->DoCloseBlocks(entry.second,
+                                             LogBlockContainer::SyncMode::SYNC));
+  }
+  created_blocks_.clear();
+  return Status::OK();
+}
+
+///////////////////////////////////////////////////////////
+// LogBlockDeletionTransaction
+////////////////////////////////////////////////////////////
+
+class LogBlockDeletionTransaction : public BlockDeletionTransaction {
+ public:
+  explicit LogBlockDeletionTransaction(LogBlockManager* lbm)
+      : lbm_(lbm) {
+  }
+
+  virtual ~LogBlockDeletionTransaction() = default;
+
+  virtual void AddDeletedBlock(BlockId block) override;
+
+  virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) override;
+
+ private:
+  // The owning LogBlockManager. Must outlive the LogBlockDeletionTransaction.
+  LogBlockManager* lbm_;
+  std::vector<BlockId> deleted_blocks_;
+  DISALLOW_COPY_AND_ASSIGN(LogBlockDeletionTransaction);
+};
+
+void LogBlockDeletionTransaction::AddDeletedBlock(BlockId block) {
+  deleted_blocks_.emplace_back(block);
+}
+
+Status LogBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* deleted) {
+  Status first_failure;
+  for (BlockId block : deleted_blocks_) {
+    Status s = lbm_->DeleteBlock(block);
+    // If we get NotFound, then the block was already deleted.
+    if (!s.ok() && !s.IsNotFound()) {
+      if (first_failure.ok()) first_failure = s;
+    } else {
+      deleted->emplace_back(block);
+    }
+  }
+
+  if (!first_failure.ok()) {
+    first_failure = first_failure.CloneAndPrepend(strings::Substitute("only deleted $0 blocks, "
+                                                                      "first failure",
+                                                                      deleted->size()));
+  }
+  deleted_blocks_.clear();
+  return first_failure;
+}
+
 ////////////////////////////////////////////////////////////
 // LogBlock (definition)
 ////////////////////////////////////////////////////////////
@@ -1777,30 +1877,15 @@ Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
   return Status::OK();
 }
 
-Status LogBlockManager::CloseBlocks(const std::vector<unique_ptr<WritableBlock>>& blocks) {
-  VLOG(3) << "Closing " << blocks.size() << " blocks";
-
-  unordered_map<LogBlockContainer*, vector<LogWritableBlock*>> created_block_map;
-  for (const auto& block : blocks) {
-    LogWritableBlock* lwb = down_cast<LogWritableBlock*>(block.get());
-
-    if (FLAGS_block_manager_preflush_control == "close") {
-      // Ask the kernel to begin writing out each block's dirty data. This is
-      // done up-front to give the kernel opportunities to coalesce contiguous
-      // dirty pages.
-      RETURN_NOT_OK(lwb->FlushDataAsync());
-    }
-    created_block_map[lwb->container()].emplace_back(lwb);
-  }
+unique_ptr<BlockCreationTransaction> LogBlockManager::NewCreationTransaction() {
+  CHECK(!read_only_);
+  return unique_ptr<internal::LogBlockCreationTransaction>(
+      new internal::LogBlockCreationTransaction());
+}
 
-  // Close all blocks and sync the blocks belonging to the same
-  // container together to reduce fsync() usage, waiting for them
-  // to become durable.
-  for (const auto& entry : created_block_map) {
-    RETURN_NOT_OK(entry.first->DoCloseBlocks(entry.second,
-                                             LogBlockContainer::SyncMode::SYNC));
-  }
-  return Status::OK();
+shared_ptr<BlockDeletionTransaction> LogBlockManager::NewDeletionTransaction() {
+  CHECK(!read_only_);
+  return std::make_shared<internal::LogBlockDeletionTransaction>(this);
 }
 
 Status LogBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 7b9b26a..ea4d4ae 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -180,7 +180,9 @@ class LogBlockManager : public BlockManager {
 
   Status DeleteBlock(const BlockId& block_id) override;
 
-  Status CloseBlocks(const std::vector<std::unique_ptr<WritableBlock>>& blocks) override;
+  std::unique_ptr<BlockCreationTransaction> NewCreationTransaction() override;
+
+  std::shared_ptr<BlockDeletionTransaction> NewDeletionTransaction() override;
 
   Status GetAllBlockIds(std::vector<BlockId>* block_ids) override;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 37f9f51..f4a391d 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -80,6 +80,7 @@ using cfile::CFileReader;
 using cfile::IndexTreeIterator;
 using cfile::ReaderOptions;
 using fs::BlockCreationTransaction;
+using fs::BlockManager;
 using fs::ReadableBlock;
 using fs::WritableBlock;
 
@@ -113,9 +114,10 @@ Status DeltaFileWriter::Start() {
 }
 
 Status DeltaFileWriter::Finish() {
-  BlockCreationTransaction transaction(writer_->block()->block_manager());
-  RETURN_NOT_OK(FinishAndReleaseBlock(&transaction));
-  return transaction.CommitCreatedBlocks();
+  BlockManager* bm = writer_->block()->block_manager();
+  unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
+  RETURN_NOT_OK(FinishAndReleaseBlock(transaction.get()));
+  return transaction->CommitCreatedBlocks();
 }
 
 Status DeltaFileWriter::FinishAndReleaseBlock(BlockCreationTransaction* transaction) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/tablet/diskrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 60d808a..b05015f 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -36,6 +36,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/types.h"
+#include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/tablet/cfile_set.h"
@@ -81,6 +82,7 @@ class OpId;
 namespace tablet {
 
 using cfile::BloomFileWriter;
+using fs::BlockManager;
 using fs::BlockCreationTransaction;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
@@ -227,10 +229,10 @@ Status DiskRowSetWriter::AppendBlock(const RowBlock &block) {
 
 Status DiskRowSetWriter::Finish() {
   TRACE_EVENT0("tablet", "DiskRowSetWriter::Finish");
-  FsManager* fs = rowset_metadata_->fs_manager();
-  BlockCreationTransaction transaction(fs->block_manager());
-  RETURN_NOT_OK(FinishAndReleaseBlocks(&transaction));
-  return transaction.CommitCreatedBlocks();
+  BlockManager* bm = rowset_metadata_->fs_manager()->block_manager();
+  unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
+  RETURN_NOT_OK(FinishAndReleaseBlocks(transaction.get()));
+  return transaction->CommitCreatedBlocks();
 }
 
 Status DiskRowSetWriter::FinishAndReleaseBlocks(BlockCreationTransaction* transaction) {
@@ -316,8 +318,9 @@ RollingDiskRowSetWriter::RollingDiskRowSetWriter(
       row_idx_in_cur_drs_(0),
       can_roll_(false),
       written_count_(0),
-      written_size_(0),
-      block_transaction_(tablet_metadata->fs_manager()->block_manager()) {
+      written_size_(0) {
+  BlockManager* bm = tablet_metadata->fs_manager()->block_manager();
+  block_transaction_ = bm->NewCreationTransaction();
   CHECK(schema.has_column_ids());
 }
 
@@ -422,7 +425,7 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
   }
   CHECK_EQ(state_, kStarted);
 
-  Status writer_status = cur_writer_->FinishAndReleaseBlocks(&block_transaction_);
+  Status writer_status = cur_writer_->FinishAndReleaseBlocks(block_transaction_.get());
 
   // If no rows were written (e.g. due to an empty flush or a compaction with all rows
   // deleted), FinishAndReleaseBlocks(...) returns Aborted. In that case, we don't
@@ -438,7 +441,7 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
 
     // Commit the UNDO block. Status::Aborted() indicates that there
     // were no UNDOs written.
-    Status s = cur_undo_writer_->FinishAndReleaseBlock(&block_transaction_);
+    Status s = cur_undo_writer_->FinishAndReleaseBlock(block_transaction_.get());
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
       cur_drs_metadata_->CommitUndoDeltaDataBlock(cur_undo_ds_block_id_);
@@ -447,7 +450,7 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     }
 
     // Same for the REDO block.
-    s = cur_redo_writer_->FinishAndReleaseBlock(&block_transaction_);
+    s = cur_redo_writer_->FinishAndReleaseBlock(block_transaction_.get());
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
       cur_drs_metadata_->CommitRedoDeltaDataBlock(0, cur_redo_ds_block_id_);
@@ -474,7 +477,7 @@ Status RollingDiskRowSetWriter::Finish() {
   DCHECK_EQ(state_, kStarted);
 
   RETURN_NOT_OK(FinishCurrentWriter());
-  RETURN_NOT_OK(block_transaction_.CommitCreatedBlocks());
+  RETURN_NOT_OK(block_transaction_->CommitCreatedBlocks());
 
   state_ = kFinished;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/tablet/diskrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 15b8b18..471d7e7 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -36,7 +36,6 @@
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/fs/block_id.h"
-#include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
@@ -70,6 +69,10 @@ namespace consensus {
 class OpId;
 }
 
+namespace fs {
+class BlockCreationTransaction;
+}
+
 namespace log {
 class LogAnchorRegistry;
 }
@@ -269,7 +272,7 @@ class RollingDiskRowSetWriter {
 
   // Syncs and commits all writes of outstanding blocks when the rolling
   // writer is destroyed.
-  fs::BlockCreationTransaction block_transaction_;
+  std::unique_ptr<fs::BlockCreationTransaction> block_transaction_;
 
   DISALLOW_COPY_AND_ASSIGN(RollingDiskRowSetWriter);
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/tablet/multi_column_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/multi_column_writer.cc b/src/kudu/tablet/multi_column_writer.cc
index 9412bdb..1fc2671 100644
--- a/src/kudu/tablet/multi_column_writer.cc
+++ b/src/kudu/tablet/multi_column_writer.cc
@@ -38,6 +38,7 @@ namespace tablet {
 
 using cfile::CFileWriter;
 using fs::BlockCreationTransaction;
+using fs::BlockManager;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
 using std::unique_ptr;
@@ -119,9 +120,10 @@ Status MultiColumnWriter::AppendBlock(const RowBlock& block) {
 }
 
 Status MultiColumnWriter::Finish() {
-  BlockCreationTransaction transaction(fs_->block_manager());
-  RETURN_NOT_OK(FinishAndReleaseBlocks(&transaction));
-  return transaction.CommitCreatedBlocks();
+  BlockManager* bm = fs_->block_manager();
+  unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
+  RETURN_NOT_OK(FinishAndReleaseBlocks(transaction.get()));
+  return transaction->CommitCreatedBlocks();
 }
 
 Status MultiColumnWriter::FinishAndReleaseBlocks(

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/tserver/tablet_copy_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index 65f174d..f5be355 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -177,7 +177,7 @@ TEST_F(TabletCopyClientTest, TestDownloadBlock) {
   // Check that the client downloaded the block and verification passed.
   BlockId new_block_id;
   ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id));
-  ASSERT_OK(client_->transaction_.CommitCreatedBlocks());
+  ASSERT_OK(client_->transaction_->CommitCreatedBlocks());
 
   // Ensure it placed the block where we expected it to.
   ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice));
@@ -242,7 +242,7 @@ TEST_F(TabletCopyClientTest, TestVerifyData) {
 TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
   // Download and commit all the blocks.
   ASSERT_OK(client_->DownloadBlocks());
-  ASSERT_OK(client_->transaction_.CommitCreatedBlocks());
+  ASSERT_OK(client_->transaction_->CommitCreatedBlocks());
 
   // Verify the disk synchronization count.
   if (FLAGS_block_manager == "log") {
@@ -335,7 +335,7 @@ TEST_P(TabletCopyClientAbortTest, TestAbort) {
   int num_blocks_downloaded = 0;
   if (download_blocks == kDownloadBlocks) {
     ASSERT_OK(client_->DownloadBlocks());
-    ASSERT_OK(client_->transaction_.CommitCreatedBlocks());
+    ASSERT_OK(client_->transaction_->CommitCreatedBlocks());
     num_blocks_downloaded = num_remote_blocks;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index fbf3244..13fe64e 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -119,6 +119,7 @@ using consensus::ConsensusMetadataManager;
 using consensus::MakeOpId;
 using consensus::OpId;
 using env_util::CopyFile;
+using fs::BlockManager;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
 using rpc::Messenger;
@@ -156,8 +157,9 @@ TabletCopyClient::TabletCopyClient(std::string tablet_id,
       session_idle_timeout_millis_(FLAGS_tablet_copy_begin_session_timeout_ms),
       start_time_micros_(0),
       rng_(GetRandomSeed32()),
-      tablet_copy_metrics_(tablet_copy_metrics),
-      transaction_(fs_manager->block_manager()) {
+      tablet_copy_metrics_(tablet_copy_metrics) {
+  BlockManager* bm = fs_manager->block_manager();
+  transaction_ = bm->NewCreationTransaction();
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_client_sessions->Increment();
   }
@@ -385,7 +387,7 @@ Status TabletCopyClient::Finish() {
   //  2) While DownloadWALs() is running the kernel has more time to eagerly flush the blocks,
   //     so the fsync() operations could be cheaper.
   //  3) Downloaded blocks should be made durable before replacing superblock.
-  RETURN_NOT_OK(transaction_.CommitCreatedBlocks());
+  RETURN_NOT_OK(transaction_->CommitCreatedBlocks());
   state_ = kFinished;
 
   // Replace tablet metadata superblock. This will set the tablet metadata state
@@ -660,7 +662,7 @@ Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
 
   *new_block_id = block->id();
   RETURN_NOT_OK_PREPEND(block->Finalize(), "Unable to finalize block");
-  transaction_.AddCreatedBlock(std::move(block));
+  transaction_->AddCreatedBlock(std::move(block));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0174c267/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index 0516c9f..f44e5c2 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -24,7 +24,6 @@
 
 #include <gtest/gtest_prod.h>
 
-#include "kudu/fs/block_manager.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/metrics.h"
@@ -44,6 +43,10 @@ class ConsensusMetadataManager;
 class ConsensusStatePB;
 } // namespace consensus
 
+namespace fs {
+class BlockCreationTransaction;
+} // namespace fs
+
 namespace rpc {
 class Messenger;
 class RpcController;
@@ -247,7 +250,7 @@ class TabletCopyClient {
   TabletCopyClientMetrics* tablet_copy_metrics_;
 
   // Block transaction for the tablet copy.
-  fs::BlockCreationTransaction transaction_;
+  std::unique_ptr<fs::BlockCreationTransaction> transaction_;
 
   DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
 };


Mime
View raw message