kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [3/3] kudu git commit: env: add WriteV() API
Date Thu, 04 May 2017 20:12:21 GMT
env: add WriteV() API

Adds WriteV() methods to RWFile and WritableFile that allow
writing multiple data Slices in one call. The implementation
leverages the pwritev system call when possible and simulates it
with pwrite calls when unavailable.

Additionally adds WriteV()/AppendV() methods to the block manager abstraction.
These methods will be used in KUDU-463 to support writing
checksums and block data in a single call.

Change-Id: I30acfa2e4918ef945c55646647913b36a07daaa4
Reviewed-on: http://gerrit.cloudera.org:8080/6800
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/174a058e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/174a058e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/174a058e

Branch: refs/heads/master
Commit: 174a058e29750d27f4a873332d8cdde3b260292e
Parents: 6217879
Author: Grant Henke <granthenke@gmail.com>
Authored: Thu May 4 10:18:30 2017 -0500
Committer: Adar Dembo <adar@cloudera.com>
Committed: Thu May 4 20:11:53 2017 +0000

----------------------------------------------------------------------
 src/kudu/cfile/cfile_writer.cc    |  23 ++--
 src/kudu/cfile/cfile_writer.h     |   2 +-
 src/kudu/consensus/log_util.cc    |   7 +-
 src/kudu/fs/block_manager.h       |   6 +
 src/kudu/fs/file_block_manager.cc |  21 +++-
 src/kudu/fs/log_block_manager.cc  |  39 +++++--
 src/kudu/util/env-test.cc         |  25 +++--
 src/kudu/util/env.h               |   5 +-
 src/kudu/util/env_posix.cc        | 199 +++++++++++++++++----------------
 src/kudu/util/file_cache.cc       |   6 +
 10 files changed, 200 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/cfile/cfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 6724d45..5d5ad3c 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -18,6 +18,7 @@
 #include "kudu/cfile/cfile_writer.h"
 
 #include <glog/logging.h>
+#include <numeric>
 #include <string>
 #include <utility>
 
@@ -64,6 +65,7 @@ TAG_FLAG(cfile_do_on_finish, experimental);
 using google::protobuf::RepeatedPtrField;
 using kudu::fs::ScopedWritableBlockCloser;
 using kudu::fs::WritableBlock;
+using std::accumulate;
 using std::string;
 using std::unique_ptr;
 
@@ -461,9 +463,7 @@ Status CFileWriter::AddBlock(const vector<Slice> &data_slices,
     out_slices = data_slices;
   }
 
-  for (const Slice &data : out_slices) {
-    RETURN_NOT_OK(WriteRawData(data));
-  }
+  RETURN_NOT_OK(WriteRawData(out_slices));
 
   uint64_t total_size = off_ - start_offset;
 
@@ -473,14 +473,19 @@ Status CFileWriter::AddBlock(const vector<Slice> &data_slices,
   return Status::OK();
 }
 
-Status CFileWriter::WriteRawData(const Slice& data) {
-  Status s = block_->Append(data);
+
+Status CFileWriter::WriteRawData(const vector<Slice>& data) {
+  size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
+                                [&](int sum, const Slice& curr) {
+                                  return sum + curr.size();
+                                });
+  Status s = block_->AppendV(data);
   if (!s.ok()) {
-    LOG(WARNING) << "Unable to append slice of size "
-                << data.size() << " at offset " << off_
-                << ": " << s.ToString();
+    LOG(WARNING) << "Unable to append data of size "
+                 << data_size << " at offset " << off_
+                 << ": " << s.ToString();
   }
-  off_ += data.size();
+  off_ += data_size;
   return s;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/cfile/cfile_writer.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h
index 22895c2..b2dfac9 100644
--- a/src/kudu/cfile/cfile_writer.h
+++ b/src/kudu/cfile/cfile_writer.h
@@ -184,7 +184,7 @@ class CFileWriter {
                   BlockPointer *block_ptr,
                   const char *name_for_log);
 
-  Status WriteRawData(const Slice& data);
+  Status WriteRawData(const vector<Slice>& data);
 
   Status FinishCurDataBlock();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/consensus/log_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index eda0c55..30d68b1 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -823,9 +823,10 @@ Status WritableLogSegment::WriteEntryBatch(const Slice& data,
   InlineEncodeFixed32(&header_buf[12], crc::Crc32c(&header_buf[0], kEntryHeaderSizeV2 - 4));
 
   // Write the header to the file, followed by the batch data itself.
-  RETURN_NOT_OK(writable_file_->AppendVector({
-        Slice(header_buf, arraysize(header_buf)),
-        data_to_write}));
+  RETURN_NOT_OK(writable_file_->AppendV({
+                                            Slice(header_buf,
+                                                  arraysize(header_buf)),
+                                            data_to_write}));
   written_offset_ += arraysize(header_buf) + data_to_write.size();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 8165275..f9a19a4 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -113,6 +113,12 @@ class WritableBlock : public Block {
   // outstanding data to reach the disk.
   virtual Status Append(const Slice& data) = 0;
 
+  // Appends multiple chunks of data referenced by 'data' to the block.
+  //
+  // Does not guarantee durability of 'data'; Close() must be called for all
+  // outstanding data to reach the disk.
+  virtual Status AppendV(const std::vector<Slice>& data) = 0;
+
   // Begins an asynchronous flush of dirty block data to disk.
   //
   // This is purely a performance optimization for Close(); if there is

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index b2419ed..45ced1e 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -18,6 +18,7 @@
 #include "kudu/fs/file_block_manager.h"
 
 #include <memory>
+#include <numeric>
 #include <string>
 #include <vector>
 
@@ -37,6 +38,7 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/status.h"
 
+using std::accumulate;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -224,6 +226,8 @@ class FileWritableBlock : public WritableBlock {
 
   virtual Status Append(const Slice& data) OVERRIDE;
 
+  virtual Status AppendV(const vector<Slice>& data) OVERRIDE;
+
   virtual Status FlushDataAsync() OVERRIDE;
 
   virtual size_t BytesAppended() const OVERRIDE;
@@ -297,14 +301,23 @@ const BlockId& FileWritableBlock::id() const {
 }
 
 Status FileWritableBlock::Append(const Slice& data) {
-  DCHECK(state_ == CLEAN || state_ == DIRTY)
-      << "Invalid state: " << state_;
+  return AppendV({ data });
+}
 
-  RETURN_NOT_OK(writer_->Append(data));
+Status FileWritableBlock::AppendV(const vector<Slice>& data) {
+  DCHECK(state_ == CLEAN || state_ == DIRTY)
+  << "Invalid state: " << state_;
+  RETURN_NOT_OK(writer_->AppendV(data));
   RETURN_NOT_OK(location_.data_dir()->RefreshIsFull(
       DataDir::RefreshMode::ALWAYS));
   state_ = DIRTY;
-  bytes_appended_ += data.size();
+
+  // Calculate the amount of data written
+  size_t bytes_written = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
+                                    [&](int sum, const Slice& curr) {
+                                      return sum + curr.size();
+                                    });
+  bytes_appended_ += bytes_written;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 1a3490e..dd181c9 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -282,11 +282,12 @@ class LogBlockContainer {
   Status EnsurePreallocated(int64_t block_start_offset,
                             size_t next_append_length);
 
-  // Writes 'data' to this container's data file at offset 'offset'.
-  //
-  // The on-disk effects of this call are made durable only after SyncData().
+  // See RWFile::Write()
   Status WriteData(int64_t offset, const Slice& data);
 
+  // See RWFile::WriteV()
+  Status WriteVData(int64_t offset, const vector<Slice>& data);
+
   // See RWFile::Read().
   Status ReadData(int64_t offset, Slice* result) const;
 
@@ -821,14 +822,22 @@ Status LogBlockContainer::PunchHole(int64_t offset, int64_t length) {
 }
 
 Status LogBlockContainer::WriteData(int64_t offset, const Slice& data) {
+  return WriteVData(offset, { data });
+}
+
+Status LogBlockContainer::WriteVData(int64_t offset, const vector<Slice>& data) {
   DCHECK_GE(offset, next_block_offset_);
 
-  RETURN_NOT_OK(data_file_->Write(offset, data));
+  RETURN_NOT_OK(data_file_->WriteV(offset, data));
 
   // This append may have changed the container size if:
   // 1. It was large enough that it blew out the preallocated space.
   // 2. Preallocation was disabled.
-  if (offset + data.size() > preallocated_offset_) {
+  size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
+                                [&](int sum, const Slice& curr) {
+                                  return sum + curr.size();
+                                });
+  if (offset + data_size > preallocated_offset_) {
     RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
   }
   return Status::OK();
@@ -1051,6 +1060,8 @@ class LogWritableBlock : public WritableBlock {
 
   virtual Status Append(const Slice& data) OVERRIDE;
 
+  virtual Status AppendV(const vector<Slice>& data) OVERRIDE;
+
   virtual Status FlushDataAsync() OVERRIDE;
 
   virtual size_t BytesAppended() const OVERRIDE;
@@ -1130,18 +1141,28 @@ BlockManager* LogWritableBlock::block_manager() const {
 }
 
 Status LogWritableBlock::Append(const Slice& data) {
+  return AppendV({ data });
+}
+
+Status LogWritableBlock::AppendV(const vector<Slice>& data) {
   DCHECK(state_ == CLEAN || state_ == DIRTY)
-      << "Invalid state: " << state_;
+  << "Invalid state: " << state_;
+
+  // Calculate the amount of data to write
+  size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
+                                [&](int sum, const Slice& curr) {
+                                  return sum + curr.size();
+                                });
 
   // The metadata change is deferred to Close() or FlushDataAsync(),
   // whichever comes first. We can't do it now because the block's
   // length is still in flux.
 
   int64_t cur_block_offset = block_offset_ + block_length_;
-  RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data.size()));
+  RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data_size));
 
   MicrosecondsInt64 start_time = GetMonoTimeMicros();
-  RETURN_NOT_OK(container_->WriteData(cur_block_offset, data));
+  RETURN_NOT_OK(container_->WriteVData(cur_block_offset, data));
   MicrosecondsInt64 end_time = GetMonoTimeMicros();
 
   int64_t dur = end_time - start_time;
@@ -1149,7 +1170,7 @@ Status LogWritableBlock::Append(const Slice& data) {
   const char* counter = BUCKETED_COUNTER_NAME("lbm_writes", dur);
   TRACE_COUNTER_INCREMENT(counter, 1);
 
-  block_length_ += data.size();
+  block_length_ += data_size;
   state_ = DIRTY;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/util/env-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env-test.cc b/src/kudu/util/env-test.cc
index d8ee720..fb79a69 100644
--- a/src/kudu/util/env-test.cc
+++ b/src/kudu/util/env-test.cc
@@ -51,6 +51,7 @@
 
 DECLARE_bool(never_fsync);
 DECLARE_int32(env_inject_short_read_bytes);
+DECLARE_int32(env_inject_short_write_bytes);
 
 namespace kudu {
 
@@ -142,8 +143,9 @@ class TestEnv : public KuduTest {
     ASSERT_NO_FATAL_FAILURE(VerifyTestData(s, offset));
   }
 
-  void TestAppendVector(size_t num_slices, size_t slice_size, size_t iterations,
-                        bool fast, bool pre_allocate, const WritableFileOptions& opts) {
+  void TestAppendV(size_t num_slices, size_t slice_size, size_t iterations,
+                   bool fast, bool pre_allocate,
+                   const WritableFileOptions &opts) {
     const string kTestPath = GetTestPath("test_env_appendvec_read_append");
     shared_ptr<WritableFile> file;
     ASSERT_OK(env_util::OpenFileForWrite(opts, env_, kTestPath, &file));
@@ -158,6 +160,9 @@ class TestEnv : public KuduTest {
 
     MakeVectors(num_slices, slice_size, iterations, &data, &input);
 
+    // Force short writes to half the slice length.
+    FLAGS_env_inject_short_write_bytes = slice_size / 2;
+
     shared_ptr<RandomAccessFile> raf;
 
     if (!fast) {
@@ -172,7 +177,7 @@ class TestEnv : public KuduTest {
     LOG_TIMING(INFO, test_descr)  {
       for (int i = 0; i < iterations; i++) {
         if (fast || random() % 2) {
-          ASSERT_OK(file->AppendVector(input[i]));
+          ASSERT_OK(file->AppendV(input[i]));
         } else {
           for (const Slice& slice : input[i]) {
             ASSERT_OK(file->Append(slice));
@@ -494,18 +499,18 @@ TEST_F(TestEnv, TestIOVMax) {
   VerifyTestData(Slice(scratch, data_size), 0);
 }
 
-TEST_F(TestEnv, TestAppendVector) {
+TEST_F(TestEnv, TestAppendV) {
   WritableFileOptions opts;
-  LOG(INFO) << "Testing AppendVector() only, NO pre-allocation";
-  ASSERT_NO_FATAL_FAILURE(TestAppendVector(2000, 1024, 5, true, false, opts));
+  LOG(INFO) << "Testing AppendV() only, NO pre-allocation";
+  ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, false, opts));
 
   if (!fallocate_supported_) {
     LOG(INFO) << "fallocate not supported, skipping preallocated runs";
   } else {
-    LOG(INFO) << "Testing AppendVector() only, WITH pre-allocation";
-    ASSERT_NO_FATAL_FAILURE(TestAppendVector(2000, 1024, 5, true, true, opts));
-    LOG(INFO) << "Testing AppendVector() together with Append() and Read(), WITH pre-allocation";
-    ASSERT_NO_FATAL_FAILURE(TestAppendVector(128, 4096, 5, false, true, opts));
+    LOG(INFO) << "Testing AppendV() only, WITH pre-allocation";
+    ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, true, opts));
+    LOG(INFO) << "Testing AppendV() together with Append() and Read(), WITH pre-allocation";
+    ASSERT_NO_FATAL_FAILURE(TestAppendV(128, 4096, 5, false, true, opts));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index 6d1e1fd..7f06c4e 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -445,7 +445,7 @@ class WritableFile {
   //
   // For implementation specific quirks and details, see comments in
   // implementation source code (e.g., env_posix.cc)
-  virtual Status AppendVector(const std::vector<Slice>& data_vector) = 0;
+  virtual Status AppendV(const std::vector<Slice>& data) = 0;
 
   // Pre-allocates 'size' bytes for the file in the underlying filesystem.
   // size bytes are added to the current pre-allocated size or to the current
@@ -539,6 +539,9 @@ class RWFile {
   // Writes 'data' to the file position given by 'offset'.
   virtual Status Write(uint64_t offset, const Slice& data) = 0;
 
+  // Writes the 'data' vector to the file position given by 'offset'.
+  virtual Status WriteV(uint64_t offset, const std::vector<Slice>& data) = 0;
+
   // Preallocates 'length' bytes for the file in the underlying filesystem
   // beginning at 'offset'. It is safe to preallocate the same range
   // repeatedly; this is an idempotent operation.

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 0179a26..88a3a7a 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -24,6 +24,7 @@
 #include <cstring>
 #include <ctime>
 #include <memory>
+#include <numeric>
 #include <string>
 #include <type_traits>
 #include <vector>
@@ -122,9 +123,13 @@ TAG_FLAG(env_inject_io_error, hidden);
 DEFINE_int32(env_inject_short_read_bytes, 0,
              "The number of bytes less than the requested bytes to read");
 TAG_FLAG(env_inject_short_read_bytes, hidden);
+DEFINE_int32(env_inject_short_write_bytes, 0,
+             "The number of bytes less than the requested bytes to write");
+TAG_FLAG(env_inject_short_write_bytes, hidden);
 
 using base::subtle::Atomic64;
 using base::subtle::Barrier_AtomicIncrement;
+using std::accumulate;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -174,9 +179,7 @@ int fallocate(int fd, int mode, off_t offset, off_t len) {
   }
   return 0;
 }
-#endif
 
-#if defined(__APPLE__)
 // Simulates Linux's preadv API on OS X.
 ssize_t preadv(int fd, const struct iovec* iovec, int count, off_t offset) {
   ssize_t total_read_bytes = 0;
@@ -194,6 +197,24 @@ ssize_t preadv(int fd, const struct iovec* iovec, int count, off_t offset) {
   }
   return total_read_bytes;
 }
+
+// Simulates Linux's pwritev API on OS X.
+ssize_t pwritev(int fd, const struct iovec* iovec, int count, off_t offset) {
+  ssize_t total_written_bytes = 0;
+  for (int i = 0; i < count; i++) {
+    ssize_t r;
+    RETRY_ON_EINTR(r, pwrite(fd, iovec[i].iov_base, iovec[i].iov_len, offset));
+    if (r < 0) {
+      return r;
+    }
+    total_written_bytes += r;
+    if (static_cast<size_t>(r) < iovec[i].iov_len) {
+      break;
+    }
+    offset += iovec[i].iov_len;
+  }
+  return total_written_bytes;
+}
 #endif
 
 
@@ -341,6 +362,71 @@ Status DoReadV(int fd, const string& filename, uint64_t offset, vector<Slice>* r
   return Status::OK();
 }
 
+Status DoWriteV(int fd, const string& filename, uint64_t offset,
+                const vector<Slice>& data) {
+  MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                       Status::IOError(Env::kInjectedFailureStatusMsg));
+  ThreadRestrictions::AssertIOAllowed();
+
+  // Convert the results into the iovec vector to request
+  // and calculate the total bytes requested.
+  size_t bytes_req = 0;
+  size_t iov_size = data.size();
+  struct iovec iov[iov_size];
+  for (size_t i = 0; i < iov_size; i++) {
+    const Slice& result = data[i];
+    bytes_req += result.size();
+    iov[i] = { const_cast<uint8_t*>(result.data()), result.size() };
+  }
+
+  uint64_t cur_offset = offset;
+  size_t completed_iov = 0;
+  size_t rem = bytes_req;
+  while (rem > 0) {
+    // Never request more than IOV_MAX in one request.
+    size_t iov_count = std::min(iov_size - completed_iov, static_cast<size_t>(IOV_MAX));
+    ssize_t w;
+    RETRY_ON_EINTR(w, pwritev(fd, iov + completed_iov, iov_count, cur_offset));
+
+    // Fake a short write for testing.
+    if (PREDICT_FALSE(FLAGS_env_inject_short_write_bytes > 0 && rem == bytes_req)) {
+      DCHECK_LT(FLAGS_env_inject_short_write_bytes, w);
+      w -= FLAGS_env_inject_short_read_bytes;
+    }
+
+    if (PREDICT_FALSE(w < 0)) {
+      // An error: return a non-ok status.
+      return IOError(filename, errno);
+    }
+
+    DCHECK_LE(w, rem);
+
+    if (PREDICT_TRUE(w == rem)) {
+      // All requested bytes were read. This is almost always the case.
+      return Status::OK();
+    }
+    // Adjust iovec vector based on bytes read for the next request.
+    ssize_t bytes_rem = w;
+    for (size_t i = completed_iov; i < iov_size; i++) {
+      if (bytes_rem >= iov[i].iov_len) {
+        // The full length of this iovec was written.
+        completed_iov++;
+        bytes_rem -= iov[i].iov_len;
+      } else {
+        // Partially wrote this result.
+        // Adjust the iov_len and iov_base to write only the missing data.
+        iov[i].iov_base = static_cast<uint8_t *>(iov[i].iov_base) + bytes_rem;
+        iov[i].iov_len -= bytes_rem;
+        break; // Don't need to adjust remaining iovec's.
+      }
+    }
+    cur_offset += w;
+    rem -= w;
+  }
+  DCHECK_EQ(0, rem);
+  return Status::OK();
+}
+
 class PosixSequentialFile: public SequentialFile {
  private:
   std::string filename_;
@@ -441,23 +527,20 @@ class PosixWritableFile : public WritableFile {
   }
 
   virtual Status Append(const Slice& data) OVERRIDE {
-    vector<Slice> data_vector;
-    data_vector.push_back(data);
-    return AppendVector(data_vector);
+    return AppendV({ data });
   }
 
-  virtual Status AppendVector(const vector<Slice>& data_vector) OVERRIDE {
+  virtual Status AppendV(const vector<Slice> &data) OVERRIDE {
     ThreadRestrictions::AssertIOAllowed();
-    static const size_t kIovMaxElements = IOV_MAX;
-
-    Status s;
-    for (size_t i = 0; i < data_vector.size() && s.ok(); i += kIovMaxElements) {
-      size_t n = std::min(data_vector.size() - i, kIovMaxElements);
-      s = DoWritev(data_vector, i, n);
-    }
-
+    RETURN_NOT_OK(DoWriteV(fd_, filename_, filesize_, data));
+    // Calculate the amount of data written
+    size_t bytes_written = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
+                                      [&](int sum, const Slice& curr) {
+                                        return sum + curr.size();
+                                      });
+    filesize_ += bytes_written;
     pending_sync_ = true;
-    return s;
+    return Status::OK();
   }
 
   virtual Status PreAllocate(uint64_t size) OVERRIDE {
@@ -555,68 +638,6 @@ class PosixWritableFile : public WritableFile {
   virtual const string& filename() const OVERRIDE { return filename_; }
 
  private:
-
-  Status DoWritev(const vector<Slice>& data_vector,
-                  size_t offset, size_t n) {
-    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
-                         Status::IOError(Env::kInjectedFailureStatusMsg));
-
-    ThreadRestrictions::AssertIOAllowed();
-#if defined(__linux__)
-    DCHECK_LE(n, IOV_MAX);
-
-    struct iovec iov[n];
-    size_t j = 0;
-    size_t nbytes = 0;
-
-    for (size_t i = offset; i < offset + n; i++) {
-      const Slice& data = data_vector[i];
-      iov[j].iov_base = const_cast<uint8_t*>(data.data());
-      iov[j].iov_len = data.size();
-      nbytes += data.size();
-      ++j;
-    }
-
-    ssize_t written;
-    RETRY_ON_EINTR(written, pwritev(fd_, iov, n, filesize_));
-
-    if (PREDICT_FALSE(written == -1)) {
-      int err = errno;
-      return IOError(filename_, err);
-    }
-
-    filesize_ += written;
-
-    if (PREDICT_FALSE(written != nbytes)) {
-      return Status::IOError(
-          Substitute("pwritev error: expected to write $0 bytes, wrote $1 bytes instead"
-                     " (perhaps the disk is out of space)",
-                     nbytes, written));
-    }
-#else
-    for (size_t i = offset; i < offset + n; i++) {
-      const Slice& data = data_vector[i];
-      ssize_t written;
-      RETRY_ON_EINTR(written, pwrite(fd_, data.data(), data.size(), filesize_));
-      if (PREDICT_FALSE(written == -1)) {
-        int err = errno;
-        return IOError("pwrite error", err);
-      }
-
-      filesize_ += written;
-
-      if (PREDICT_FALSE(written != data.size())) {
-        return Status::IOError(
-            Substitute("pwrite error: expected to write $0 bytes, wrote $1 bytes instead"
-                       " (perhaps the disk is out of space)",
-                       data.size(), written));
-      }
-    }
-#endif
-
-    return Status::OK();
-  }
-
   const std::string filename_;
   int fd_;
   bool sync_on_close_;
@@ -649,27 +670,13 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status Write(uint64_t offset, const Slice& data) OVERRIDE {
-    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
-                         Status::IOError(Env::kInjectedFailureStatusMsg));
-
-    ThreadRestrictions::AssertIOAllowed();
-    ssize_t written;
-    RETRY_ON_EINTR(written, pwrite(fd_, data.data(), data.size(), offset));
-
-    if (PREDICT_FALSE(written == -1)) {
-      int err = errno;
-      return IOError(filename_, err);
-    }
-
-    if (PREDICT_FALSE(written != data.size())) {
-      return Status::IOError(
-          Substitute("pwrite error: expected to write $0 bytes, wrote $1 bytes instead"
-                     " (perhaps the disk is out of space)",
-                     data.size(), written));
-    }
+    return WriteV(offset, { data });
+  }
 
+  virtual Status WriteV(uint64_t offset, const vector<Slice> &data) OVERRIDE {
+    Status s = DoWriteV(fd_, filename_, offset, data);
     pending_sync_.Store(true);
-    return Status::OK();
+    return s;
   }
 
   virtual Status PreAllocate(uint64_t offset,

http://git-wip-us.apache.org/repos/asf/kudu/blob/174a058e/src/kudu/util/file_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index 2aa2aa1..b8d17cf 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -226,6 +226,12 @@ class Descriptor<RWFile> : public RWFile {
     return opened.file()->Write(offset, data);
   }
 
+  Status WriteV(uint64_t offset, const vector<Slice> &data) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->WriteV(offset, data);
+  }
+
   Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
     RETURN_NOT_OK(ReopenFileIfNecessary(&opened));


Mime
View raw message