carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kumarvisha...@apache.org
Subject carbondata git commit: [HOTFIX] improve sdk multi-thread performance
Date Wed, 05 Sep 2018 14:56:04 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master af2c469bb -> 94d2089b2


[HOTFIX] improve sdk multi-thread performance

Problem: In a multi-thread scenario, the SDK writer creates multiple iterators,
 but the iterators are not filled concurrently because the writer's write() is synchronized at the method level.

Solution: In the SDK multi-thread write scenario, do not synchronize at the method level;
synchronize at the iterator level instead.
Because each iterator has its own queue, the iterators can be filled concurrently.
Also, the input processor step used for Avro input (the no-converter step) now uses the SDK writer core count.

This closes #2672


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/94d2089b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/94d2089b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/94d2089b

Branch: refs/heads/master
Commit: 94d2089b246d2e4dc0ea2a673a89553e5eff1e35
Parents: af2c469
Author: ajantha-bhat <ajanthabhat@gmail.com>
Authored: Wed Aug 29 23:11:09 2018 +0530
Committer: kumarvishal09 <kumarvishal1802@gmail.com>
Committed: Wed Sep 5 20:25:39 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableOutputFormat.java     |  27 +++--
 .../loading/DataLoadProcessBuilder.java         |   4 +-
 .../loading/model/CarbonLoadModel.java          |  14 +--
 .../loading/steps/InputProcessorStepImpl.java   |   7 +-
 .../InputProcessorStepWithNoConverterImpl.java  |  31 +----
 .../steps/JsonInputProcessorStepImpl.java       |   9 +-
 .../util/CarbonDataProcessorUtil.java           |   6 +-
 .../sdk/file/CarbonWriterBuilder.java           |   6 +-
 .../sdk/file/ConcurrentAvroSdkWriterTest.java   | 116 +++++++++++++++++++
 9 files changed, 162 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 5cc275b..99d8532 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
@@ -235,8 +236,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable,
Obje
       final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
     //if loadModel having taskNo already(like in SDK) then no need to overwrite
-    short sdkUserCore = loadModel.getSdkUserCores();
-    int itrSize = (sdkUserCore > 0) ? sdkUserCore : 1;
+    short sdkWriterCores = loadModel.getSdkWriterCores();
+    int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1;
     final CarbonOutputIteratorWrapper[] iterators = new CarbonOutputIteratorWrapper[itrSize];
     for (int i = 0; i < itrSize; i++) {
       iterators[i] = new CarbonOutputIteratorWrapper();
@@ -273,7 +274,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable,
Obje
       }
     });
 
-    if (sdkUserCore > 0) {
+    if (sdkWriterCores > 0) {
       // CarbonMultiRecordWriter handles the load balancing of the write rows in round robin.
       return new CarbonMultiRecordWriter(iterators, dataLoadExecutor, loadModel, future,
           executorService);
@@ -460,27 +461,31 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable,
Obje
 
     private CarbonOutputIteratorWrapper[] iterators;
 
-    private int counter;
+    // keep counts of number of writes called
+    // and it is used to load balance each write call to one iterator.
+    private AtomicLong counter;
 
     CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators,
         DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
         ExecutorService executorService) {
       super(null, dataLoadExecutor, loadModel, future, executorService);
       this.iterators = iterators;
+      counter = new AtomicLong(0);
     }
 
-    @Override public synchronized void write(NullWritable aVoid, ObjectArrayWritable objects)
+    @Override public void write(NullWritable aVoid, ObjectArrayWritable objects)
         throws InterruptedException {
-      iterators[counter].write(objects.get());
-      if (++counter == iterators.length) {
-        //round robin reset
-        counter = 0;
+      int iteratorNum = (int) (counter.incrementAndGet() % iterators.length);
+      synchronized (iterators[iteratorNum]) {
+        iterators[iteratorNum].write(objects.get());
       }
     }
 
     @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException
{
-      for (CarbonOutputIteratorWrapper itr : iterators) {
-        itr.closeWriter(false);
+      for (int i = 0; i < iterators.length; i++) {
+        synchronized (iterators[i]) {
+          iterators[i].closeWriter(false);
+        }
       }
       super.close(taskAttemptContext);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 666c598..a628d41 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -313,8 +313,8 @@ public final class DataLoadProcessBuilder {
     }
     TableSpec tableSpec = new TableSpec(carbonTable);
     configuration.setTableSpec(tableSpec);
-    if (loadModel.getSdkUserCores() > 0) {
-      configuration.setWritingCoresCount(loadModel.getSdkUserCores());
+    if (loadModel.getSdkWriterCores() > 0) {
+      configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
     }
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index c985952..97e329d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -60,7 +60,7 @@ public class CarbonLoadModel implements Serializable {
   private boolean carbonTransactionalTable = true;
 
   /* Number of thread in which sdk writer is used */
-  private short sdkUserCores;
+  private short sdkWriterCores;
 
   private String csvHeader;
   private String[] csvHeaderColumns;
@@ -472,7 +472,7 @@ public class CarbonLoadModel implements Serializable {
     copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
     copy.loadMinSize = loadMinSize;
     copy.parentTablePath = parentTablePath;
-    copy.sdkUserCores = sdkUserCores;
+    copy.sdkWriterCores = sdkWriterCores;
     return copy;
   }
 
@@ -528,7 +528,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.sortColumnsBoundsStr = sortColumnsBoundsStr;
     copyObj.loadMinSize = loadMinSize;
     copyObj.parentTablePath = parentTablePath;
-    copyObj.sdkUserCores = sdkUserCores;
+    copyObj.sdkWriterCores = sdkWriterCores;
     return copyObj;
   }
 
@@ -914,11 +914,11 @@ public class CarbonLoadModel implements Serializable {
     return mergedSegmentIds;
   }
 
-  public short getSdkUserCores() {
-    return sdkUserCores;
+  public short getSdkWriterCores() {
+    return sdkWriterCores;
   }
 
-  public void setSdkUserCores(short sdkUserCores) {
-    this.sdkUserCores = sdkUserCores;
+  public void setSdkWriterCores(short sdkWriterCores) {
+    this.sdkWriterCores = sdkWriterCores;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index 5cccd4f..f540b3e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -47,7 +47,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep
{
 
   private CarbonIterator<Object[]>[] inputIterators;
 
-  private short sdkUserCore;
+  // cores used in SDK writer, set by the user
+  private short sdkWriterCores;
 
   /**
    * executor service to execute the query
@@ -58,7 +59,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep
{
       CarbonIterator<Object[]>[] inputIterators) {
     super(configuration, null);
     this.inputIterators = inputIterators;
-    this.sdkUserCore = configuration.getWritingCoresCount();
+    this.sdkWriterCores = configuration.getWritingCoresCount();
   }
 
   @Override public DataField[] getOutput() {
@@ -78,7 +79,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep
{
   @Override public Iterator<CarbonRowBatch>[] execute() {
     int batchSize = CarbonProperties.getInstance().getBatchSize();
     List<CarbonIterator<Object[]>>[] readerIterators =
-        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators, sdkUserCore);
+        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators, sdkWriterCores);
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 15d0994..ce8b62f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.processing.loading.steps;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -64,10 +63,14 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
   private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
 
+  // cores used in SDK writer, set by the user
+  private short sdkWriterCores;
+
   public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration configuration,
       CarbonIterator<Object[]>[] inputIterators) {
     super(configuration, null);
     this.inputIterators = inputIterators;
+    sdkWriterCores = configuration.getWritingCoresCount();
   }
 
   @Override public DataField[] getOutput() {
@@ -133,7 +136,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
   @Override public Iterator<CarbonRowBatch>[] execute() {
     int batchSize = CarbonProperties.getInstance().getBatchSize();
-    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    List<CarbonIterator<Object[]>>[] readerIterators =
+        CarbonDataProcessorUtil.partitionInputReaderIterators(this.inputIterators, sdkWriterCores);
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =
@@ -144,29 +148,6 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
     return outIterators;
   }
 
-  /**
-   * Partition input iterators equally as per the number of threads.
-   *
-   * @return
-   */
-  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
-    // Get the number of cores configured in property.
-    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    // Get the minimum of number of cores and iterators size to get the number of parallel
threads
-    // to be launched.
-    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
-
-    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
-    for (int i = 0; i < parallelThreadNumber; i++) {
-      iterators[i] = new ArrayList<>();
-    }
-    // Equally partition the iterators as per number of threads
-    for (int i = 0; i < inputIterators.length; i++) {
-      iterators[i % parallelThreadNumber].add(inputIterators[i]);
-    }
-    return iterators;
-  }
-
   @Override public void close() {
     if (!closed) {
       super.close();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
index f78c224..02c455c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
@@ -39,15 +39,16 @@ public class JsonInputProcessorStepImpl extends AbstractDataLoadProcessorStep
{
 
   private CarbonIterator<Object[]>[] inputIterators;
 
-  boolean isRawDataRequired = false;
+  private boolean isRawDataRequired = false;
 
-  short sdkUserCore;
+  // cores used in SDK writer, set by the user
+  private short sdkWriterCores;
 
   public JsonInputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       CarbonIterator<Object[]>[] inputIterators) {
     super(configuration, null);
     this.inputIterators = inputIterators;
-    sdkUserCore = configuration.getWritingCoresCount();
+    sdkWriterCores = configuration.getWritingCoresCount();
   }
 
   @Override public DataField[] getOutput() {
@@ -64,7 +65,7 @@ public class JsonInputProcessorStepImpl extends AbstractDataLoadProcessorStep
{
   @Override public Iterator<CarbonRowBatch>[] execute() {
     int batchSize = CarbonProperties.getInstance().getBatchSize();
     List<CarbonIterator<Object[]>>[] readerIterators =
-        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators, sdkUserCore);
+        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators, sdkWriterCores);
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 5810217..218bac0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -663,11 +663,11 @@ public final class CarbonDataProcessorUtil {
    * @return
    */
   public static List<CarbonIterator<Object[]>>[] partitionInputReaderIterators(
-      CarbonIterator<Object[]>[] inputIterators, short sdkUserCores) {
+      CarbonIterator<Object[]>[] inputIterators, short sdkWriterCores) {
     // Get the number of cores configured in property.
     int numberOfCores;
-    if (sdkUserCores > 0) {
-      numberOfCores = sdkUserCores;
+    if (sdkWriterCores > 0) {
+      numberOfCores = sdkWriterCores;
     } else {
       numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index faac3dd..b38b491 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -416,7 +416,7 @@ public class CarbonWriterBuilder {
       throw new IllegalArgumentException(" numOfThreads must be greater than 0");
     }
     CarbonLoadModel loadModel = buildLoadModel(schema);
-    loadModel.setSdkUserCores(numOfThreads);
+    loadModel.setSdkWriterCores(numOfThreads);
     return new CSVCarbonWriter(loadModel);
   }
 
@@ -467,7 +467,7 @@ public class CarbonWriterBuilder {
     // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder
     // which will skip Conversion Step.
     loadModel.setLoadWithoutConverterStep(true);
-    loadModel.setSdkUserCores(numOfThreads);
+    loadModel.setSdkWriterCores(numOfThreads);
     return new AvroCarbonWriter(loadModel);
   }
 
@@ -511,7 +511,7 @@ public class CarbonWriterBuilder {
     this.schema = carbonSchema;
     CarbonLoadModel loadModel = buildLoadModel(schema);
     loadModel.setJsonFileLoad(true);
-    loadModel.setSdkUserCores(numOfThreads);
+    loadModel.setSdkWriterCores(numOfThreads);
     return new JsonCarbonWriter(loadModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d2089b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
new file mode 100644
index 0000000..9a6e8e0
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Multi-thread test suite for {@link AvroCarbonWriter}
+ */
+public class ConcurrentAvroSdkWriterTest {
+
+  private static final int recordsPerItr = 10;
+  private static final short numOfThreads = 4;
+
+  @Test public void testWriteFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    String mySchema =
+        "{" + "  \"name\": \"address\", " + "   \"type\": \"record\", " + "    \"fields\":
[  "
+            + "  { \"name\": \"name\", \"type\": \"string\"}, "
+            + "  { \"name\": \"age\", \"type\": \"int\"}, " + "  { " + "    \"name\": \"address\",
"
+            + "      \"type\": { " + "    \"type\" : \"record\", "
+            + "        \"name\" : \"my_address\", " + "        \"fields\" : [ "
+            + "    {\"name\": \"street\", \"type\": \"string\"}, "
+            + "    {\"name\": \"city\", \"type\": \"string\"} " + "  ]} " + "  } " + "] "
+ "}";
+
+    String json =
+        "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}";
+
+    // conversion to GenericData.Record
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(mySchema);
+    GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
+      CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads);
+      // write in multi-thread
+      for (int i = 0; i < numOfThreads; i++) {
+        executorService.submit(new WriteLogic(writer, record));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.HOURS);
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    // read the files and verify the count
+    CarbonReader reader;
+    try {
+      reader =
+          CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build();
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        i++;
+      }
+      Assert.assertEquals(i, numOfThreads * recordsPerItr);
+      reader.close();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  class WriteLogic implements Runnable {
+    CarbonWriter writer;
+    GenericData.Record record;
+
+    WriteLogic(CarbonWriter writer, GenericData.Record record) {
+      this.writer = writer;
+      this.record = record;
+    }
+
+    @Override public void run() {
+      try {
+        for (int i = 0; i < recordsPerItr; i++) {
+          writer.write(record);
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
+}


Mime
View raw message