carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-2388][SDK]Avro Record Complex Type Implementation
Date Tue, 01 May 2018 17:59:06 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 7edef8f4a -> 3202cf517


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 72a3ce4..677047b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -17,10 +17,13 @@
 
 package org.apache.carbondata.sdk.file;
 
+import java.util.List;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 
 /**
  * A field represent one column
@@ -31,6 +34,15 @@ public class Field {
 
   private String name;
   private DataType type;
+  // child columns of a complex field; null unless supplied via a constructor or setChildren
+  private List<StructField> children;
+  private String parent;
+  private String storeType = "columnar";
+  private int schemaOrdinal = -1;
+  private int precision = 0;
+  private int scale = 0;
+  private String rawSchema = "";
+  private String columnComment = "";
 
   /**
    * Field Constructor
@@ -59,16 +71,79 @@ public class Field {
       this.type = DataTypes.FLOAT;
     } else if (type.equalsIgnoreCase("double")) {
       this.type = DataTypes.DOUBLE;
-    } else {
+    } else if (type.equalsIgnoreCase("array")) {
+      this.type = DataTypes.createDefaultArrayType();
+    } else if (type.equalsIgnoreCase("struct")) {
+      this.type = DataTypes.createDefaultStructType();
+    } else {
       throw new IllegalArgumentException("unsupported data type: " + type);
     }
   }
 
+  /**
+   * Field constructor for a field that may be of a complex type.
+   *
+   * @param name name of the field
+   * @param type case-insensitive type name, e.g. "string", "int", "array" or "struct"
+   * @param fields child fields of a complex type: for "array" the first entry supplies
+   *               the element type, for "struct" every entry becomes a struct member
+   * @throws IllegalArgumentException if {@code type} is not a supported type name, or
+   *               if an "array" field is given no child field
+   */
+  public Field(String name, String type, List<StructField> fields) {
+    this.name = name;
+    this.children = fields;
+    if (type.equalsIgnoreCase("string")) {
+      this.type = DataTypes.STRING;
+    } else if (type.equalsIgnoreCase("date")) {
+      this.type = DataTypes.DATE;
+    } else if (type.equalsIgnoreCase("timestamp")) {
+      this.type = DataTypes.TIMESTAMP;
+    } else if (type.equalsIgnoreCase("boolean")) {
+      this.type = DataTypes.BOOLEAN;
+    } else if (type.equalsIgnoreCase("byte")) {
+      this.type = DataTypes.BYTE;
+    } else if (type.equalsIgnoreCase("short")) {
+      this.type = DataTypes.SHORT;
+    } else if (type.equalsIgnoreCase("int")) {
+      this.type = DataTypes.INT;
+    } else if (type.equalsIgnoreCase("long")) {
+      this.type = DataTypes.LONG;
+    } else if (type.equalsIgnoreCase("float")) {
+      this.type = DataTypes.FLOAT;
+    } else if (type.equalsIgnoreCase("double")) {
+      this.type = DataTypes.DOUBLE;
+    } else if (type.equalsIgnoreCase("array")) {
+      if (fields == null || fields.isEmpty()) {
+        throw new IllegalArgumentException("array field " + name + " requires one child field");
+      }
+      // only the first child is used: its data type becomes the array's element type
+      this.type = DataTypes.createArrayType(fields.get(0).getDataType());
+    } else if (type.equalsIgnoreCase("struct")) {
+      this.type = DataTypes.createStructType(fields);
+    } else {
+      throw new IllegalArgumentException("unsupported data type: " + type);
+    }
+  }
+
+  /**
+   * Field constructor with an explicit data type and child fields.
+   *
+   * @param name name of the field
+   * @param type datatype of the field of class DataType
+   * @param fields child fields of the field, stored without copying
+   */
+  public Field(String name, DataType type, List<StructField> fields) {
+    this.name = name;
+    this.type = type;
+    this.children = fields;
+  }
+
   /**
    * Field constructor
    * @param name name of the field
    * @param type datatype of the field of class DataType
    */
   public Field(String name, DataType type) {
     this.name = name;
     this.type = type;
@@ -81,4 +156,64 @@ public class Field {
   public DataType getDataType() {
     return type;
   }
+
+  public List<StructField> getChildren() {
+    return children;
+  }
+
+  public void setChildren(List<StructField> children) {
+    this.children = children;
+  }
+
+  public String getParent() {
+    return parent;
+  }
+
+  public void setParent(String parent) {
+    this.parent = parent;
+  }
+
+  public String getStoreType() {
+    return storeType;
+  }
+
+  public int getSchemaOrdinal() {
+    return schemaOrdinal;
+  }
+
+  public void setSchemaOrdinal(int schemaOrdinal) {
+    this.schemaOrdinal = schemaOrdinal;
+  }
+
+  public int getPrecision() {
+    return precision;
+  }
+
+  public void setPrecision(int precision) {
+    this.precision = precision;
+  }
+
+  public int getScale() {
+    return scale;
+  }
+
+  public void setScale(int scale) {
+    this.scale = scale;
+  }
+
+  public String getRawSchema() {
+    return rawSchema;
+  }
+
+  public void setRawSchema(String rawSchema) {
+    this.rawSchema = rawSchema;
+  }
+
+  public String getColumnComment() {
+    return columnComment;
+  }
+
+  public void setColumnComment(String columnComment) {
+    this.columnComment = columnComment;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index f85f7d5..ed3f2f1 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -20,20 +20,23 @@ package org.apache.carbondata.sdk.file;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.CharEncoding;
 import org.junit.Assert;
 import org.junit.Test;
 
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
 import org.apache.avro.Schema;
 
 public class AvroCarbonWriterTest {
   private String path = "./AvroCarbonWriterSuiteWriteFiles";
 
@@ -177,7 +180,76 @@ public class AvroCarbonWriterTest {
 
   @Test
   public void testWriteNestedRecord() throws IOException {
-    // TODO
+    FileUtils.deleteDirectory(new File(path));
+
+    String mySchema =
+        "{" +
+        "  \"name\": \"address\", " +
+        "   \"type\": \"record\", " +
+        "    \"fields\": [  " +
+        "  { \"name\": \"name\", \"type\": \"string\"}, " +
+        "  { \"name\": \"age\", \"type\": \"int\"}, " +
+        "  { " +
+        "    \"name\": \"address\", " +
+        "      \"type\": { " +
+        "    \"type\" : \"record\", " +
+        "        \"name\" : \"my_address\", " +
+        "        \"fields\" : [ " +
+        "    {\"name\": \"street\", \"type\": \"string\"}, " +
+        "    {\"name\": \"city\", \"type\": \"string\"} " +
+        "  ]} " +
+        "  } " +
+        "] " +
+        "}";
+
+    String json = "{\"name\":\"bob\", \"age\":10, " +
+        "\"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}";
+
+    // conversion to GenericData.Record
+    Schema nn = new Schema.Parser().parse(mySchema);
+    JsonAvroConverter converter = new JsonAvroConverter();
+    GenericData.Record record = converter.convertToGenericDataRecord(
+        json.getBytes(CharEncoding.UTF_8), nn);
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    // NOTE(review): the Avro record's second field is "age" (int), which this "name1" column
+    // does not match; confirm whether it should be new Field("age", DataTypes.INT).
+    fields[1] = new Field("name1", DataTypes.STRING);
+    // the struct column's children mirror the fields of the Avro "my_address" record
+    List<StructField> fld = new ArrayList<>();
+    fld.add(new StructField("street", DataTypes.STRING));
+    fld.add(new StructField("city", DataTypes.STRING));
+    fields[2] = new Field("address", "struct", fld);
+
+    try {
+      CarbonWriter writer = CarbonWriter.builder()
+          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .outputPath(path)
+          .isTransactionalTable(true)
+          .buildWriterForAvroInput();
+
+      for (int i = 0; i < 100; i++) {
+        writer.write(record);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(1, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
index 66d89c8..266fabd 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
@@ -95,6 +95,10 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
       } else {
         boolean isDirectDictionary =
             CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
+        // NOTE(review): isDictionary is not read anywhere in this change; use it or drop it
+        boolean isDictionary =
+            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY);
+
         String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
             .get(CarbonCommonConstants.DICTIONARY_PATH);
         DictionaryColumnUniqueIdentifier dictionarIdentifier =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
index d5f77f4..cbf93b8 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -535,7 +535,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
           }
           if (isProjectionRequired[colCount]) {
             outputValues[projectionMap[colCount]] = queryTypes[colCount]
-                .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b));
+                .getDataBasedOnDataType(ByteBuffer.wrap(b));
           }
         } else {
           input.skipBytes(v);


Mime
View raw message