http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 21864d1..16e35f4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -17,13 +17,9 @@
package org.apache.spark.sql
-import java.util.regex.{Matcher, Pattern}
-
import scala.collection.JavaConverters._
-import scala.collection.mutable.LinkedHashSet
import scala.collection.mutable.Map
import scala.language.implicitConversions
-import scala.util.matching.Regex
import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.parse._
@@ -31,154 +27,18 @@ import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveQlWrapper
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
* Parser for All Carbon DDL, DML cases in Unified context
*/
-class CarbonSqlParser() extends AbstractSparkSQLParser {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- protected val AGGREGATE = carbonKeyWord("AGGREGATE")
- protected val AS = carbonKeyWord("AS")
- protected val AGGREGATION = carbonKeyWord("AGGREGATION")
- protected val ALL = carbonKeyWord("ALL")
- protected val HIGH_CARDINALITY_DIMS = carbonKeyWord("NO_DICTIONARY")
- protected val BEFORE = carbonKeyWord("BEFORE")
- protected val BY = carbonKeyWord("BY")
- protected val CARDINALITY = carbonKeyWord("CARDINALITY")
- protected val CASCADE = carbonKeyWord("CASCADE")
- protected val CLASS = carbonKeyWord("CLASS")
- protected val CLEAN = carbonKeyWord("CLEAN")
- protected val COLS = carbonKeyWord("COLS")
- protected val COLUMNS = carbonKeyWord("COLUMNS")
- protected val CREATE = carbonKeyWord("CREATE")
- protected val CUBE = carbonKeyWord("CUBE")
- protected val CUBES = carbonKeyWord("CUBES")
- protected val DATA = carbonKeyWord("DATA")
- protected val DATABASE = carbonKeyWord("DATABASE")
- protected val DATABASES = carbonKeyWord("DATABASES")
- protected val DELETE = carbonKeyWord("DELETE")
- protected val DELIMITER = carbonKeyWord("DELIMITER")
- protected val DESCRIBE = carbonKeyWord("DESCRIBE")
- protected val DESC = carbonKeyWord("DESC")
- protected val DETAIL = carbonKeyWord("DETAIL")
- protected val DIMENSIONS = carbonKeyWord("DIMENSIONS")
- protected val DIMFOLDERPATH = carbonKeyWord("DIMFOLDERPATH")
- protected val DROP = carbonKeyWord("DROP")
- protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
- protected val EXCLUDE = carbonKeyWord("EXCLUDE")
- protected val EXPLAIN = carbonKeyWord("EXPLAIN")
- protected val EXTENDED = carbonKeyWord("EXTENDED")
- protected val FORMATTED = carbonKeyWord("FORMATTED")
- protected val FACT = carbonKeyWord("FACT")
- protected val FIELDS = carbonKeyWord("FIELDS")
- protected val FILEHEADER = carbonKeyWord("FILEHEADER")
- protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
- protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
- protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
- protected val FILES = carbonKeyWord("FILES")
- protected val FROM = carbonKeyWord("FROM")
- protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
- protected val IN = carbonKeyWord("IN")
- protected val INCLUDE = carbonKeyWord("INCLUDE")
- protected val INPATH = carbonKeyWord("INPATH")
- protected val INTO = carbonKeyWord("INTO")
- protected val LEVELS = carbonKeyWord("LEVELS")
- protected val LIKE = carbonKeyWord("LIKE")
- protected val LOAD = carbonKeyWord("LOAD")
- protected val LOCAL = carbonKeyWord("LOCAL")
- protected val MAPPED = carbonKeyWord("MAPPED")
- protected val MEASURES = carbonKeyWord("MEASURES")
- protected val MULTILINE = carbonKeyWord("MULTILINE")
- protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
- protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
- protected val OPTIONS = carbonKeyWord("OPTIONS")
- protected val OUTPATH = carbonKeyWord("OUTPATH")
- protected val OVERWRITE = carbonKeyWord("OVERWRITE")
- protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
- protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
- protected val PARTITIONER = carbonKeyWord("PARTITIONER")
- protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
- protected val RELATION = carbonKeyWord("RELATION")
- protected val SCHEMA = carbonKeyWord("SCHEMA")
- protected val SCHEMAS = carbonKeyWord("SCHEMAS")
- protected val SHOW = carbonKeyWord("SHOW")
- protected val TABLES = carbonKeyWord("TABLES")
- protected val TABLE = carbonKeyWord("TABLE")
- protected val TERMINATED = carbonKeyWord("TERMINATED")
- protected val TYPE = carbonKeyWord("TYPE")
- protected val USE = carbonKeyWord("USE")
- protected val WHERE = carbonKeyWord("WHERE")
- protected val WITH = carbonKeyWord("WITH")
- protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
- protected val ABS = carbonKeyWord("abs")
-
- protected val FOR = carbonKeyWord("FOR")
- protected val SCRIPTS = carbonKeyWord("SCRIPTS")
- protected val USING = carbonKeyWord("USING")
- protected val LIMIT = carbonKeyWord("LIMIT")
- protected val DEFAULTS = carbonKeyWord("DEFAULTS")
- protected val ALTER = carbonKeyWord("ALTER")
- protected val ADD = carbonKeyWord("ADD")
-
- protected val IF = carbonKeyWord("IF")
- protected val NOT = carbonKeyWord("NOT")
- protected val EXISTS = carbonKeyWord("EXISTS")
- protected val DIMENSION = carbonKeyWord("DIMENSION")
- protected val STARTTIME = carbonKeyWord("STARTTIME")
- protected val SEGMENTS = carbonKeyWord("SEGMENTS")
- protected val SEGMENT = carbonKeyWord("SEGMENT")
-
- protected val STRING = carbonKeyWord("STRING")
- protected val INTEGER = carbonKeyWord("INTEGER")
- protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
- protected val DATE = carbonKeyWord("DATE")
- protected val CHAR = carbonKeyWord("CHAR")
- protected val NUMERIC = carbonKeyWord("NUMERIC")
- protected val DECIMAL = carbonKeyWord("DECIMAL")
- protected val DOUBLE = carbonKeyWord("DOUBLE")
- protected val SHORT = carbonKeyWord("SMALLINT")
- protected val INT = carbonKeyWord("INT")
- protected val BIGINT = carbonKeyWord("BIGINT")
- protected val ARRAY = carbonKeyWord("ARRAY")
- protected val STRUCT = carbonKeyWord("STRUCT")
-
- protected val doubleQuotedString = "\"([^\"]+)\"".r
- protected val singleQuotedString = "'([^']+)'".r
-
- protected val newReservedWords =
- this.getClass
- .getMethods
- .filter(_.getReturnType == classOf[Keyword])
- .map(_.invoke(this).asInstanceOf[Keyword].str)
-
- override val lexical = {
- val sqllex = new SqlLexical()
- sqllex.initialize(newReservedWords)
- sqllex
-
- }
-
- import lexical.Identifier
-
- implicit def regexToParser(regex: Regex): Parser[String] = {
- acceptMatch(
- s"identifier matching regex ${ regex }",
- { case Identifier(str) if regex.unapplySeq(str).isDefined => str }
- )
- }
+class CarbonSqlParser() extends CarbonDDLSqlParser {
override def parse(input: String): LogicalPlan = {
synchronized {
@@ -196,26 +56,14 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- /**
- * This will convert key word to regular expression.
- *
- * @param keys
- * @return
- */
- private def carbonKeyWord(keys: String) = {
- ("(?i)" + keys).r
- }
-
override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
- protected lazy val startCommand: Parser[LogicalPlan] = createDatabase | dropDatabase |
- loadManagement | describeTable |
- showLoads | alterTable | createTable
-
- protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate |
- cleanFiles | loadDataNew
+ protected lazy val startCommand: Parser[LogicalPlan] =
+ createDatabase | dropDatabase | loadManagement | describeTable |
+ showLoads | alterTable | createTable
- protected val escapedIdentifier = "`([^`]+)`".r
+ protected lazy val loadManagement: Parser[LogicalPlan] =
+ deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
protected lazy val createDatabase: Parser[LogicalPlan] =
CREATE ~> (DATABASE | SCHEMA) ~> restInput ^^ {
@@ -253,19 +101,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
DropDatabase(dbName, isCascade, dropDbSql)
}
- private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
- var complexDimensions: Seq[Field] = Seq()
- var dimensions: Seq[Field] = Seq()
- dims.foreach { dimension =>
- dimension.dataType.getOrElse("NIL") match {
- case "Array" => complexDimensions = complexDimensions :+ dimension
- case "Struct" => complexDimensions = complexDimensions :+ dimension
- case _ => dimensions = dimensions :+ dimension
- }
- }
- dimensions ++ complexDimensions
- }
-
protected lazy val alterTable: Parser[LogicalPlan] =
ALTER ~> TABLE ~> restInput ^^ {
case statement =>
@@ -303,14 +138,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- private def getScaleAndPrecision(dataType: String): (Int, Int) = {
- val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
- m.find()
- val matchedString: String = m.group(1)
- val scaleAndPrecision = matchedString.split(",")
- (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
- }
-
/**
* This function will traverse the tree and logical plan will be formed using that.
*
@@ -386,6 +213,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
if(f.dataType.getOrElse("").startsWith("char")) {
f.dataType = Some("char")
}
+ f.rawSchema = x
fields ++= Seq(f)
}
}
@@ -494,511 +322,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- /**
- * This will prepare the Model from the Tree details.
- *
- * @param ifNotExistPresent
- * @param dbName
- * @param tableName
- * @param fields
- * @param partitionCols
- * @param tableProperties
- * @return
- */
- protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
- , tableName: String, fields: Seq[Field],
- partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]): TableModel
- = {
-
- fields.zipWithIndex.foreach { x =>
- x._1.schemaOrdinal = x._2
- }
- val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
- fields, tableProperties)
- if (dims.isEmpty) {
- throw new MalformedCarbonCommandException(s"Table ${
- dbName.getOrElse(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME)
- }.$tableName"
- +
- " can not be created without key columns. Please " +
- "use DICTIONARY_INCLUDE or " +
- "DICTIONARY_EXCLUDE to set at least one key " +
- "column " +
- "if all specified columns are numeric types")
- }
- val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
-
- // column properties
- val colProps = extractColumnProperties(fields, tableProperties)
- // get column groups configuration from table properties.
- val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
- noDictionaryDims, msrs, dims)
-
- // get no inverted index columns from table properties.
- val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
-
- // validate the tableBlockSize from table properties
- CommonUtil.validateTableBlockSize(tableProperties)
-
- TableModel(
- ifNotExistPresent,
- dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
- dbName,
- tableName,
- tableProperties,
- reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
- msrs.map(f => normalizeType(f)),
- Option(noDictionaryDims),
- Option(noInvertedIdxCols),
- groupCols,
- Some(colProps))
- }
-
- /**
- * Extract the column groups configuration from table properties.
- * Based on this, row groups of fields will be determined.
- *
- * @param tableProperties
- * @return
- */
- protected def updateColumnGroupsInField(tableProperties: Map[String, String],
- noDictionaryDims: Seq[String],
- msrs: Seq[Field],
- dims: Seq[Field]): Seq[String] = {
- if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
-
- var splittedColGrps: Seq[String] = Seq[String]()
- val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
-
- // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
- // here the value is first split on the parentheses, so the above yields two strings:
- // [col1,col2] [col3,col4]
- val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
- while (m.find()) {
- val oneGroup: String = m.group(1)
- CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
- val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
- splittedColGrps :+= arrangedColGrp
- }
- // This will be further handled.
- CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
- } else {
- null
- }
- }
-
- def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
- // if the columns in the column group are not in schema order, arrange them in schema order
- var colGrpFieldIndx: Seq[Int] = Seq[Int]()
- colGroup.split(',').map(_.trim).foreach { x =>
- dims.zipWithIndex.foreach { dim =>
- if (dim._1.column.equalsIgnoreCase(x)) {
- colGrpFieldIndx :+= dim._2
- }
- }
- }
- // sort it
- colGrpFieldIndx = colGrpFieldIndx.sorted
- // check whether the columns in the column group are in schema order
- if (!checkIfInSequence(colGrpFieldIndx)) {
- throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
- }
- def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
- for (i <- 0 until (colGrpFieldIndx.length - 1)) {
- if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
- throw new MalformedCarbonCommandException(
- "Invalid column group,column in group should be contiguous as per schema.")
- }
- }
- true
- }
- val colGrpNames: StringBuilder = StringBuilder.newBuilder
- for (i <- colGrpFieldIndx.indices) {
- colGrpNames.append(dims(colGrpFieldIndx(i)).column)
- if (i < (colGrpFieldIndx.length - 1)) {
- colGrpNames.append(",")
- }
- }
- colGrpNames.toString()
- }
-
- /**
- * For getting the partitioner Object
- *
- * @param partitionCols
- * @param tableProperties
- * @return
- */
- protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]):
- Option[Partitioner] = {
-
- // by default the partition class is set to empty;
- // later, in the table schema, it is set to the default value.
- var partitionClass: String = ""
- var partitionCount: Int = 1
- var partitionColNames: Array[String] = Array[String]()
- if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
- partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
- }
-
- if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
- try {
- partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
- } catch {
- case e: Exception => // no need to do anything.
- }
- }
-
- partitionCols.foreach(col =>
- partitionColNames :+= col.partitionColumn
- )
-
- // this means the user has given a partition columns list
- if (!partitionColNames.isEmpty) {
- return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
- }
- // if partition cols are not given then no need to do partition.
- None
- }
-
- protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
- java.util.Map[String, java.util.List[ColumnProperty]] = {
- val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
- fields.foreach { field =>
- if (field.children.isDefined && field.children.get != null) {
- fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
- } else {
- fillColumnProperty(None, field.column, tableProperties, colPropMap)
- }
- }
- colPropMap
- }
-
- protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
- tableProperties: Map[String, String],
- colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
- fieldChildren.foreach(fields => {
- fields.foreach(field => {
- fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
- }
- )
- }
- )
- }
-
- protected def fillColumnProperty(parentColumnName: Option[String],
- columnName: String,
- tableProperties: Map[String, String],
- colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
- val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
- val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
- if (colProps.isDefined) {
- colPropMap.put(colProKey, colProps.get)
- }
- }
-
- def getKey(parentColumnName: Option[String],
- columnName: String): (String, String) = {
- if (parentColumnName.isDefined) {
- if (columnName == "val") {
- (parentColumnName.get, parentColumnName.get + "." + columnName)
- } else {
- (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
- }
- } else {
- (columnName, columnName)
- }
- }
-
- /**
- * This will extract the no-inverted-index column fields.
- * By default all dimensions use inverted index.
- *
- * @param fields
- * @param tableProperties
- * @return
- */
- protected def extractNoInvertedIndexColumns(fields: Seq[Field],
- tableProperties: Map[String, String]):
- Seq[String] = {
- // check whether the column name is in fields
- var noInvertedIdxColsProps: Array[String] = Array[String]()
- var noInvertedIdxCols: Seq[String] = Seq[String]()
-
- if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
- noInvertedIdxColsProps =
- tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
- noInvertedIdxColsProps
- .map { noInvertedIdxColProp =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
- val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- }
- }
- }
- // de-duplicate the columns so only distinct column names remain
- val distinctCols = noInvertedIdxColsProps.toSet
- // extract the no inverted index columns
- fields.foreach(field => {
- if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
- noInvertedIdxCols :+= field.column
- }
- }
- )
- noInvertedIdxCols
- }
-
- /**
- * This will extract the Dimensions and NoDictionary Dimensions fields.
- * By default all string cols are dimensions.
- *
- * @param fields
- * @param tableProperties
- * @return
- */
- protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
- tableProperties: Map[String, String]):
- (Seq[Field], Seq[String]) = {
- var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
- var dictExcludeCols: Array[String] = Array[String]()
- var noDictionaryDims: Seq[String] = Seq[String]()
- var dictIncludeCols: Seq[String] = Seq[String]()
-
- // All excluded cols should be there in create table cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
- dictExcludeCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
- dictExcludeCols
- .map { dictExcludeCol =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
- val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- } else {
- val dataType = fields.find(x =>
- x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
- if (isComplexDimDictionaryExclude(dataType)) {
- val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
- dictExcludeCol
- throw new MalformedCarbonCommandException(errormsg)
- } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
- val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
- " data type column: " + dictExcludeCol
- throw new MalformedCarbonCommandException(errorMsg)
- }
- }
- }
- }
- // All included cols should be there in create table cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
- dictIncludeCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
- dictIncludeCols.map { distIncludeCol =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
- val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- }
- }
- }
-
- // exclude cols must not also appear in include cols
- dictExcludeCols.foreach { dicExcludeCol =>
- if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
- val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
- " with DICTIONARY_INCLUDE. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- }
- }
-
- // by default consider all String cols as dims; if any dictionary exclude is present,
- // add it to the noDictionaryDims list. Consider all dictionary exclude/include cols as dims.
- fields.foreach(field => {
-
- if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
- val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
- if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
- noDictionaryDims :+= field.column
- }
- dimFields += field
- } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
- dimFields += (field)
- } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
- dimFields += (field)
- }
- }
- )
-
- (dimFields.toSeq, noDictionaryDims)
- }
-
- /**
- * It fills non string dimensions in dimFields
- */
- def fillNonStringDimension(dictIncludeCols: Seq[String],
- field: Field, dimFields: LinkedHashSet[Field]) {
- var dictInclude = false
- if (dictIncludeCols.nonEmpty) {
- dictIncludeCols.foreach(dictIncludeCol =>
- if (field.column.equalsIgnoreCase(dictIncludeCol)) {
- dictInclude = true
- })
- }
- if (dictInclude) {
- dimFields += field
- }
- }
-
- /**
- * detect dimension data type
- *
- * @param dimensionDatatype
- */
- def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
- val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
- dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
- }
-
- /**
- * detects whether complex dimension is part of dictionary_exclude
- */
- def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
- val dimensionType = Array("array", "struct")
- dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
- }
-
- /**
- * detects whether a string or timestamp column is part of dictionary_exclude
- */
- def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
- val dataTypes = Array("string", "timestamp")
- dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
- }
-
- /**
- * Extract the Measure Cols fields. By default all non string cols will be measures.
- *
- * @param fields
- * @param tableProperties
- * @return
- */
- protected def extractMsrColsFromFields(fields: Seq[Field],
- tableProperties: Map[String, String]): Seq[Field] = {
- var msrFields: Seq[Field] = Seq[Field]()
- var dictIncludedCols: Array[String] = Array[String]()
- var dictExcludedCols: Array[String] = Array[String]()
-
- // get all included cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
- dictIncludedCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
- }
-
- // get all excluded cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
- dictExcludedCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
- }
-
- // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
- fields.foreach(field => {
- if (!isDetectAsDimentionDatatype(field.dataType.get)) {
- if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
- !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
- msrFields :+= field
- }
- }
- })
-
- msrFields
- }
-
- /**
- * Extract the DbName and table name.
- *
- * @param tableNameParts
- * @return
- */
- protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
- val (db, tableName) =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => (None, tableOnly)
- case Seq(databaseName, table) => (Some(databaseName), table)
- }
-
- (db, tableName)
- }
-
- protected def cleanIdentifier(ident: String): String = {
- ident match {
- case escapedIdentifier(i) => i
- case plainIdent => plainIdent
- }
- }
-
- protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
- var remainingNodes = nodeList
- val clauses = clauseNames.map { clauseName =>
- val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
- remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
- matches.tail
- } else {
- Nil
- })
- matches.headOption
- }
-
- if (remainingNodes.nonEmpty) {
- sys.error(
- s"""Unhandled clauses:
- |You are likely trying to use an unsupported carbon feature."""".stripMargin)
- }
- clauses
- }
-
- object Token {
- /** @return matches of the form (tokenName, children). */
- def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
- t match {
- case t: ASTNode =>
- CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
- Some((t.getText,
- Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
- case _ => None
- }
- }
- }
-
- /**
- * Extract the table properties token
- *
- * @param node
- * @return
- */
- protected def getProperties(node: Node): Seq[(String, String)] = {
- node match {
- case Token("TOK_TABLEPROPLIST", list) =>
- list.map {
- case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
- (unquoteString(key) -> unquoteString(value))
- }
- }
- }
-
- protected def unquoteString(str: String) = {
- str match {
- case singleQuotedString(s) => s.toLowerCase()
- case doubleQuotedString(s) => s.toLowerCase()
- case other => other
- }
- }
-
protected lazy val loadDataNew: Parser[LogicalPlan] =
LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
(INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
@@ -1015,143 +338,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
isOverwrite.isDefined)
}
- private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
-
- // validate with all supported options
- val options = optionList.get.groupBy(x => x._1)
- val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
- "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
- "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
- "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
- )
- var isSupported = true
- val invalidOptions = StringBuilder.newBuilder
- options.foreach(value => {
- if (!supportedOptions.exists(x => x.equalsIgnoreCase(value._1))) {
- isSupported = false
- invalidOptions.append(value._1)
- }
-
- }
- )
- if (!isSupported) {
- val errorMessage = "Error: Invalid option(s): " + invalidOptions.toString()
- throw new MalformedCarbonCommandException(errorMessage)
- }
-
- // COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
- if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
- options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
- val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
- " in options"
- throw new MalformedCarbonCommandException(errorMessage)
- }
-
- if (options.exists(_._1.equalsIgnoreCase("MAXCOLUMNS"))) {
- val maxColumns: String = options.get("maxcolumns").get(0)._2
- try {
- maxColumns.toInt
- } catch {
- case ex: NumberFormatException =>
- throw new MalformedCarbonCommandException(
- "option MAXCOLUMNS can only contain integer values")
- }
- }
-
- // check for duplicate options
- val duplicateOptions = options filter {
- case (_, optionlist) => optionlist.size > 1
- }
- val duplicates = StringBuilder.newBuilder
- if (duplicateOptions.nonEmpty) {
- duplicateOptions.foreach(x => {
- duplicates.append(x._1)
- }
- )
- val errorMessage = "Error: Duplicate option(s): " + duplicates.toString()
- throw new MalformedCarbonCommandException(errorMessage)
- }
- }
-
- protected lazy val dbTableIdentifier: Parser[Seq[String]] =
- (ident <~ ".").? ~ (ident) ^^ {
- case databaseName ~ tableName =>
- if (databaseName.isDefined) {
- Seq(databaseName.get, tableName)
- } else {
- Seq(tableName)
- }
- }
-
- protected lazy val loadOptions: Parser[(String, String)] =
- (stringLit <~ "=") ~ stringLit ^^ {
- case opt ~ optvalue => (opt.trim.toLowerCase(), optvalue)
- case _ => ("", "")
- }
-
-
- protected lazy val dimCol: Parser[Field] = anyFieldDef
-
- protected lazy val primitiveTypes =
- STRING ^^^ "string" | INTEGER ^^^ "integer" |
- TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
- BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" |
- INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType | DATE ^^^ "date" | charType
-
- /**
- * Matching the char(n) data type and returning the same.
- */
- private lazy val charType =
- CHAR ~ ("(" ~>numericLit <~ ")").? ^^ {
- case char ~ digit =>
- s"$char($digit)"
- }
-
- /**
- * Matching the decimal(10,0) data type and returning the same.
- */
- private lazy val decimalType =
- DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
- case decimal ~ precision ~ scale =>
- s"$decimal($precision, $scale)"
- }
-
- protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
- primitiveFieldType
-
- protected lazy val anyFieldDef: Parser[Field] =
- (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
- case e1 ~ e2 ~ e3 =>
- Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
- }
-
- protected lazy val primitiveFieldType: Parser[Field] =
- (primitiveTypes) ^^ {
- case e1 =>
- Field("unknown", Some(e1), Some("unknown"), Some(null))
- }
-
- protected lazy val arrayFieldType: Parser[Field] =
- ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ {
- case e1 =>
- Field("unknown", Some("array"), Some("unknown"),
- Some(List(Field("val", e1.dataType, Some("val"),
- e1.children))))
- }
-
- protected lazy val structFieldType: Parser[Field] =
- ((STRUCT ^^^ "struct") ~> "<" ~> repsep(anyFieldDef, ",") <~ ">") ^^ {
- case e1 =>
- Field("unknown", Some("struct"), Some("unknown"), Some(e1))
- }
-
- protected lazy val measureCol: Parser[Field] =
- (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" | SHORT ^^^ "smallint" |
- BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
- (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
- case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
- }
-
protected lazy val describeTable: Parser[LogicalPlan] =
((DESCRIBE | DESC) ~> opt(EXTENDED | FORMATTED)) ~ (ident <~ ".").? ~ ident ^^ {
case ef ~ db ~ tbl =>
@@ -1169,109 +355,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- private def normalizeType(field: Field): Field = {
- val dataType = field.dataType.getOrElse("NIL")
- dataType match {
- case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "smallint" => Field(field.column, Some("SmallInt"), field.name, Some(null),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "integer" | "int" => Field(field.column, Some("Integer"), field.name, Some(null),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "timestamp" => Field(field.column, Some("Timestamp"), field.name, Some(null),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "array" => Field(field.column, Some("Array"), field.name,
- field.children.map(f => f.map(normalizeType(_))),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "struct" => Field(field.column, Some("Struct"), field.name,
- field.children.map(f => f.map(normalizeType(_))),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal, field.precision, field.scale
- )
- // checking whether the data type is specified as decimal(10,0); if so,
- // extract the precision and scale and reset the data type
- // to Decimal.
- case _ if (dataType.startsWith("decimal")) =>
- val (precision, scale) = getScaleAndPrecision(dataType)
- Field(field.column,
- Some("Decimal"),
- field.name,
- Some(null),
- field.parent,
- field.storeType, field.schemaOrdinal, precision,
- scale
- )
- case _ =>
- field
- }
- }
-
- private def addParent(field: Field): Field = {
- field.dataType.getOrElse("NIL") match {
- case "Array" => Field(field.column, Some("Array"), field.name,
- field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
- field.storeType, field.schemaOrdinal)
- case "Struct" => Field(field.column, Some("Struct"), field.name,
- field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
- field.storeType, field.schemaOrdinal)
- case _ => field
- }
- }
-
- private def appendParentForEachChild(field: Field, parentName: String): Field = {
- field.dataType.getOrElse("NIL") match {
- case "String" => Field(parentName + "." + field.column, Some("String"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Long" => Field(parentName + "." + field.column, Some("Long"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Double" => Field(parentName + "." + field.column, Some("Double"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Array" => Field(parentName + "." + field.column, Some("Array"),
- Some(parentName + "." + field.name.getOrElse(None)),
- field.children
- .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
- parentName)
- case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
- Some(parentName + "." + field.name.getOrElse(None)),
- field.children
- .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
- parentName)
- case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
- field.storeType, field.schemaOrdinal, field.precision, field.scale)
- case _ => field
- }
- }
-
protected lazy val showLoads: Parser[LogicalPlan] =
SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
(LIMIT ~> numericLit).? <~
@@ -1280,13 +363,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
}
- protected lazy val segmentId: Parser[String] =
- numericLit ^^ { u => u } |
- elem("decimal", p => {
- p.getClass.getSimpleName.equals("FloatLit") ||
- p.getClass.getSimpleName.equals("DecimalLit")
- }) ^^ (_.chars)
-
protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
(ident <~ ".").? ~ ident) <~
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d82290e..8ad1203 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -139,7 +139,7 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
val dbName = cm.databaseName
LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
- val tableInfo: TableInfo = TableNewProcessor(cm, sqlContext)
+ val tableInfo: TableInfo = TableNewProcessor(cm)
if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
sys.error("No Dimensions found. Table should have at least one dimesnion !")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 88e43fd..2943a52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
-import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
+import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.types.{StringType, TimestampType}
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
@@ -39,3 +42,34 @@ abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
+
+object getDB {
+
+ def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
+ dbName.getOrElse(
+ sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase)
+ }
+}
+
+/**
+ * Shows Loads in a table
+ */
+case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
+ extends Command {
+
+ override def output: Seq[Attribute] = {
+ Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+ AttributeReference("Status", StringType, nullable = false)(),
+ AttributeReference("Load Start Time", TimestampType, nullable = false)(),
+ AttributeReference("Load End Time", TimestampType, nullable = false)())
+ }
+}
+
+/**
+ * Describe formatted for hive table
+ */
+case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier) extends Command {
+
+ override def output: Seq[AttributeReference] =
+ Seq(AttributeReference("result", StringType, nullable = false)())
+}
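
ShowLoadsCommand and DescribeFormattedCommand added above are plain logical Command nodes that only declare an output schema; the new DDLStrategy later in this commit is what maps them to runnable commands. The getDB helper centralises the database-name fallback used by those commands. A small hedged sketch of that fallback (only getDB comes from this patch, the rest is illustrative):

    import org.apache.spark.sql.{getDB, SparkSession}

    object ResolveTableExample {
      // An explicit database name from the DDL wins; otherwise the session's
      // current database, resolved through the Hive session catalog, is used.
      def qualifiedName(dbNameOp: Option[String], table: String, spark: SparkSession): String = {
        val dbName = getDB.getDatabaseName(dbNameOp, spark)
        s"$dbName.$table"
      }
    }
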
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 1fa710e..976a1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -41,13 +41,13 @@ object CarbonEnv {
var initialized = false
- def init(sqlContext: SQLContext): Unit = {
+ def init(sparkSession: SparkSession): Unit = {
if (!initialized) {
val catalog = {
val storePath =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
LOGGER.info(s"carbon env initial: $storePath")
- new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
+ new CarbonMetastore(sparkSession.conf, storePath)
}
carbonEnv = CarbonEnv(catalog)
initialized = true
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
new file mode 100644
index 0000000..748d292
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.hive.CarbonSessionState
+import org.apache.spark.sql.internal.SessionState
+
+/**
+ * Session implementation for {org.apache.spark.sql.SparkSession}
+ * This class is implemented only so that Carbon's own SQL DDL commands can be used.
+ * Users need to use {CarbonSession.getOrCreateCarbonSession} to create a Carbon session.
+ * @param sc
+ */
+class CarbonSession(sc: SparkContext) extends SparkSession(sc) {
+
+ CarbonEnv.init(this)
+
+ @transient
+ override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
+}
+
+object CarbonSession {
+
+ implicit class CarbonBuilder(builder: Builder) {
+
+
+ def getOrCreateCarbonSession(): SparkSession = synchronized {
+
+ val options =
+ getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
+ val userSuppliedContext: Option[SparkContext] =
+ getValue("userSuppliedContext", builder).asInstanceOf[Option[SparkContext]]
+
+ // Get the session from current thread's active session.
+ var session: SparkSession = SparkSession.getActiveSession match {
+ case Some(session) =>
+ if ((session ne null) && !session.sparkContext.isStopped) {
+ options.foreach { case (k, v) => session.conf.set(k, v) }
+ session
+ } else {
+ null
+ }
+ case _ => null
+ }
+ if (session ne null) {
+ return session
+ }
+
+ // Global synchronization so we will only set the default session once.
+ SparkSession.synchronized {
+ // If the current thread does not have an active session, get it from the global session.
+ session = SparkSession.getDefaultSession match {
+ case Some(session) =>
+ if ((session ne null) && !session.sparkContext.isStopped) {
+ options.foreach { case (k, v) => session.conf.set(k, v) }
+ session
+ } else {
+ null
+ }
+ case _ => null
+ }
+ if (session ne null) {
+ return session
+ }
+
+ // No active nor global default session. Create a new one.
+ val sparkContext = userSuppliedContext.getOrElse {
+ // set app name if not given
+ val randomAppName = java.util.UUID.randomUUID().toString
+ val sparkConf = new SparkConf()
+ options.foreach { case (k, v) => sparkConf.set(k, v) }
+ if (!sparkConf.contains("spark.app.name")) {
+ sparkConf.setAppName(randomAppName)
+ }
+ val sc = SparkContext.getOrCreate(sparkConf)
+ // maybe this is an existing SparkContext; update its SparkConf, which may be used
+ // by SparkSession
+ options.foreach { case (k, v) => sc.conf.set(k, v) }
+ if (!sc.conf.contains("spark.app.name")) {
+ sc.conf.setAppName(randomAppName)
+ }
+ sc
+ }
+ session = new CarbonSession(sparkContext)
+ options.foreach { case (k, v) => session.conf.set(k, v) }
+ SparkSession.setDefaultSession(session)
+
+ // Register a successfully instantiated context to the singleton. This should be at the
+ // end of the class definition so that the singleton is updated only if there is no
+ // exception in the construction of the instance.
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ SparkSession.sqlListener.set(null)
+ }
+ })
+ }
+
+ return session
+ }
+
+ /**
+ * A hack to read a private field of the builder via reflection.
+ */
+ def getValue(name: String, builder: Builder): Any = {
+ val currentMirror = scala.reflect.runtime.currentMirror
+ val instanceMirror = currentMirror.reflect(builder)
+ val m = currentMirror.classSymbol(builder.getClass).
+ toType.members.find { p =>
+ p.name.toString.equals(name)
+ }.get.asTerm
+ instanceMirror.reflectField(m).get
+ }
+ }
+
+}
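
A CarbonSession is obtained through the implicit CarbonBuilder enrichment rather than the plain SparkSession builder, so that the overridden CarbonSessionState (and with it Carbon's DDL handling) is installed. A minimal usage sketch; it assumes the Carbon store location is already configured through CarbonProperties, as CarbonEnv.init expects in this patch, and that a Carbon table default.t1 exists:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.CarbonSession.CarbonBuilder

    object CarbonSessionExample {
      def main(args: Array[String]): Unit = {
        // getOrCreateCarbonSession() is added to the standard builder by the
        // implicit CarbonBuilder and returns a CarbonSession.
        val spark = SparkSession
          .builder()
          .master("local[2]")
          .appName("CarbonSessionExample")
          .getOrCreateCarbonSession()

        // Carbon-specific statements such as SHOW SEGMENTS are then handled by the
        // Carbon machinery added in this commit (for example DDLStrategy below).
        spark.sql("SHOW SEGMENTS FOR TABLE default.t1").show()

        spark.stop()
      }
    }
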
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b639ea8..c78ddf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -48,7 +48,7 @@ class CarbonSource extends CreatableRelationProvider
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
- CarbonEnv.init(sqlContext)
+ CarbonEnv.init(sqlContext.sparkSession)
// User should not specify path since only one store is supported in carbon currently,
// after we support multi-store, we can remove this limitation
require(!parameters.contains("path"), "'path' should not be specified, " +
@@ -88,7 +88,7 @@ class CarbonSource extends CreatableRelationProvider
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
- CarbonEnv.init(sqlContext)
+ CarbonEnv.init(sqlContext.sparkSession)
addLateDecodeOptimization(sqlContext.sparkSession)
val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
@@ -97,8 +97,10 @@ class CarbonSource extends CreatableRelationProvider
}
private def addLateDecodeOptimization(ss: SparkSession): Unit = {
- ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
- ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ if (ss.sessionState.experimentalMethods.extraStrategies.isEmpty) {
+ ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
+ ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ }
}
private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
@@ -131,7 +133,7 @@ class CarbonSource extends CreatableRelationProvider
val map = scala.collection.mutable.Map[String, String]();
parameters.foreach { x => map.put(x._1, x._2) }
val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
- CreateTable(cm).run(sparkSession)
+ CreateTable(cm, false).run(sparkSession)
CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 5508a94..51b79c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.CatalystTypeConverters._
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -31,8 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types.{AtomicType, IntegerType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
new file mode 100644
index 0000000..5211f1a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{CarbonEnv, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Carbon strategies for ddl commands
+ */
+class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
+
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ plan match {
+ case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
+ if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) =>
+ ExecutedCommandExec(LoadTable(identifier.database, identifier.table, path, Seq(),
+ Map(), isOverwrite)) :: Nil
+ case DropTableCommand(identifier, ifNotExists, isView)
+ if CarbonEnv.get.carbonMetastore
+ .isTablePathExists(identifier)(sparkSession) =>
+ ExecutedCommandExec(
+ CarbonDropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
+ case ShowLoadsCommand(databaseName, table, limit) =>
+ ExecutedCommandExec(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
+ case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
+ CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
+ ExecutedCommandExec(createDb) :: Nil
+ case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
+ if (isCascade) {
+ val tablesInDB = CarbonEnv.get.carbonMetastore.getAllTables()
+ .filterNot(_.database.exists(_.equalsIgnoreCase(dbName)))
+ tablesInDB.foreach{tableName =>
+ CarbonDropTableCommand(true, Some(dbName), tableName.table).run(sparkSession)
+ }
+ }
+ CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
+ ExecutedCommandExec(drop) :: Nil
+ case alterTable@AlterTableCompaction(altertablemodel) =>
+ val isCarbonTable = CarbonEnv.get.carbonMetastore
+ .tableExists(TableIdentifier(altertablemodel.tableName,
+ altertablemodel.dbName))(sparkSession)
+ if (isCarbonTable) {
+ if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
+ altertablemodel.compactionType.equalsIgnoreCase("major")) {
+ ExecutedCommandExec(alterTable) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on carbon table")
+ }
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+ }
+ case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
+ if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
+ val resolvedTable =
+ sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
+ val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
+ ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
+ case _ => Nil
+ }
+ }
+
+}
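
DDLStrategy only defines the mapping from logical commands (LOAD DATA, DROP TABLE, SHOW SEGMENTS, database create/drop, compaction, DESC FORMATTED) to Carbon's runnable commands; where it gets registered is not shown in this diff and is presumably done by CarbonSessionState. Under that assumption, one conventional way to plug such a strategy into a session is the same experimental-methods hook CarbonSource uses for CarbonLateDecodeStrategy:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.command.DDLStrategy

    object RegisterCarbonDdlStrategy {
      // Extra strategies are consulted before Spark's built-in planning strategies,
      // so Carbon DDL plans are intercepted first.
      def register(spark: SparkSession): Unit = {
        spark.experimental.extraStrategies =
          spark.experimental.extraStrategies :+ new DDLStrategy(spark)
      }
    }
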
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 68ad4d6..8f97961 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,18 +17,27 @@
package org.apache.spark.sql.execution.command
+import java.io.File
+import java.text.SimpleDateFormat
+
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.types.TimestampType
import org.apache.spark.util.FileUtils
+import org.codehaus.jackson.map.ObjectMapper
+import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.carbon.path.CarbonStorePath
@@ -36,19 +45,34 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
+object Checker {
+ def validateTableExists(
+ dbName: Option[String],
+ tableName: String,
+ session: SparkSession): Unit = {
+ val identifier = TableIdentifier(tableName, dbName)
+ if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(session)) {
+      val err = s"table ${identifier.unquotedString} not found"
+ LogServiceFactory.getLogService(this.getClass.getName).error(err)
+ throw new IllegalArgumentException(err)
+ }
+ }
+}
+
/**
* Command for the compaction in alter table command
*
* @param alterTableModel
*/
-case class AlterTableCompaction(alterTableModel: AlterTableModel) {
+case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -109,24 +133,59 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) {
}
}
-case class CreateTable(cm: TableModel) {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
def run(sparkSession: SparkSession): Seq[Row] = {
- cm.databaseName = cm.databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+ CarbonEnv.init(sparkSession)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
val tbName = cm.tableName
val dbName = cm.databaseName
LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
- val tableInfo: TableInfo = TableNewProcessor(cm, sparkSession.sqlContext)
+ val tableInfo: TableInfo = TableNewProcessor(cm)
if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
sys.error("No Dimensions found. Table should have at least one dimesnion !")
}
- // Add Database to catalog and persist
- val catalog = CarbonEnv.get.carbonMetastore
- val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
- LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+
+ if (sparkSession.sessionState.catalog.listTables(dbName)
+ .exists(_.table.equalsIgnoreCase(tbName))) {
+ if (!cm.ifNotExistsSet) {
+ LOGGER.audit(
+ s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+ s"Table [$tbName] already exists under database [$dbName]")
+ sys.error(s"Table [$tbName] already exists under database [$dbName]")
+ }
+ } else {
+ // Add Database to catalog and persist
+ val catalog = CarbonEnv.get.carbonMetastore
+ // Need to fill partitioner class when we support partition
+ val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+ if (createDSTable) {
+ try {
+ sparkSession.sql(
+ s"""CREATE TABLE $dbName.$tbName
+ |(${(cm.dimCols ++ cm.msrCols).map(f => f.rawSchema).mkString(",")})
+ |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+ s""" OPTIONS (tableName "$tbName", dbName "${dbName}", tablePath "$tablePath") """)
+ } catch {
+ case e: Exception =>
+ val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+            // drop the table that was just registered so the failure leaves nothing behind
+            CarbonEnv.get.carbonMetastore
+ .dropTable(catalog.storePath, identifier)(sparkSession)
+
+ LOGGER.audit(s"Table creation with Database name [$dbName] " +
+ s"and Table name [$tbName] failed")
+ throw e
+ }
+ }
+
+ LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+ }
+
Seq.empty
}
@@ -136,6 +195,54 @@ case class CreateTable(cm: TableModel) {
}
}
+case class DeleteLoadsById(
+ loadids: Seq[String],
+ databaseNameOp: Option[String],
+ tableName: String) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    validateLoadIds
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.deleteLoadById(
+      loadids,
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName
+    )
+    Seq.empty
+  }
+
+  // validates that at least one segment id was supplied
+  private def validateLoadIds: Unit = {
+    if (loadids.isEmpty) {
+      val errorMessage = "Error: Segment id(s) should not be empty."
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+  }
+}
+
+case class DeleteLoadsByLoadDate(
+ databaseNameOp: Option[String],
+ tableName: String,
+ dateField: String,
+ loadDate: String) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ CarbonStore.deleteLoadByDate(
+ loadDate,
+ getDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName
+ )
+ Seq.empty
+ }
+
+}
object LoadTable {
@@ -205,7 +312,7 @@ case class LoadTable(
options: scala.collection.immutable.Map[String, String],
isOverwriteExist: Boolean = false,
var inputSqlString: String = null,
- dataFrame: Option[DataFrame] = None) {
+ dataFrame: Option[DataFrame] = None) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -433,3 +540,191 @@ case class LoadTable(
}
}
}
+
+private[sql] case class DeleteLoadByDate(
+ databaseNameOp: Option[String],
+ tableName: String,
+ dateField: String,
+ loadDate: String) {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ CarbonStore.deleteLoadByDate(
+ loadDate,
+ getDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName
+ )
+ Seq.empty
+ }
+
+}
+
+case class CleanFiles(
+ databaseNameOp: Option[String],
+ tableName: String) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ val relation = CarbonEnv.get.carbonMetastore
+ .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ CarbonStore.cleanFiles(
+ getDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName,
+      relation.tableMeta.storePath
+ )
+ Seq.empty
+ }
+}
+
+case class ShowLoads(
+ databaseNameOp: Option[String],
+ tableName: String,
+ limit: Option[String],
+ override val output: Seq[Attribute]) extends RunnableCommand {
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ CarbonStore.showSegments(
+ getDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName,
+ limit
+ )
+ }
+}
+
+case class CarbonDropTableCommand(ifExistsSet: Boolean,
+ databaseNameOp: Option[String],
+ tableName: String)
+ extends RunnableCommand {
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+ val identifier = TableIdentifier(tableName, Option(dbName))
+ val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
+ val carbonLock = CarbonLockFactory
+ .getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
+ val storePath = CarbonEnv.get.carbonMetastore.storePath
+ var isLocked = false
+ try {
+ isLocked = carbonLock.lockWithRetries()
+      if (isLocked) {
+        logInfo("Successfully acquired the lock for dropping the table.")
+      } else {
+        LOGGER.audit(s"Dropping table $dbName.$tableName failed because the table is locked")
+        sys.error("Table is locked for deletion. Please try again later")
+      }
+ LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+ CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sparkSession)
+ LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+ } finally {
+ if (carbonLock != null && isLocked) {
+ if (carbonLock.unlock()) {
+ logInfo("Table MetaData Unlocked Successfully after dropping the table")
+ // deleting any remaining files.
+ val metadataFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
+ val fileType = FileFactory.getFileType(metadataFilePath)
+ if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+ val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+ CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
+ }
+ // delete bad record log after drop table
+ val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
+ val badLogFileType = FileFactory.getFileType(badLogPath)
+ if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
+ val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
+ CarbonUtil.deleteFoldersAndFiles(file)
+ }
+ } else {
+ logError("Unable to unlock Table MetaData")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
+
+private[sql] case class DescribeCommandFormatted(
+ child: SparkPlan,
+ override val output: Seq[Attribute],
+ tblIdentifier: TableIdentifier)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val relation = CarbonEnv.get.carbonMetastore
+ .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val mapper = new ObjectMapper()
+ val colProps = StringBuilder.newBuilder
+ var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
+ val comment = if (relation.metaData.dims.contains(field.name)) {
+ val dimension = relation.metaData.carbonTable.getDimensionByName(
+ relation.tableMeta.carbonTableIdentifier.getTableName,
+ field.name)
+        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
+          // serialize the column properties once and reuse the result
+          val colProp = mapper.writeValueAsString(dimension.getColumnProperties)
+          colProps.append(field.name).append(".")
+            .append(colProp)
+            .append(",")
+        }
+ if (dimension.hasEncoding(Encoding.DICTIONARY) &&
+ !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ "DICTIONARY, KEY COLUMN"
+ } else {
+ "KEY COLUMN"
+ }
+ } else {
+ ("MEASURE")
+ }
+ (field.name, field.dataType.simpleString, comment)
+ }
+ val colPropStr = if (colProps.toString().trim().length() > 0) {
+      // drop the trailing comma
+ colProps.toString().dropRight(1)
+ } else {
+ colProps.toString()
+ }
+ results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
+ results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
+ .getDatabaseName, "")
+ )
+ results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+ results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
+ val carbonTable = relation.tableMeta.carbonTable
+ results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+ results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
+ if (colPropStr.length() > 0) {
+ results ++= Seq((colPropStr, "", ""))
+ } else {
+ results ++= Seq(("ADAPTIVE", "", ""))
+ }
+ val dimension = carbonTable
+ .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+ results ++= getColumnGroups(dimension.asScala.toList)
+ results.map { case (name, dataType, comment) =>
+ Row(f"$name%-36s $dataType%-80s $comment%-72s")
+ }
+ }
+
+ private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
+ var results: Seq[(String, String, String)] =
+ Seq(("", "", ""), ("##Column Group Information", "", ""))
+ val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
+ case (groupId, _) => groupId != -1
+ }.toSeq.sortBy(_._1)
+    val groups = groupedDimensions.map { case (_, dims) =>
+      dims.map(_.getColName).mkString(", ")
+    }
+    groups.zipWithIndex.foreach { case (group, i) =>
+      results = results :+ (s"Column Group ${i + 1}", group, "")
+    }
+ results
+ }
+}
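
Each of the new case classes above extends RunnableCommand, so besides being produced by the parser they can be invoked directly from code. A minimal hedged sketch, assuming a SparkSession named spark with CarbonEnv initialized and an existing Carbon table default.t1:

    // Hedged sketch only; spark and the table default.t1 are assumptions for illustration.
    import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, CleanFiles}
    CleanFiles(Some("default"), "t1").run(spark)
    CarbonDropTableCommand(true, Some("default"), "t1").run(spark)  // ifExistsSet = true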
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 9638b8f..f174126 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -187,6 +187,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
tables.nonEmpty
}
+ def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+ checkSchemasModifiedTimeAndReloadTables()
+ val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val tables = metadata.tablesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ tables.nonEmpty
+ }
+
def loadMetadata(metadataPath: String, queryId: String): MetaData = {
val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
val statistic = new QueryStatistic()
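
The new tableExists overload resolves the database from the current session when none is supplied, which is what DDLStrategy relies on when matching plans. A hedged usage sketch, assuming spark is a SparkSession with CarbonEnv initialized:

    // Hedged sketch only; spark is an assumed SparkSession.
    import org.apache.spark.sql.catalyst.TableIdentifier
    val exists = CarbonEnv.get.carbonMetastore
      .tableExists(TableIdentifier("t1", Some("default")))(spark)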
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..066acce
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.execution.command.DDLStrategy
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+/**
+ * Session state implementation that overrides the SQL parser and registers the Carbon strategies
+ *
+ * @param sparkSession the active SparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf)
+
+ experimentalMethods.extraStrategies =
+ Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+ experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+}
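
Spark builds the session state itself, so the overrides above normally become active when the session is constructed with this class configured. Purely as a hedged sketch, constructing it by hand shows the parser override; the extra strategies and optimizations are registered on experimentalMethods in the same way:

    // Hedged sketch only; Spark normally instantiates the session state, spark is assumed.
    import org.apache.spark.sql.hive.CarbonSessionState
    import org.apache.spark.sql.parser.CarbonSparkSqlParser
    val state = new CarbonSessionState(spark)
    assert(state.sqlParser.isInstanceOf[CarbonSparkSqlParser])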
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
new file mode 100644
index 0000000..028286c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import scala.language.implicitConversions
+
+import org.apache.spark.sql.ShowLoadsCommand
+import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command._
+
+/**
+ * TODO: remove the duplicated code and move the common methods to a common class.
+ * Parser for all Carbon DDL and DML commands in the unified context
+ */
+class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
+
+ override def parse(input: String): LogicalPlan = {
+ synchronized {
+ // Initialize the Keywords.
+ initLexical
+ phrase(start)(new lexical.Scanner(input)) match {
+ case Success(plan, _) => plan match {
+ case x: LoadTable =>
+ x.inputSqlString = input
+ x
+ case logicalPlan => logicalPlan
+ }
+ case failureOrError => sys.error(failureOrError.toString)
+ }
+ }
+ }
+
+
+ protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
+
+ protected lazy val startCommand: Parser[LogicalPlan] =
+    loadManagement | showLoads | alterTable
+
+ protected lazy val loadManagement: Parser[LogicalPlan] =
+ deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+
+
+ protected lazy val alterTable: Parser[LogicalPlan] =
+ ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";") ^^ {
+ case dbName ~ table ~ (compact ~ compactType) =>
+ val altertablemodel = AlterTableModel(dbName, table, compactType, null)
+ AlterTableCompaction(altertablemodel)
+ }
+
+
+ protected lazy val loadDataNew: Parser[LogicalPlan] =
+ LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
+ (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+ (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+ case filePath ~ isOverwrite ~ table ~ optionsList =>
+ val (databaseNameOp, tableName) = table match {
+ case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+ }
+ if (optionsList.isDefined) {
+ validateOptions(optionsList)
+ }
+ val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+ LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
+ isOverwrite.isDefined)
+ }
+
+ protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
+ DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+ (ident <~ ".").? ~ ident) <~
+ opt(";") ^^ {
+ case loadids ~ table => table match {
+ case databaseName ~ tableName =>
+ DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
+ }
+ }
+
+ protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
+ DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+ (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
+ opt(";") ^^ {
+ case schema ~ table ~ condition =>
+ condition match {
+ case dateField ~ dateValue =>
+ DeleteLoadsByLoadDate(schema, table.toLowerCase(), dateField, dateValue)
+ }
+ }
+
+ protected lazy val cleanFiles: Parser[LogicalPlan] =
+ CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
+ case databaseName ~ tableName => CleanFiles(databaseName, tableName.toLowerCase())
+ }
+
+ protected lazy val explainPlan: Parser[LogicalPlan] =
+ (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
+ case isExtended ~ logicalPlan =>
+ logicalPlan match {
+          case _: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
+ case _ => ExplainCommand(OneRowRelation)
+ }
+ }
+
+ protected lazy val showLoads: Parser[LogicalPlan] =
+ SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+ (LIMIT ~> numericLit).? <~
+ opt(";") ^^ {
+ case databaseName ~ tableName ~ limit =>
+ ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
+ }
+}
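
The productions above cover Carbon's data management statements. A hedged set of example inputs that this grammar, as written, is intended to accept (the table db1.t1 and the CSV path are placeholders):

    // Illustrative inputs only; table and path names are placeholders.
    import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    val parser = new CarbonSpark2SqlParser
    parser.parse("LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE db1.t1 OPTIONS('DELIMITER'=',')")
    parser.parse("DELETE SEGMENT 0,1 FROM TABLE db1.t1")
    parser.parse("DELETE SEGMENTS FROM TABLE db1.t1 WHERE STARTTIME BEFORE '2016-01-01 00:00:00'")
    parser.parse("CLEAN FILES FOR TABLE db1.t1")
    parser.parse("SHOW SEGMENTS FOR TABLE db1.t1 LIMIT 5")
    parser.parse("ALTER TABLE db1.t1 COMPACT 'MINOR'")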