carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: do not rely on RunnableCommand
Date Sun, 04 Dec 2016 15:20:18 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master bf9478640 -> e7958b61e


do not rely on RunnableCommand

code cleanup

change Spark version to 2.0.2

use getClass.getName for logger

fix compile issue with Spark 2.0.2

log store path at info level in CarbonEnv

revert pom change

fix test suite

fix comment

remove unused imports


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3386a26b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3386a26b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3386a26b

Branch: refs/heads/master
Commit: 3386a26bd049ccf225c45f451f76fe064548408a
Parents: bf94786
Author: wangfei <wangfei_hello@126.com>
Authored: Sat Dec 3 20:40:39 2016 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sun Dec 4 23:19:32 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonCatalystOperators.scala     |  57 -------
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   8 +-
 .../execution/command/carbonTableSchema.scala   | 157 +++----------------
 .../apache/spark/sql/hive/CarbonMetastore.scala |  18 ++-
 pom.xml                                         |   8 +-
 5 files changed, 43 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3386a26b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index c152e0c..88e43fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,54 +17,12 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.{TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
-import org.apache.spark.sql.types._
 
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
-/**
- * Top command
- */
-case class Top(count: Int, topOrBottom: Int, dim: NamedExpression, msr: NamedExpression,
-    child: LogicalPlan) extends UnaryNode {
-  def output: Seq[Attribute] = child.output
-
-  override def references: AttributeSet = {
-    val list = List(dim, msr)
-    AttributeSet(list.flatMap(_.references))
-  }
-}
-
-/**
- * Shows Loads in a table
- */
-case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
-  extends LogicalPlan with Command {
-
-  override def children: Seq[LogicalPlan] = Seq.empty
-
-  override def output: Seq[Attribute] = {
-    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
-      AttributeReference("Status", StringType, nullable = false)(),
-      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = false)())
-  }
-}
-
-/**
- * Describe formatted for hive table
- */
-case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
-  extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-
-  override def output: Seq[AttributeReference] =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
 case class CarbonDictionaryCatalystDecoder(
     relations: Seq[CarbonDecoderRelation],
     profile: CarbonProfile,
@@ -81,18 +39,3 @@ abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
 case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class CreateDatabase(dbName: String, sql: String) extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}
-
-case class DropDatabase(dbName: String, isCascade: Boolean, sql: String)
-    extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3386a26b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 8028908..73b988c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.hive.{CarbonMetastore, DistributionUtil}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 /**
@@ -29,7 +28,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
  */
 case class CarbonEnv(carbonMetastore: CarbonMetastore)
 
-object CarbonEnv extends Logging {
+object CarbonEnv {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   @volatile private var carbonEnv: CarbonEnv = _
 
@@ -40,6 +41,7 @@ object CarbonEnv extends Logging {
       val catalog = {
         val storePath = sqlContext.sparkSession.conf.get(
         CarbonCommonConstants.STORE_LOCATION, "/user/hive/warehouse/carbonstore")
+        LOGGER.info(s"carbon env initial: $storePath")
         new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
       }
       carbonEnv = CarbonEnv(catalog)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3386a26b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 135a5f6..52fc097 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -26,18 +26,14 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
-import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -56,8 +52,9 @@ import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, Globa
  *
  * @param alterTableModel
  */
-case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
-  RunnableCommand {
+case class AlterTableCompaction(alterTableModel: AlterTableModel) {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     // TODO : Implement it.
@@ -65,7 +62,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
     val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(databaseName + "_" + tableName)) {
-      logError(s"alter table failed. table not found: $databaseName.$tableName")
+      LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
       sys.error(s"alter table failed. table not found: $databaseName.$tableName")
     }
 
@@ -117,10 +114,10 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
   }
 }
 
-case class CreateTable(cm: TableModel) extends RunnableCommand {
+case class CreateTable(cm: TableModel) {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def run(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = cm.databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
     val tbName = cm.tableName
     val dbName = cm.databaseName
@@ -131,39 +128,10 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
     if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
       sys.error("No Dimensions found. Table should have at least one dimesnion !")
     }
-
-//    if (sparkSession.sqlContext.tableNames(dbName).exists(_.equalsIgnoreCase(tbName)))
{
-//      if (!cm.ifNotExistsSet) {
-//        LOGGER.audit(
-//          s"Table creation with Database name [$dbName] and Table name [$tbName] failed.
" +
-//          s"Table [$tbName] already exists under database [$dbName]")
-//        sys.error(s"Table [$tbName] already exists under database [$dbName]")
-//      }
-//    } else {
-      // Add Database to catalog and persist
-      val catalog = CarbonEnv.get.carbonMetastore
-      val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
-//      try {
-//        sparkSession.sql(
-//          s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
-//          s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
-//          .collect
-//      } catch {
-//        case e: Exception =>
-//          val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
-//          // call the drop table to delete the created table.
-//
-//          CarbonEnv.get.carbonMetastore
-//            .dropTable(catalog.storePath, identifier)(sparkSession)
-//
-//          LOGGER.audit(s"Table creation with Database name [$dbName] " +
-//                       s"and Table name [$tbName] failed")
-//          throw e
-//      }
-
-      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
-//    }
-
+    // Add Database to catalog and persist
+    val catalog = CarbonEnv.get.carbonMetastore
+    val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+    LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
     Seq.empty
   }
 
@@ -176,7 +144,7 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
 case class DeleteLoadsById(
     loadids: Seq[String],
     databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand {
+    tableName: String) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -239,7 +207,7 @@ case class DeleteLoadsByLoadDate(
     databaseNameOp: Option[String],
     tableName: String,
     dateField: String,
-    loadDate: String) extends RunnableCommand {
+    loadDate: String) {
 
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
 
@@ -292,8 +260,7 @@ case class DeleteLoadsByLoadDate(
 
 }
 
-case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation,
-                                          child: LogicalPlan) extends RunnableCommand {
+case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
{
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   def run(sparkSession: SparkSession): Seq[Row] = {
     val df = Dataset.ofRows(sparkSession, child)
@@ -322,7 +289,7 @@ case class LoadTable(
     options: scala.collection.immutable.Map[String, String],
     isOverwriteExist: Boolean = false,
     var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None) extends RunnableCommand {
+    dataFrame: Option[DataFrame] = None) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -335,7 +302,7 @@ case class LoadTable(
       sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
     if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
-      logError(s"Data loading failed. table not found: $dbName.$tableName")
+      LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
       LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
@@ -353,7 +320,7 @@ case class LoadTable(
       )
     try {
       if (carbonLock.lockWithRetries()) {
-        logInfo("Successfully able to get the table metadata file lock")
+        LOGGER.info("Successfully able to get the table metadata file lock")
       } else {
         sys.error("Table is locked for updation. Please try after some time")
       }
@@ -508,9 +475,9 @@ case class LoadTable(
     } finally {
       if (carbonLock != null) {
         if (carbonLock.unlock()) {
-          logInfo("Table MetaData Unlocked Successfully after data load")
+          LOGGER.info("Table MetaData Unlocked Successfully after data load")
         } else {
-          logError("Unable to unlock Table MetaData")
+          LOGGER.error("Unable to unlock Table MetaData")
         }
       }
     }
@@ -544,91 +511,11 @@ case class LoadTable(
   }
 }
 
-private[sql] case class DescribeCommandFormatted(
-    child: SparkPlan,
-    override val output: Seq[Attribute],
-    tblIdentifier: TableIdentifier)
-  extends RunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
-    val mapper = new ObjectMapper()
-    val colProps = StringBuilder.newBuilder
-    var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
-      val comment = if (relation.metaData.dims.contains(field.name)) {
-        val dimension = relation.metaData.carbonTable.getDimensionByName(
-          relation.tableMeta.carbonTableIdentifier.getTableName,
-          field.name)
-        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size()
> 0) {
-          val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
-          colProps.append(field.name).append(".")
-            .append(mapper.writeValueAsString(dimension.getColumnProperties))
-            .append(",")
-        }
-        if (dimension.hasEncoding(Encoding.DICTIONARY) &&
-            !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-          "DICTIONARY, KEY COLUMN"
-        } else {
-          "KEY COLUMN"
-        }
-      } else {
-        ("MEASURE")
-      }
-      (field.name, field.dataType.simpleString, comment)
-    }
-    val colPropStr = if (colProps.toString().trim().length() > 0) {
-      // drops additional comma at end
-      colProps.toString().dropRight(1)
-    } else {
-      colProps.toString()
-    }
-    results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
-    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
-      .getDatabaseName, "")
-    )
-    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName,
""))
-    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
-    val carbonTable = relation.tableMeta.carbonTable
-    results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
-    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
-    if (colPropStr.length() > 0) {
-      results ++= Seq((colPropStr, "", ""))
-    } else {
-      results ++= Seq(("NONE", "", ""))
-    }
-    val dimension = carbonTable
-      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    results ++= getColumnGroups(dimension.asScala.toList)
-    results.map { case (name, dataType, comment) =>
-      Row(f"$name%-36s $dataType%-80s $comment%-72s")
-    }
-  }
-
-  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)]
= {
-    var results: Seq[(String, String, String)] =
-      Seq(("", "", ""), ("##Column Group Information", "", ""))
-    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
-      case (groupId, _) => groupId != -1
-    }.toSeq.sortBy(_._1)
-    val groups = groupedDimensions.map(colGroups => {
-      colGroups._2.map(dim => dim.getColName).mkString(", ")
-    })
-    var index = 1
-    groups.map { x =>
-      results = results :+ (s"Column Group $index", x, "")
-      index = index + 1
-    }
-    results
-  }
-}
-
 private[sql] case class DeleteLoadByDate(
     databaseNameOp: Option[String],
     tableName: String,
     dateField: String,
-    dateValue: String
-) extends RunnableCommand {
+    dateValue: String) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -674,7 +561,7 @@ private[sql] case class DeleteLoadByDate(
 
 case class CleanFiles(
     databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand {
+    tableName: String) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -716,9 +603,9 @@ case class ShowLoads(
     databaseNameOp: Option[String],
     tableName: String,
     limit: Option[String],
-    override val output: Seq[Attribute]) extends RunnableCommand {
+    val output: Seq[Attribute]) {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  def run(sparkSession: SparkSession): Seq[Row] = {
     val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
     val tableUniqueName = databaseName + "_" + tableName
     // Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata
if

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3386a26b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index c2da5c6..24601f4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -56,7 +56,12 @@ import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.spark.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
+case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
+  // clear the metadata
+  def clear(): Unit = {
+    tablesMeta.clear()
+  }
+}
 
 case class CarbonMetaData(dims: Seq[String],
     msrs: Seq[String],
@@ -129,18 +134,19 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     try {
       val fileType = FileFactory.getFileType(storePath)
       FileFactory.deleteFile(storePath, fileType)
+      metadata.clear()
     } catch {
-      case e => LOGGER.error(e, "clean store failed")
+      case e: Throwable => LOGGER.error(e, "clean store failed")
     }
   }
 
-  def lookupRelation(dbName: Option[String],
-                     tableName: String)(sparkSession: SparkSession): LogicalPlan = {
+  def lookupRelation(dbName: Option[String], tableName: String)
+                    (sparkSession: SparkSession): LogicalPlan = {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
 
-  def lookupRelation(tableIdentifier: TableIdentifier,
-                     alias: Option[String] = None)(sparkSession: SparkSession): LogicalPlan
= {
+  def lookupRelation(tableIdentifier: TableIdentifier, alias: Option[String] = None)
+                    (sparkSession: SparkSession): LogicalPlan = {
     checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(
       sparkSession.catalog.currentDatabase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3386a26b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dac3ae2..09d940c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,6 +310,9 @@
     </profile>
     <profile>
       <id>spark-1.5</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.version>1.5.2</spark.version>
         <scala.binary.version>2.10</scala.binary.version>
@@ -334,11 +337,8 @@
     </profile>
     <profile>
       <id>spark-2.0</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
-        <spark.version>2.0.0</spark.version>
+        <spark.version>2.0.2</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>


Mime
View raw message