Repository: incubator-carbondata
Updated Branches:
refs/heads/master 5e0a07221 -> 151962afb
fix spark2 decimal
code clean
comment fix
comment fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/7f54160f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/7f54160f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/7f54160f
Branch: refs/heads/master
Commit: 7f54160f6ff6a584519121bff2536d3ed38c5026
Parents: 5e0a072
Author: wangfei <wangfei_hello@126.com>
Authored: Sat Dec 3 12:44:07 2016 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sun Dec 4 02:10:41 2016 +0800
----------------------------------------------------------------------
.../datastorage/store/impl/FileFactory.java | 32 ++++++++++++++++++++
.../org/apache/spark/sql/CarbonSource.scala | 18 +++++------
.../org/apache/spark/sql/TableCreator.scala | 5 ++-
.../apache/spark/sql/hive/CarbonMetastore.scala | 14 +++++++--
.../carbondata/CarbonDataSourceSuite.scala | 15 +++++----
pom.xml | 6 ++--
6 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
index a94d3f1..c540920 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -397,6 +397,38 @@ public final class FileFactory {
}
}
+ public static boolean deleteFile(String filePath, FileType fileType) throws IOException {
+ filePath = filePath.replace("\\", "/");
+ switch (fileType) {
+ case HDFS:
+ case ALLUXIO:
+ case VIEWFS:
+ Path path = new Path(filePath);
+ FileSystem fs = path.getFileSystem(configuration);
+ return fs.delete(path, true);
+
+ case LOCAL:
+ default:
+ File file = new File(filePath);
+ return deleteAllFilesOfDir(file);
+ }
+ }
+
+ public static boolean deleteAllFilesOfDir(File path) {
+ if (!path.exists()) {
+ return true;
+ }
+ if (path.isFile()) {
+ return path.delete();
+ }
+ File[] files = path.listFiles();
+ for (int i = 0; i < files.length; i++) {
+ deleteAllFilesOfDir(files[i]);
+ }
+ return path.delete();
+ }
+
+
public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
filePath = filePath.replace("\\", "/");
switch (fileType) {
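
For illustration only, a minimal Scala sketch of how the new deleteFile
entry point might be driven; the store path used here is an assumption,
not part of this commit:

    import org.apache.carbondata.core.datastorage.store.impl.FileFactory

    // Hypothetical path; resolve the FileType first, then delete the
    // file or directory recursively through the matching backend.
    val storePath = "/tmp/carbon/store"
    val fileType = FileFactory.getFileType(storePath)
    val deleted = FileFactory.deleteFile(storePath, fileType)
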
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index fb87ba2..b14a95c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
import org.apache.spark.sql.execution.command.{CreateTable, Field}
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
@@ -114,20 +114,18 @@ class CarbonSource extends CreatableRelationProvider
} catch {
case ex: NoSuchTableException =>
val fields = dataSchema.map { col =>
- val column = col.name
val dataType = Option(col.dataType.toString)
- val name = Option(col.name)
// This is to parse complex data types
- val x = col.name + ' ' + col.dataType
- val f: Field = Field(column, dataType, name, None, null)
+ val f: Field = Field(col.name, dataType, Option(col.name), None, null)
// the data type of the decimal type will be like decimal(10,0)
// so checking the start of the string and taking the precision and scale.
// resetting the data type with decimal
- if (f.dataType.getOrElse("").startsWith("decimal")) {
- val (precision, scale) = TableCreator.getScaleAndPrecision(col.dataType.toString)
- f.precision = precision
- f.scale = scale
- f.dataType = Some("decimal")
+ Option(col.dataType).foreach {
+ case d: DecimalType =>
+ f.precision = d.precision
+ f.scale = d.scale
+ f.dataType = Some("decimal")
+ case _ => // do nothing
}
f
}
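
The commit replaces the string-prefix check on "decimal(10,0)" with a
match on the Catalyst type itself, so precision and scale come straight
from DecimalType. A standalone sketch of the same pattern, using a
simplified stand-in for CarbonData's Field class (illustration only):

    import org.apache.spark.sql.types.{DataType, DecimalType}

    // Simplified stand-in for CarbonData's Field; illustration only.
    case class SimpleField(dataType: String, precision: Int, scale: Int)

    def toField(dt: DataType): SimpleField = dt match {
      case d: DecimalType => SimpleField("decimal", d.precision, d.scale)
      case other          => SimpleField(other.toString, 0, 0)
    }
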
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 14decdb..e375710 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -336,7 +336,7 @@ object TableCreator {
private def normalizeType(field: Field): Field = {
val dataType = field.dataType.getOrElse("NIL")
- dataType match {
+ dataType.toLowerCase match {
case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
field.storeType
)
@@ -367,8 +367,7 @@ object TableCreator {
field.storeType
)
case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
- field.storeType, field.precision, field.scale
- )
+ field.storeType, field.schemaOrdinal, field.precision, field.scale)
// checking if the nested data type contains the child type as decimal(10,0),
// if it is present then extracting the precision and scale. resetting the data type
// with Decimal.
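
Matching on dataType.toLowerCase makes normalizeType tolerant of mixed
casing, such as the "Decimal" value set by CarbonSource above. A reduced
sketch of the idea (hypothetical helper, not the real method):

    // Illustration only: case-insensitive dispatch on a type name.
    def normalize(dataType: String): String = dataType.toLowerCase match {
      case "string"  => "String"
      case "decimal" => "Decimal"
      case other     => other
    }

    normalize("Decimal")  // "Decimal"
    normalize("STRING")   // "String"
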
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 2fde552..c2da5c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -100,7 +100,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Logging {
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
@transient
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -125,6 +125,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Logging
tableCreationTime
}
+ def cleanStore(): Unit = {
+ try {
+ val fileType = FileFactory.getFileType(storePath)
+ FileFactory.deleteFile(storePath, fileType)
+ } catch {
+ case e => LOGGER.error(e, "clean store failed")
+ }
+ }
+
def lookupRelation(dbName: Option[String],
tableName: String)(sparkSession: SparkSession): LogicalPlan = {
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
@@ -309,7 +318,6 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Logging
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
metadata.tablesMeta += tableMeta
- logInfo(s"Table $tableName for Database $dbName created successfully.")
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
carbonTablePath.getPath
@@ -411,7 +419,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Logging
.removeTable(dbName + "_" + tableName)
updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
case None =>
- logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
+ LOGGER.info(s"Metadata does not contain entry for table $tableName in database
$dbName")
}
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
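
The new cleanStore() delegates to FileFactory.deleteFile on the
metastore's store path, so a whole store can be wiped in one call. A
hedged sketch of calling it from a test, assuming CarbonEnv.init has
already run for the session (as the test change below does):

    // Assumes an initialised CarbonEnv for this SparkSession.
    CarbonEnv.init(spark.sqlContext)
    CarbonEnv.get.carbonMetastore.cleanStore()
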
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index a635d72..aaa0a20 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.carbondata
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
@@ -34,10 +34,11 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
- // Drop table
- spark.sql("DROP TABLE IF EXISTS carbon_table")
- spark.sql("DROP TABLE IF EXISTS csv_table")
+ CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.get.carbonMetastore.cleanStore()
+ // Drop table
+ spark.sql("DROP TABLE IF EXISTS carbon_testtable")
// Create table
spark.sql(
s"""
@@ -46,7 +47,8 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
| intField int,
| bigintField long,
| doubleField double,
- | stringField string
+ | stringField string,
+ | decimalField decimal(13, 0)
| )
| USING org.apache.spark.sql.CarbonSource
""".stripMargin)
@@ -64,7 +66,8 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
test("agg") {
- spark.sql("select stringField, sum(intField) from carbon_testtable group by stringField").collect()
+ spark.sql("select stringField, sum(intField) , sum(decimalField) " +
+ "from carbon_testtable group by stringField").collect()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f54160f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ca28bbf..7c597d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -309,9 +309,6 @@
</profile>
<profile>
<id>spark-1.5</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
<properties>
<spark.version>1.5.2</spark.version>
<scala.binary.version>2.10</scala.binary.version>
@@ -338,6 +335,9 @@
</profile>
<profile>
<id>spark-2.0</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
<properties>
<spark.version>2.0.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
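
With this change the spark-2.0 profile becomes the default build, and a
Spark 1.5 build must be requested explicitly. Standard Maven profile
selection applies (commands are illustrative, not part of this commit):

    mvn clean install                # now builds against Spark 2.0.0
    mvn -Pspark-1.5 clean install    # opt back in to Spark 1.5.2
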