avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From th...@apache.org
Subject [avro] branch master updated: Fixed C++ data file reader to handle zero-object blocks (#414)
Date Tue, 25 Dec 2018 02:10:07 GMT
This is an automated email from the ASF dual-hosted git repository.

thiru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new d2f92b4  Fixed C++ data file reader to handle zero-object blocks (#414)
d2f92b4 is described below

commit d2f92b445cf07683b7aa9afd2f69e2b23a033e6b
Author: Thiruvalluvan M G <thiru@apache.org>
AuthorDate: Tue Dec 25 07:40:02 2018 +0530

    Fixed C++ data file reader to handle zero-object blocks (#414)
---
 lang/c++/api/DataFile.hh       |  2 +-
 lang/c++/impl/DataFile.cc      | 42 +++++++++++++++++++++++-------------------
 lang/c++/test/DataFileTests.cc | 25 ++++++++++++++++++++++---
 3 files changed, 46 insertions(+), 23 deletions(-)

diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh
index 1a95296..5e2919c 100644
--- a/lang/c++/api/DataFile.hh
+++ b/lang/c++/api/DataFile.hh
@@ -206,7 +206,7 @@ class AVRO_DECL DataFileReaderBase : boost::noncopyable {
     std::string uncompressed;
     void readHeader();
 
-    bool readDataBlock();
+    void readDataBlock();
     void doSeek(int64_t position);
 public:
     /**
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index ad645ec..5b8209b 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -317,21 +317,23 @@ std::ostream& operator << (std::ostream& os, const DataFileSync& s)
 
 bool DataFileReaderBase::hasMore()
 {
-    if (eof_) {
-        return false;
-    } else if (objectCount_ != 0) {
-        return true;
-    }
+    for (; ;) {
+        if (eof_) {
+            return false;
+        } else if (objectCount_ != 0) {
+            return true;
+        }
 
-    dataDecoder_->init(*dataStream_);
-    drain(*dataStream_);
-    DataFileSync s;
-    decoder_->init(*stream_);
-    avro::decode(*decoder_, s);
-    if (s != sync_) {
-        throw Exception("Sync mismatch");
+        dataDecoder_->init(*dataStream_);
+        drain(*dataStream_);
+        DataFileSync s;
+        decoder_->init(*stream_);
+        avro::decode(*decoder_, s);
+        if (s != sync_) {
+            throw Exception("Sync mismatch");
+        }
+        readDataBlock();
     }
-    return readDataBlock();
 }
 
 class BoundedInputStream : public InputStream {
@@ -377,7 +379,7 @@ unique_ptr<InputStream> boundedInputStream(InputStream& in, size_t limit)
     return unique_ptr<InputStream>(new BoundedInputStream(in, limit));
 }
 
-bool DataFileReaderBase::readDataBlock()
+void DataFileReaderBase::readDataBlock()
 {
     decoder_->init(*stream_);
     blockStart_ = stream_->byteCount();
@@ -385,7 +387,7 @@ bool DataFileReaderBase::readDataBlock()
     size_t n = 0;
     if (! stream_->next(&p, &n)) {
         eof_ = true;
-        return false;
+        return;
     }
     stream_->backup(n);
     avro::decode(*decoder_, objectCount_);
@@ -452,7 +454,6 @@ bool DataFileReaderBase::readDataBlock()
         dataDecoder_->init(*in);
         dataStream_ = std::move(in);
     }
-    return true;
 }
 
 void DataFileReaderBase::close()
@@ -515,7 +516,8 @@ void DataFileReaderBase::readHeader()
     blockStart_ = stream_->byteCount();
 }
 
-void DataFileReaderBase::doSeek(int64_t position) {
+void DataFileReaderBase::doSeek(int64_t position)
+{
     if (SeekableInputStream *ss = dynamic_cast<SeekableInputStream *>(stream_.get())) {
         if (!eof_) {
             dataDecoder_->init(*dataStream_);
@@ -529,12 +531,14 @@ void DataFileReaderBase::doSeek(int64_t position) {
     }
 }
 
-void DataFileReaderBase::seek(int64_t position) {
+void DataFileReaderBase::seek(int64_t position)
+{
     doSeek(position);
     readDataBlock();
 }
 
-void DataFileReaderBase::sync(int64_t position) {
+void DataFileReaderBase::sync(int64_t position)
+{
     doSeek(position);
     DataFileSync sync_buffer;
     const uint8_t *p = 0;
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 7a7f1d9..acdb16a 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -47,7 +47,7 @@ using avro::GenericDatum;
 using avro::GenericRecord;
 using avro::NodePtr;
 
-const int count = 1000;
+const int DEFAULT_COUNT = 1000;
 
 template <typename T>
 struct Complex {
@@ -169,11 +169,13 @@ class DataFileTest {
     const char* filename;
     const ValidSchema writerSchema;
     const ValidSchema readerSchema;
+    const int count;
 
 public:
-    DataFileTest(const char* f, const char* wsch, const char* rsch) :
+    DataFileTest(const char* f, const char* wsch, const char* rsch,
+            int count = DEFAULT_COUNT) :
         filename(f), writerSchema(makeValidSchema(wsch)),
-        readerSchema(makeValidSchema(rsch)) { }
+        readerSchema(makeValidSchema(rsch)), count(count) { }
 
     typedef pair<ValidSchema, GenericDatum> Pair;
 
@@ -189,6 +191,8 @@ public:
             ComplexInteger c(re, im);
             df.write(c);
         }
+        // Simulate writing an empty block.
+        df.flush();
         df.close();
     }
 
@@ -399,8 +403,16 @@ public:
         std::set<int64_t> sync_points_syncing;
         std::set<int64_t> sync_points_reading;
         {
+            /*
+             * sync() will stop at a block with 0 objects. But read()
+             * will transparently skip such blocks. So this test will
+             * fail if there are blocks with zero objects. In order to
+             * avoid such failures, we read one object after sync.
+             */
             avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+            ComplexInteger ci;
             for (int64_t prev = 0; prev != df.previousSync(); df.sync(prev)) {
+                df.read(ci);
                 prev = df.previousSync();
                 sync_points_syncing.insert(prev);
             }
@@ -632,6 +644,13 @@ test_suite*
 init_unit_test_suite(int argc, char *argv[])
 {
     {
+        test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test0.df");
+        shared_ptr<DataFileTest> t1(new DataFileTest("test1.d0", sch, isch, 0));
+        ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t1));
+        addReaderTests(ts, t1);
+        boost::unit_test::framework::master_test_suite().add(ts);
+    }
+    {
         test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test1.df");
         shared_ptr<DataFileTest> t1(new DataFileTest("test1.df", sch, isch));
         ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t1));


Mime
View raw message