carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: Fixed data loading producing fewer rows than the actual row count when the data size is a multiple of the page size.
Date Wed, 03 May 2017 13:32:40 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master f2fdf2962 -> 8410081cd


Fixed data loading producing fewer rows than the actual row count when the data size is a
multiple of the page size.

Fix testcase.

Closing resources properly


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1fa2df9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1fa2df9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1fa2df9d

Branch: refs/heads/master
Commit: 1fa2df9df1ffba858e5e198f8fec42575e92606c
Parents: f2fdf29
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Mon May 1 08:06:58 2017 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Wed May 3 21:31:19 2017 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala    | 16 ++++++++++++----
 .../newflow/converter/impl/RowConverterImpl.java  |  5 ++++-
 .../steps/DataConverterProcessorStepImpl.java     |  1 +
 .../store/CarbonFactDataHandlerColumnar.java      | 13 +++++++------
 .../writer/v1/CarbonFactDataWriterImplV1.java     |  3 +++
 .../writer/v2/CarbonFactDataWriterImplV2.java     |  3 +++
 .../writer/v3/CarbonFactDataWriterImplV3.java     | 18 ++++++++----------
 .../store/writer/v3/DataWriterHolder.java         |  8 ++++++++
 8 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 3b0fd4a..2d86497 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -31,7 +31,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
 
   def buildTestData() = {
     import sqlContext.implicits._
-    df = sqlContext.sparkContext.parallelize(1 to 1000)
+    df = sqlContext.sparkContext.parallelize(1 to 32000)
       .map(x => ("a", "b", x))
       .toDF("c1", "c2", "c3")
 
@@ -62,6 +62,8 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     buildTestData
   }
 
+
+
   test("test load dataframe with saving compressed csv files") {
     // save dataframe to carbon file
     df.write
@@ -72,7 +74,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Overwrite)
       .save()
     checkAnswer(
-      sql("select count(*) from carbon1 where c3 > 500"), Row(500)
+      sql("select count(*) from carbon1 where c3 > 500"), Row(31500)
     )
   }
 
@@ -86,7 +88,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Overwrite)
       .save()
     checkAnswer(
-      sql("select count(*) from carbon2 where c3 > 500"), Row(500)
+      sql("select count(*) from carbon2 where c3 > 500"), Row(31500)
     )
   }
 
@@ -99,7 +101,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Overwrite)
       .save()
     checkAnswer(
-      sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+      sql("select count(*) from carbon3 where c3 > 500"), Row(31500)
     )
   }
 
@@ -114,6 +116,12 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
       sql("SELECT decimal FROM carbon4"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44))))
   }
 
+  test("test loading data if the data count is multiple of page size"){
+    checkAnswer(
+      sql("SELECT count(*) FROM carbon2"),Seq(Row(32000)))
+  }
+
+
   override def afterAll {
     dropTable
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 2471314..5a476da 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -178,7 +178,10 @@ public class RowConverterImpl implements RowConverter {
           client.shutDown();
         }
       }
-      executorService.shutdownNow();
+      if (executorService != null) {
+        executorService.shutdownNow();
+        executorService = null;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index cc99469..5d065b1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -85,6 +85,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
         if (first) {
           first = false;
           localConverter = converters.get(0).createCopyForNewThread();
+          converters.add(localConverter);
         }
         return childIter.hasNext();
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 7a80f72..f6ceb84 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -589,6 +589,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler
{
   /** generate the NodeHolder from the input rows */
   private NodeHolder processDataRows(List<Object[]> dataRows)
       throws CarbonDataWriterException {
+    if (dataRows.size() == 0) {
+      return new NodeHolder();
+    }
     // to store index of the measure columns which are null
     BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
     // statistics for one blocklet/page
@@ -859,12 +862,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler
{
   public void finish() throws CarbonDataWriterException {
     // still some data is present in stores if entryCount is more
     // than 0
-    if (this.entryCount > 0) {
-      producerExecutorServiceTaskList.add(producerExecutorService
-          .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter,
true)));
-      blockletProcessingCount.incrementAndGet();
-      processedDataCount += entryCount;
-    }
+    producerExecutorServiceTaskList.add(producerExecutorService
+        .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+    blockletProcessingCount.incrementAndGet();
+    processedDataCount += entryCount;
     closeWriterExecutionService(producerExecutorService);
     processWriteTaskSubmitList(producerExecutorServiceTaskList);
     processingComplete = true;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index 64077e2..bc9b453 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -198,6 +198,9 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]>
{
   }
 
   @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException
{
+    if (holder.getEntryCount() == 0) {
+      return;
+    }
     int indexBlockSize = 0;
     for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
       indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index ec79186..e8b43a0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -65,6 +65,9 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1
{
    * @throws CarbonDataWriterException any problem in writing operation
    */
   @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException
{
+    if (holder.getEntryCount() == 0) {
+      return;
+    }
     // size to calculate the size of the blocklet
     int size = 0;
     // get the blocklet info object

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 6f05b69..bb80d1e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -324,7 +324,9 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
           isAdded = true;
           dataWriterHolder.addNodeHolder(holder);
         }
-        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+
+        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
+            + " :Rows Added: " + dataWriterHolder.getTotalRows());
         // write the data
         writeDataToFile(fileChannel);
       }
@@ -334,16 +336,12 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     } else {
       //for last blocklet check if the last page will exceed the blocklet size then write
       // existing pages and then last page
-      if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize
-          && dataWriterHolder.getNodeHolder().size() > 0) {
-        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
-        writeDataToFile(fileChannel);
-        dataWriterHolder.addNodeHolder(holder);
-        LOGGER.info("Number of Pages for blocklet is: " + "1");
-        writeDataToFile(fileChannel);
-      } else {
+      if (holder.getEntryCount() > 0) {
         dataWriterHolder.addNodeHolder(holder);
-        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getSize());
+      }
+      if (dataWriterHolder.getNumberOfPagesAdded() > 0) {
+        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
+            + " :Rows Added: " + dataWriterHolder.getTotalRows());
         writeDataToFile(fileChannel);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1fa2df9d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
index 4368b2b..a98f388 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
@@ -48,6 +48,14 @@ public class DataWriterHolder {
     return nodeHolder.size();
   }
 
+  public int getTotalRows() {
+    int rows = 0;
+    for (NodeHolder nh : nodeHolder) {
+      rows += nh.getEntryCount();
+    }
+    return rows;
+  }
+
   public List<NodeHolder> getNodeHolder() {
     return nodeHolder;
   }


Mime
View raw message