carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [20/22] incubator-carbondata git commit: corrected IUD test cases
Date Fri, 06 Jan 2017 13:57:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 805444f..1b4e795 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -41,16 +41,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.hdest""")
     sql("""CREATE TABLE iud.update_01(imei string,age int,task bigint,num double,level decimal(10,3),name
string)STORED BY 'org.apache.carbondata.format' """)
     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/update01.csv' INTO TABLE iud.update_01
OPTIONS('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE') """)
-   /* CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")*/
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
   }
 
-  test("test update operation with 0 rows updation.") {
-  }
-
-/*
 
-  test("test update operation with 0 rows updation.") {
+  /*test("test update operation with 0 rows updation.") {
     sql("""drop table iud.zerorows""").show
     sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.zerorows""")
@@ -63,10 +59,10 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
     sql("""drop table iud.zerorows""").show
 
 
-  }
+  }*/
 
 
-    test("update carbon table[select from source table with where and exist]") {
+ /*   test("update carbon table[select from source table with where and exist]") {
       sql("""drop table iud.dest11""").show
       sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
       sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest11""")
@@ -77,8 +73,8 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
       )
       sql("""drop table iud.dest11""").show
    }
-
-  test("update carbon table[using destination table columns with where and exist]") {
+*/
+ /* test("update carbon table[using destination table columns with where and exist]") {
     sql("""drop table iud.dest22""").show
     sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest22""")
@@ -92,9 +88,9 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
       Seq(Row(2))
     )
     sql("""drop table iud.dest22""").show
-   }
+   }*/
 
-    test("update carbon table without alias in set columns") {
+/*    test("update carbon table without alias in set columns") {
       sql("""drop table iud.dest33""").show
       sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
       sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest33""")
@@ -104,7 +100,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
         Seq(Row("MGM","Disco"))
       )
       sql("""drop table iud.dest33""").show
-  }
+  }*/
 
    test("update carbon table without alias in set three columns") {
      sql("""drop table iud.dest44""").show
@@ -118,7 +114,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
      sql("""drop table iud.dest44""").show
    }
 
-    test("update carbon table[single column select from source with where and exist]") {
+  /*  test("update carbon table[single column select from source with where and exist]")
{
       sql("""drop table iud.dest55""").show
       sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
       sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest55""")
@@ -128,9 +124,9 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
         Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
       )
       sql("""drop table iud.dest55""").show
-   }
+   }*/
 
-  test("update carbon table[single column SELECT from source with where and exist]") {
+/*  test("update carbon table[single column SELECT from source with where and exist]") {
     sql("""drop table iud.dest55""").show
     sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest55""")
@@ -140,7 +136,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
       Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
     )
     sql("""drop table iud.dest55""").show
-  }
+  }*/
 
    test("update carbon table[using destination table columns without where clause]") {
      sql("""drop table iud.dest66""").show
@@ -213,7 +209,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
      )
      sql("""drop table iud.dest120""").show
    }
-   test("update carbon table[using destination table where and exist]") {
+/*   test("update carbon table[using destination table where and exist]") {
      sql("""drop table iud.dest130""").show
      sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
      sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest130""")
@@ -223,9 +219,9 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
        Seq(Row(2,"xyx"))
      )
      sql("""drop table iud.dest130""").show
-   }
+   }*/
 
-   test("update carbon table[using destination table (concat) where and exist]") {
+/*   test("update carbon table[using destination table (concat) where and exist]") {
      sql("""drop table iud.dest140""").show
      sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
      sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest140""")
@@ -235,7 +231,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
        Seq(Row(2,"aaaz"))
      )
      sql("""drop table iud.dest140""").show
-   }
+   }*/
 
    test("update carbon table[using destination table (concat) with  where") {
      sql("""drop table iud.dest150""").show
@@ -294,7 +290,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
      }
    }
 
-  test("""update carbon [special characters  in value- test parsing logic ]""") {
+ /* test("""update carbon [special characters  in value- test parsing logic ]""") {
     sql("""drop table iud.dest160""").show
     sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest160""")
@@ -310,8 +306,8 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
     sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from iud.source2 s where
d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3 and exists( select * from iud.other
o where o.c1 = d.c1)""").show()
     sql("""drop table iud.dest160""").show
   }
-
-  test("""update carbon [sub query, between and existing in outer condition.(Customer query
) ]""") {
+*/
+  /*test("""update carbon [sub query, between and existing in outer condition.(Customer query
) ]""") {
     sql("""drop table iud.dest170""").show
     sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
     sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud.dest170""")
@@ -321,7 +317,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
       Seq(Row("MGM"), Row("RGK"))
     )
     sql("""drop table iud.dest170""").show
-  }
+  }*/
 
   test("""update carbon [self join select query ]""") {
     sql("""drop table iud.dest171""").show
@@ -383,12 +379,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
       case ex: Exception => assert(true)
     }
   }
-*/
+
 
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")
-  /*  CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")*/
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
b/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
index 6ae3d5c..40b8e4b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
@@ -53,6 +53,11 @@ public final class DataProcessorConstants {
    */
   public static final String YEAR = "YEAR";
 
+  /**
+   * if data load fails due to bad record
+   */
+  public static final long BAD_REC_FAILURE_ERROR_CODE = 223732674;
+
   private DataProcessorConstants() {
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
index 11acd56..48efec8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
@@ -22,6 +22,7 @@ package org.apache.carbondata.processing.csvreaderstep;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -181,7 +182,10 @@ public class CsvInput extends BaseStep implements StepInterface {
       LOGGER.info("*****************Completed all csv reading***********");
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
               meta.getPartitionID(), System.currentTimeMillis());
-    } else {
+    } else if(rddIteratorKey.startsWith(CarbonCommonConstants.RDDUTIL_UPDATE_KEY)) {
+      scanRddIteratorForUpdate();
+    }
+    else {
       scanRddIterator(numberOfNodes);
     }
     setOutputDone();
@@ -220,7 +224,7 @@ public class CsvInput extends BaseStep implements StepInterface {
       }
       return null;
     }
-  }
+  };
 
   private void scanRddIterator(int numberOfNodes) throws RuntimeException {
     JavaRddIterator<JavaRddIterator<String[]>> iter = RddInputUtils.getAndRemove(rddIteratorKey);
@@ -255,6 +259,22 @@ public class CsvInput extends BaseStep implements StepInterface {
     }
   }
 
+  private void scanRddIteratorForUpdate() throws RuntimeException {
+    Iterator<String[]> iterator = RddInpututilsForUpdate.getAndRemove(rddIteratorKey);
+    if (iterator != null) {
+      try{
+        while(iterator.hasNext()){
+          putRow(data.outputRowMeta, iterator.next());
+        }
+      } catch (KettleException e) {
+        throw new RuntimeException(e);
+      } catch (Exception e) {
+        LOGGER.error(e, "Scan rdd during data load is terminated due to error.");
+        throw e;
+      }
+    }
+  }
+
   private void startProcess(final int numberOfNodes) throws RuntimeException {
     exec = Executors.newFixedThreadPool(numberOfNodes);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInpututilsForUpdate.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInpututilsForUpdate.java
b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInpututilsForUpdate.java
new file mode 100644
index 0000000..3cac2f3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInpututilsForUpdate.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.csvreaderstep;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class RddInpututilsForUpdate {
+  private static Map<String, Iterator<String[]>> iteratorMap = new HashMap<String,
+      Iterator<String[]>>();
+
+  public static void put(String key, Iterator<String[]> value) {
+    iteratorMap.put(key, value);
+  }
+
+  public static Iterator<String[]> getAndRemove(String key) {
+    Iterator<String[]> iter = iteratorMap.get(key);
+    remove(key);
+    return iter;
+  }
+
+  public static void remove(String key) {
+    iteratorMap.remove(key);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStepMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStepMeta.java
b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStepMeta.java
index 559f363..89c1228 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStepMeta.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStepMeta.java
@@ -489,8 +489,8 @@ public class MDKeyGenStepMeta extends BaseStepMeta implements StepMetaInterface
    *
    * @return
    */
-  public int getSegmentId() {
-    return Integer.parseInt(segmentId);
+  public String getSegmentId() {
+    return segmentId;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
index c2f0611..97a488f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStepMeta.java
@@ -455,8 +455,8 @@ public class SortKeyStepMeta extends BaseStepMeta implements StepMetaInterface
{
    *
    * @return
    */
-  public int getSegmentId() {
-    return Integer.parseInt(segmentId);
+  public String getSegmentId() {
+    return segmentId;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
index 73b919c..188c468 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
@@ -17,13 +17,8 @@
 
 package org.apache.carbondata.processing.store;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 
 /**
  * This class contains attributes of file which are required to
@@ -66,25 +61,7 @@ public class CarbonDataFileAttributes {
    * @return fact time stamp which is load start time
    */
   public String getFactTimeStamp() {
-    return convertTimeStampToString(factTimeStamp);
+    return factTimeStamp;
   }
 
-  /**
-   * This method will convert a given timestamp to long value and then to string back
-   *
-   * @param factTimeStamp
-   * @return
-   */
-  private String convertTimeStampToString(String factTimeStamp) {
-    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    Date dateToStr = null;
-    try {
-      dateToStr = parser.parse(factTimeStamp);
-      return Long.toString(dateToStr.getTime());
-    } catch (ParseException e) {
-      LOGGER.error("Cannot convert" + factTimeStamp
-          + " to Time/Long type value" + e.getMessage());
-      return null;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
index 61fb54d..20b69a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
@@ -1369,8 +1369,8 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements
StepMetaIn
    *
    * @return
    */
-  public int getSegmentId() {
-    return Integer.parseInt(segmentId);
+  public String getSegmentId() {
+    return segmentId;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7aa68005/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
index c92b11b..ee76f8f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
@@ -83,7 +83,7 @@ public class FileStoreSurrogateKeyGenForCSV extends CarbonCSVBasedDimSurrogateKe
   /**
    * load Id
    */
-  private int segmentId;
+  private String segmentId;
   /**
    * task id, each spark task has a unique id
    */


Mime
View raw message