carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: fix spark2 decimal
Date Sat, 03 Dec 2016 18:12:22 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 5e0a07221 -> 151962afb


fix spark2 decimal

code clean

comment fix

comment fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/7f54160f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/7f54160f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/7f54160f

Branch: refs/heads/master
Commit: 7f54160f6ff6a584519121bff2536d3ed38c5026
Parents: 5e0a072
Author: wangfei <wangfei_hello@126.com>
Authored: Sat Dec 3 12:44:07 2016 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sun Dec 4 02:10:41 2016 +0800

----------------------------------------------------------------------
 .../datastorage/store/impl/FileFactory.java     | 32 ++++++++++++++++++++
 .../org/apache/spark/sql/CarbonSource.scala     | 18 +++++------
 .../org/apache/spark/sql/TableCreator.scala     |  5 ++-
 .../apache/spark/sql/hive/CarbonMetastore.scala | 14 +++++++--
 .../carbondata/CarbonDataSourceSuite.scala      | 15 +++++----
 pom.xml                                         |  6 ++--
 6 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
index a94d3f1..c540920 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -397,6 +397,38 @@ public final class FileFactory {
     }
   }
 
+  public static boolean deleteFile(String filePath, FileType fileType) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.delete(path, true);
+
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return deleteAllFilesOfDir(file);
+    }
+  }
+
+  public static boolean deleteAllFilesOfDir(File path) {
+    if (!path.exists()) {
+      return true;
+    }
+    if (path.isFile()) {
+      return path.delete();
+    }
+    File[] files = path.listFiles();
+    for (int i = 0; i < files.length; i++) {
+      deleteAllFilesOfDir(files[i]);
+    }
+    return path.delete();
+  }
+
+
   public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
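
The new deleteFile dispatches on FileType: HDFS, Alluxio and ViewFS paths go
through Hadoop's FileSystem.delete with recursion enabled, while local paths
fall back to the recursive deleteAllFilesOfDir. Note that File.listFiles()
can return null on an I/O error, so callers on unusual local paths may want
to guard for that. A minimal usage sketch in Scala (the store path below is
hypothetical):

    import org.apache.carbondata.core.datastorage.store.impl.FileFactory

    // Resolve the FileType from the path scheme, then delete recursively.
    val storePath = "/tmp/carbon/store"
    val fileType = FileFactory.getFileType(storePath)
    val deleted: Boolean = FileFactory.deleteFile(storePath, fileType)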

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index fb87ba2..b14a95c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.command.{CreateTable, Field}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DecimalType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
@@ -114,20 +114,18 @@ class CarbonSource extends CreatableRelationProvider
     } catch {
       case ex: NoSuchTableException =>
         val fields = dataSchema.map { col =>
-          val column = col.name
           val dataType = Option(col.dataType.toString)
-          val name = Option(col.name)
           // This is to parse complex data types
-          val x = col.name + ' ' + col.dataType
-          val f: Field = Field(column, dataType, name, None, null)
+          val f: Field = Field(col.name, dataType, Option(col.name), None, null)
           // the data type of the decimal type will be like decimal(10,0)
           // so checking the start of the string and taking the precision and scale.
           // resetting the data type with decimal
-          if (f.dataType.getOrElse("").startsWith("decimal")) {
-            val (precision, scale) = TableCreator.getScaleAndPrecision(col.dataType.toString)
-            f.precision = precision
-            f.scale = scale
-            f.dataType = Some("decimal")
+          Option(col.dataType).foreach {
+            case d: DecimalType =>
+              f.precision = d.precision
+              f.scale = d.scale
+              f.dataType = Some("decimal")
+            case _ => // do nothing
           }
           f
         }
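
The fix replaces string-prefix parsing of the "decimal(10,0)" form with a
direct match on catalyst's DecimalType, reading precision and scale from the
type itself. The same pattern in isolation (a sketch against the plain Spark
API; the helper name is made up for illustration):

    import org.apache.spark.sql.types.{DataType, DecimalType}

    // Pull (precision, scale) out of a catalyst type, as the fix does,
    // instead of parsing the string form "decimal(p,s)".
    def precisionAndScale(dt: DataType): Option[(Int, Int)] = dt match {
      case d: DecimalType => Some((d.precision, d.scale))
      case _              => None
    }

    // precisionAndScale(DecimalType(13, 0)) == Some((13, 0))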

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 14decdb..e375710 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -336,7 +336,7 @@ object TableCreator {
 
   private def normalizeType(field: Field): Field = {
     val dataType = field.dataType.getOrElse("NIL")
-    dataType match {
+    dataType.toLowerCase match {
       case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
         field.storeType
       )
@@ -367,8 +367,7 @@ object TableCreator {
         field.storeType
       )
       case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
-        field.storeType, field.precision, field.scale
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale)
       // checking if the nested data type contains the child type as decimal(10,0),
       // if it is present then extracting the precision and scale. resetting the data type
       // with Decimal.
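
Two things change in normalizeType: the match is now done on
dataType.toLowerCase, so a capitalized type name such as "Decimal" still hits
the right case instead of falling through, and the decimal branch now also
passes field.schemaOrdinal through ahead of precision and scale. A tiny
illustration of the case-sensitivity point (hypothetical value):

    val dataType = "Decimal"        // as reported by some schema source
    dataType.toLowerCase match {
      case "decimal" => "matched"   // reached only because of toLowerCase
      case other     => s"unmatched: $other"
    }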

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 2fde552..c2da5c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -100,7 +100,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Logging {
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -125,6 +125,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Loggin
     tableCreationTime
   }
 
+  def cleanStore(): Unit = {
+    try {
+      val fileType = FileFactory.getFileType(storePath)
+      FileFactory.deleteFile(storePath, fileType)
+    } catch {
+      case e => LOGGER.error(e, "clean store failed")
+    }
+  }
+
   def lookupRelation(dbName: Option[String],
                      tableName: String)(sparkSession: SparkSession): LogicalPlan = {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
@@ -309,7 +318,6 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Loggin
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
     metadata.tablesMeta += tableMeta
-    logInfo(s"Table $tableName for Database $dbName created successfully.")
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
     updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
     carbonTablePath.getPath
@@ -411,7 +419,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Loggin
             .removeTable(dbName + "_" + tableName)
           updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
         case None =>
-          logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
+          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
       }
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
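
cleanStore wipes the whole store directory through the new
FileFactory.deleteFile, logging failures instead of rethrowing them, so a
dirty store left by a previous run cannot break a suite's setup. How the test
below reaches it (a sketch, assuming an existing SparkSession named spark):

    import org.apache.spark.sql.CarbonEnv

    CarbonEnv.init(spark.sqlContext)           // once per session
    CarbonEnv.get.carbonMetastore.cleanStore() // start from an empty store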

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index a635d72..aaa0a20 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.carbondata
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
@@ -34,10 +34,11 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
       .getOrCreate()
     spark.sparkContext.setLogLevel("WARN")
 
-    // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
+    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.get.carbonMetastore.cleanStore()
 
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_testtable")
     // Create table
     spark.sql(
       s"""
@@ -46,7 +47,8 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
          |    intField int,
          |    bigintField long,
          |    doubleField double,
-         |    stringField string
+         |    stringField string,
+         |    decimalField decimal(13, 0)
          | )
          | USING org.apache.spark.sql.CarbonSource
        """.stripMargin)
@@ -64,7 +66,8 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
 
 
   test("agg") {
-    spark.sql("select stringField, sum(intField) from carbon_testtable group by stringField").collect()
+    spark.sql("select stringField, sum(intField) , sum(decimalField) " +
+      "from carbon_testtable group by stringField").collect()
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ca28bbf..7c597d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -309,9 +309,6 @@
     </profile>
     <profile>
       <id>spark-1.5</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.version>1.5.2</spark.version>
         <scala.binary.version>2.10</scala.binary.version>
@@ -338,6 +335,9 @@
     </profile>
     <profile>
       <id>spark-2.0</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.version>2.0.0</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
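
With activeByDefault moved from the spark-1.5 profile to spark-2.0, a plain
build now compiles against Spark 2.0.0 / Scala 2.11, and the older stack has
to be selected explicitly, e.g.:

    mvn clean package              # now defaults to the spark-2.0 profile
    mvn clean package -Pspark-1.5  # opt back into Spark 1.5.2 / Scala 2.10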

