carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-2423][SDK]SDK Reader support to read from Non Transactional Table
Date Wed, 02 May 2018 14:38:43 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master a03335759 -> 4b8dc0a58


[CARBONDATA-2423][SDK]SDK Reader support to read from Non Transactional Table

This closes #2257


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b8dc0a5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b8dc0a5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b8dc0a5

Branch: refs/heads/master
Commit: 4b8dc0a58aba542211b3908aa31456444e6bfe04
Parents: a033357
Author: sounakr <sounakr@gmail.com>
Authored: Wed May 2 08:56:09 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Wed May 2 20:08:27 2018 +0530

----------------------------------------------------------------------
 .../core/datastore/SegmentTaskIndexStore.java   |  2 +-
 .../core/metadata/schema/table/CarbonTable.java | 15 ++++++--
 .../hadoop/api/CarbonFileInputFormat.java       | 34 ++++++++++++++---
 .../sdk/file/CarbonReaderBuilder.java           |  9 ++++-
 .../sdk/file/CSVCarbonWriterTest.java           |  7 +---
 .../carbondata/sdk/file/CarbonReaderTest.java   | 39 ++++++++++++++++++++
 .../apache/carbondata/sdk/file/TestUtil.java    | 24 +++++++++---
 7 files changed, 108 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index d9e544f..537c635 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -90,7 +90,7 @@ public class SegmentTaskIndexStore
       segmentTaskIndexWrapper =
           loadAndGetTaskIdToSegmentsMap(
               tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
-              CarbonTable.buildFromTablePath("name", "path"),
+              CarbonTable.buildFromTablePath("name", "path", false),
               tableSegmentUniqueIdentifier);
     } catch (IndexBuilderException e) {
       throw new IOException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 9ae5ed4..1875237 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -244,10 +244,17 @@ public class CarbonTable implements Serializable {
     return buildFromTableInfo(tableInfo);
   }
 
-  public static CarbonTable buildFromTablePath(
-      String tableName, String tablePath) throws IOException {
-    return SchemaReader.readCarbonTableFromStore(
-        AbsoluteTableIdentifier.from(tablePath, tableName, "default"));
+  public static CarbonTable buildFromTablePath(String tableName, String tablePath,
+      boolean isTransactionalTable) throws IOException {
+    if (isTransactionalTable) {
+      return SchemaReader
+          .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, "default", tableName));
+    } else {
+      // Infer the schema from the Carbondata file.
+      TableInfo tableInfoInfer =
+          SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "null", "null"), false);
+      return CarbonTable.buildFromTableInfo(tableInfoInfer);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 3dac3bb..2af147d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -116,8 +117,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       // get all valid segments and set them into the configuration
       // check for externalTable segment (Segment_null)
       // process and resolve the expression
-      ReadCommittedScope readCommittedScope = new LatestFilesReadCommittedScope(
-          identifier.getTablePath() + "/Fact/Part0/Segment_null/");
+      ReadCommittedScope readCommittedScope = null;
+      if (carbonTable.isTransactionalTable()) {
+        readCommittedScope = new LatestFilesReadCommittedScope(
+            identifier.getTablePath() + "/Fact/Part0/Segment_null/");
+      } else {
+        readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+      }
       Expression filter = getFilterPredicates(job.getConfiguration());
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       // this will be null in case of corrupt schema file.
@@ -126,13 +132,31 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
 
       FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider);
 
-      String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
+      String segmentDir = null;
+      if (carbonTable.isTransactionalTable()) {
+        segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
+      } else {
+        segmentDir = identifier.getTablePath();
+      }
       FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
       if (FileFactory.isFileExist(segmentDir, fileType)) {
         // if external table Segments are found, add it to the List
         List<Segment> externalTableSegments = new ArrayList<Segment>();
-        Segment seg = new Segment("null", null, readCommittedScope);
-        externalTableSegments.add(seg);
+        Segment seg;
+        if (carbonTable.isTransactionalTable()) {
+          // SDK some cases write into the Segment Path instead of Table Path i.e. inside
+          // the "Fact/Part0/Segment_null". The segment in this case is named as "null".
+          // The table is denoted by default as a transactional table and goes through
+          // the path of CarbonFileInputFormat. The above scenario is handled in the below code.
+          seg = new Segment("null", null, readCommittedScope);
+          externalTableSegments.add(seg);
+        } else {
+          LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
+          for (LoadMetadataDetails load : loadMetadataDetails) {
+            seg = new Segment(load.getLoadName(), null, readCommittedScope);
+            externalTableSegments.add(seg);
+          }
+        }
 
         Map<String, String> indexFiles =
             new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 9560ef7..d15e548 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -48,6 +48,7 @@ public class CarbonReaderBuilder {
   private String[] projectionColumns;
   private Expression filterExpression;
   private String tableName;
+  private boolean isTransactionalTable = true;
 
   CarbonReaderBuilder(String tablePath, String tableName) {
     this.tablePath = tablePath;
@@ -60,6 +61,12 @@ public class CarbonReaderBuilder {
     return this;
   }
 
+  public CarbonReaderBuilder isTransactionalTable(boolean isTransactionalTable) {
+    Objects.requireNonNull(isTransactionalTable);
+    this.isTransactionalTable = isTransactionalTable;
+    return this;
+  }
+
   public CarbonReaderBuilder filter(Expression fileterExpression) {
     Objects.requireNonNull(fileterExpression);
     this.filterExpression = fileterExpression;
@@ -134,7 +141,7 @@ public class CarbonReaderBuilder {
   }
 
   public <T> CarbonReader<T> build() throws IOException, InterruptedException {
-    CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath);
+    CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath, isTransactionalTable);
 
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
     final Job job = new Job(new Configuration());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index c4dcee9..ba3d3ac 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -19,14 +19,11 @@ package org.apache.carbondata.sdk.file;
 
 import java.io.File;
 import java.io.FileFilter;
-import java.io.FilenameFilter;
 import java.io.IOException;
 
-import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.io.FileUtils;
@@ -137,7 +134,7 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100, false);
 
     // TODO: implement reader to verify the number of blocklet in the file
 
@@ -153,7 +150,7 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2, true);
 
     File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
     File[] dataFiles = segmentFolder.listFiles(new FileFilter() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index bb1a7c6..f026499 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -125,4 +125,43 @@ public class CarbonReaderTest {
 
     FileUtils.deleteDirectory(new File(path));
   }
+
+
+  @Test
+  public void testWriteAndReadFilesNonTransactional() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    // Write to a Non Transactional Table
+    TestUtil.writeFilesAndVerify(new Schema(fields), path, true, false);
+
+    CarbonReader reader = CarbonReader.builder(path, "_temp")
+        .projection(new String[]{"name", "age"})
+        .isTransactionalTable(false)
+        .build();
+
+    // expected output after sorting
+    String[] name = new String[100];
+    int[] age = new int[100];
+    for (int i = 0; i < 100; i++) {
+      name[i] = "robot" + (i / 10);
+      age[i] = (i % 10) * 10 + i / 10;
+    }
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      // Default sort column is applied for dimensions. So, need  to validate accordingly
+      Assert.assertEquals(name[i], row[0]);
+      Assert.assertEquals(age[i], row[1]);
+      i++;
+    }
+    Assert.assertEquals(i, 100);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 03aecb8..6870f36 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -34,11 +34,16 @@ public class TestUtil {
   }
 
   static void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
-    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true);
   }
 
   public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
-    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, true);
+  }
+
+  public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema,
+      boolean isTransactionalTable) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
   }
 
   /**
@@ -50,13 +55,14 @@ public class TestUtil {
    * @param persistSchema true if want to persist schema file
    * @param blockletSize blockletSize in the file, -1 for default size
    * @param blockSize blockSize in the file, -1 for default size
+   * @param isTransactionalTable set to true if this is written for Transactional Table.
    */
   static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
-      boolean persistSchema, int blockletSize, int blockSize) {
+      boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .withSchema(schema)
-          .isTransactionalTable(true)
+          .isTransactionalTable(isTransactionalTable)
           .outputPath(path);
       if (sortColumns != null) {
         builder = builder.sortBy(sortColumns);
@@ -85,8 +91,14 @@ public class TestUtil {
       Assert.fail(l.getMessage());
     }
 
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
-    Assert.assertTrue(segmentFolder.exists());
+    File segmentFolder = null;
+    if (isTransactionalTable) {
+      segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+      Assert.assertTrue(segmentFolder.exists());
+    } else {
+      segmentFolder = new File(path);
+      Assert.assertTrue(segmentFolder.exists());
+    }
 
     File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {


Mime
View raw message