carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject carbondata git commit: [CARBONDATA-2247][SDK] Support write Index file in CarbonWriter
Date Wed, 14 Mar 2018 07:13:52 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 881ea1e12 -> 6cb6f8380


[CARBONDATA-2247][SDK] Support write Index file in CarbonWriter

This closes #2053


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6cb6f838
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6cb6f838
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6cb6f838

Branch: refs/heads/master
Commit: 6cb6f8380d8195bb8dd9848ea262ca66cd8cdf9f
Parents: 881ea1e
Author: Jacky Li <jacky.likun@qq.com>
Authored: Mon Mar 12 19:38:08 2018 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Wed Mar 14 15:12:09 2018 +0800

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |  31 +++-
 .../loading/model/CarbonLoadModelBuilder.java   |   2 +-
 .../processing/loading/model/LoadOption.java    |   1 +
 store/sdk/pom.xml                               |   5 +-
 .../sdk/file/CarbonWriterBuilder.java           |  12 +-
 .../sdk/file/CSVCarbonWriterSuite.java          | 170 +++++++++++++++++--
 6 files changed, 197 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cb6f838/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 88774ec..8fdcbb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -18,11 +18,14 @@
 package org.apache.carbondata.core.metadata.schema.table;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
@@ -42,6 +45,16 @@ public class TableSchemaBuilder {
 
   private List<ColumnSchema> otherColumns = new LinkedList<>();
 
+  private int blockSize;
+
+  public TableSchemaBuilder blockSize(int blockSize) {
+    if (blockSize <= 0) {
+      throw new IllegalArgumentException("blockSize should be greater than 0");
+    }
+    this.blockSize = blockSize;
+    return this;
+  }
+
   public TableSchema build() {
     TableSchema schema = new TableSchema();
     schema.setTableId(UUID.randomUUID().toString());
@@ -53,6 +66,12 @@ public class TableSchemaBuilder {
     List<ColumnSchema> allColumns = new LinkedList<>(sortColumns);
     allColumns.addAll(otherColumns);
     schema.setListOfColumns(allColumns);
+
+    if (blockSize > 0) {
+      Map<String, String> property = new HashMap<>();
+      property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize));
+      schema.setTableProperties(property);
+    }
     return schema;
   }
 
@@ -62,15 +81,22 @@ public class TableSchemaBuilder {
     ColumnSchema newColumn = new ColumnSchema();
     newColumn.setColumnName(field.getFieldName());
     newColumn.setDataType(field.getDataType());
-    newColumn.setDimensionColumn(isSortColumn || field.getDataType() == DataTypes.STRING);
+    if (isSortColumn ||
+        field.getDataType() == DataTypes.STRING ||
+        field.getDataType() == DataTypes.DATE ||
+        field.getDataType() == DataTypes.TIMESTAMP) {
+      newColumn.setDimensionColumn(true);
+    } else {
+      newColumn.setDimensionColumn(false);
+    }
     newColumn.setSchemaOrdinal(ordinal++);
     newColumn.setColumnar(true);
     newColumn.setColumnUniqueId(UUID.randomUUID().toString());
     newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
     newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
-
     if (isSortColumn) {
       sortColumns.add(newColumn);
+      newColumn.setSortColumn(true);
     } else {
       otherColumns.add(newColumn);
     }
@@ -97,6 +123,7 @@ public class TableSchemaBuilder {
     List<Encoding> encodings = new LinkedList<>();
     if (dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE) {
       encodings.add(Encoding.DIRECT_DICTIONARY);
+      encodings.add(Encoding.DICTIONARY);
     }
     if (isSortColumn) {
       encodings.add(Encoding.INVERTED_INDEX);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cb6f838/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 7871643..8c005c3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -62,7 +62,7 @@ public class CarbonLoadModelBuilder {
   public CarbonLoadModel build(
       Map<String, String> options) throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
-    optionsFinal.put("sort_scope", "no_sort");
+
     if (!options.containsKey("fileheader")) {
       List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName());
       String[] columns = new String[csvHeader.size()];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cb6f838/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 608d147..e605b9e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -194,6 +194,7 @@ public class LoadOption {
     }
 
     optionsFinal.put("single_pass", String.valueOf(singlePass));
+    optionsFinal.put("sort_scope", "local_sort");
     optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, "sort_column_bounds", ""));
     return optionsFinal;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cb6f838/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index b3dd464..9f7038a 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -43,9 +43,6 @@
       </resource>
       <resource>
         <directory>.</directory>
-        <includes>
-          <include>CARBON_SPARK_INTERFACELogResource.properties</include>
-        </includes>
       </resource>
     </resources>
     <plugins>
@@ -144,7 +141,7 @@
         <configuration>
           <shadedArtifactAttached>false</shadedArtifactAttached>
           <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
-          <outputFile>carbondata-sdk.jar</outputFile>
+          <outputFile>target/carbondata-sdk.jar</outputFile>
           <artifactSet>
             <includes>
               <include>*:*</include>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cb6f838/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index e06200a..8734341 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -53,6 +53,8 @@ public class CarbonWriterBuilder {
   private String path;
   private String[] sortColumns;
   private boolean persistSchemaFile;
+  private int blockletSize;
+  private int blockSize;
 
   public CarbonWriterBuilder withSchema(Schema schema) {
     Objects.requireNonNull(schema, "schema should not be null");
@@ -84,14 +86,16 @@ public class CarbonWriterBuilder {
     if (blockSize <= 0) {
       throw new IllegalArgumentException("blockSize should be greater than zero");
     }
-    throw new UnsupportedOperationException();
+    this.blockSize = blockSize;
+    return this;
   }
 
   public CarbonWriterBuilder withBlockletSize(int blockletSize) {
     if (blockletSize <= 0) {
       throw new IllegalArgumentException("blockletSize should be greater than zero");
     }
-    throw new UnsupportedOperationException();
+    this.blockletSize = blockletSize;
+    return this;
   }
 
   /**
@@ -128,6 +132,10 @@ public class CarbonWriterBuilder {
    */
   private CarbonTable buildCarbonTable() {
     TableSchemaBuilder tableSchemaBuilder = TableSchema.builder();
+    if (blockletSize > 0) {
+      tableSchemaBuilder = tableSchemaBuilder.blockSize(blockSize);
+    }
+
     List<String> sortColumnsList;
     if (sortColumns != null) {
       sortColumnsList = Arrays.asList(sortColumns);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cb6f838/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index aca2b2d..0ac6f38 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -67,14 +67,50 @@ public class CSVCarbonWriterSuite {
   }
 
   private void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+  }
+
+  /**
+   * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true if want to persist schema file
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   */
+  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize) {
     try {
-      CarbonWriter writer = CarbonWriter.builder()
+      CarbonWriterBuilder builder = CarbonWriter.builder()
           .withSchema(schema)
-          .outputPath(path)
-          .buildWriterForCSVInput();
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
 
-      for (int i = 0; i < 100; i++) {
-        writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)});
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
       }
       writer.close();
     } catch (Exception e) {
@@ -91,27 +127,119 @@ public class CSVCarbonWriterSuite {
       }
     });
     Assert.assertNotNull(dataFiles);
-    Assert.assertEquals(1, dataFiles.length);
+    Assert.assertTrue(dataFiles.length > 0);
   }
 
   @Test
-  public void testAllPrimitiveDataType() {
+  public void testAllPrimitiveDataType() throws IOException {
     // TODO: write all data type and read by CarbonRecordReader to verify the content
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(new Schema(fields))
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34"
+        };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    FileUtils.deleteDirectory(new File(path));
   }
 
   @Test
-  public void test2Blocklet() {
-    // TODO: write data with more than one blocklet
+  public void test2Blocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+
+    // TODO: implement reader to verify the number of blocklet in the file
+
+    FileUtils.deleteDirectory(new File(path));
   }
 
   @Test
-  public void test2Block() {
-    // TODO: write data with more than one block
+  public void test2Block() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(2, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
   }
 
   @Test
-  public void testSortColumns() {
-    // TODO: test sort column
+  public void testSortColumns() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+    // TODO: implement reader and verify the data is sorted
+
+    FileUtils.deleteDirectory(new File(path));
   }
 
   @Test
@@ -120,8 +248,20 @@ public class CSVCarbonWriterSuite {
   }
 
   @Test
-  public void testSchemaPersistence() {
-    // TODO: verify schema file is persisted in specified location
+  public void testSchemaPersistence() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+    Assert.assertTrue(new File(schemaFile).exists());
+
+    FileUtils.deleteDirectory(new File(path));
   }
 
 }


Mime
View raw message