Remove redundant declarations: drop the duplicated DeleteLoadFromMetadata, CarbonQueryUtil and KeyVal classes from the spark2 module
Clean up CarbonDataRDDFactory and CarbonQueryUtil
Fix compile errors
Fix style issues (indentation of continuation lines, stray `new` keywords in CarbonFilters, parameterless accessors in CarbonSparkFactory)
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6fee9930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6fee9930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6fee9930
Branch: refs/heads/master
Commit: 6fee9930e768d5019c3d6ee3a9a7c0a983011119
Parents: e7b46cc
Author: jackylk <jacky.likun@huawei.com>
Authored: Tue Dec 27 11:27:59 2016 +0800
Committer: chenliang613 <chenliang613@apache.org>
Committed: Tue Dec 27 23:40:18 2016 +0800
----------------------------------------------------------------------
.../carbondata/spark/util/CarbonQueryUtil.java | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 150 +++++------
.../spark/load/DeleteLoadFromMetadata.java | 44 ----
.../carbondata/spark/util/CarbonQueryUtil.java | 248 -------------------
.../vectorreader/ColumnarVectorWrapper.java | 2 +-
.../VectorizedCarbonRecordReader.java | 8 +-
.../spark/CarbonColumnValidator.scala | 2 +-
.../apache/carbondata/spark/CarbonFilters.scala | 34 +--
.../carbondata/spark/CarbonSparkFactory.scala | 4 +-
.../org/apache/carbondata/spark/KeyVal.scala | 89 -------
.../spark/rdd/CarbonDataRDDFactory.scala | 150 +++++------
.../spark/sql/CarbonDictionaryDecoder.scala | 16 +-
.../org/apache/spark/sql/CarbonSession.scala | 16 +-
.../org/apache/spark/sql/CarbonSource.scala | 2 +-
.../org/apache/spark/sql/TableCreator.scala | 29 ++-
.../execution/CarbonLateDecodeStrategy.scala | 12 +-
.../execution/command/carbonTableSchema.scala | 37 +--
.../spark/sql/hive/CarbonHiveMetadataUtil.scala | 3 -
.../sql/optimizer/CarbonLateDecodeRule.scala | 24 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 1 -
.../org/apache/spark/util/CleanFiles.scala | 2 +-
.../org/apache/spark/util/Compaction.scala | 2 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 2 +-
.../apache/spark/util/DeleteSegmentById.scala | 2 +-
.../org/apache/spark/util/ShowSegments.scala | 2 +-
.../org/apache/spark/util/TableLoader.scala | 4 +-
.../AllDataTypesTestCaseAggregate.scala | 19 +-
.../spark/sql/common/util/QueryTest.scala | 4 +-
28 files changed, 242 insertions(+), 668 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
index d2e716f..9d1a281 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -39,7 +39,7 @@ import org.apache.commons.lang3.StringUtils;
/**
* This utilty parses the Carbon query plan to actual query model object.
*/
-public final class CarbonQueryUtil {
+public class CarbonQueryUtil {
private CarbonQueryUtil() {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 93194c8..ff7bf23 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -81,10 +81,10 @@ object CarbonDataRDDFactory {
}
LOGGER.audit(s"Compaction request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+ .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -102,10 +102,10 @@ object CarbonDataRDDFactory {
)
val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- )
- .equalsIgnoreCase("true")
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ )
+ .equalsIgnoreCase("true")
// if system level compaction is enabled then only one compaction can run in the system
// if any other request comes at this time then it will create a compaction request file.
@@ -124,13 +124,13 @@ object CarbonDataRDDFactory {
} else {
// normal flow of compaction
val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.COMPACTION_LOCK
- )
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.COMPACTION_LOCK
+ )
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -147,9 +147,9 @@ object CarbonDataRDDFactory {
}
} else {
LOGGER.audit("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error(s"Not able to acquire the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
sys.error("Table is already locked for compaction. Please try after some time.")
}
}
@@ -164,12 +164,12 @@ object CarbonDataRDDFactory {
carbonTable: CarbonTable,
compactionModel: CompactionModel): Unit = {
val lock = CarbonLockFactory
- .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
- LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
- )
+ .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+ LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
+ )
if (lock.lockWithRetries()) {
LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
- s".${ carbonLoadModel.getTableName }")
+ s".${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -190,20 +190,20 @@ object CarbonDataRDDFactory {
}
} else {
LOGGER.audit("Not able to acquire the system level compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
CarbonCompactionUtil
- .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+ .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
// do sys error only in case of DDL trigger.
if (compactionModel.isDDLTrigger) {
sys.error("Compaction is in progress, compaction request for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
- " is in queue.")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
} else {
LOGGER.error("Compaction is in progress, compaction request for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
- " is in queue.")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
}
}
}
@@ -226,7 +226,7 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
- s" ${ e.getMessage }")
+ s" ${ e.getMessage }")
}
val compactionThread = new Thread {
@@ -250,9 +250,9 @@ object CarbonDataRDDFactory {
}
// continue in case of exception also, check for all the tables.
val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- ).equalsIgnoreCase("true")
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ ).equalsIgnoreCase("true")
if (!isConcurrentCompactionAllowed) {
LOGGER.info("System level compaction lock is enabled.")
@@ -262,8 +262,8 @@ object CarbonDataRDDFactory {
skipCompactionTables.toList.asJava)
while (null != tableForCompaction) {
LOGGER.info("Compaction request has been identified for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
val table: CarbonTable = tableForCompaction.carbonTable
val metadataPath = table.getMetaDataFilepath
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -271,12 +271,12 @@ object CarbonDataRDDFactory {
val newCarbonLoadModel = new CarbonLoadModel()
DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
+ .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+ newCarbonLoadModel.getTableName
+ )
val compactionSize = CarbonDataMergerUtil
- .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+ .getCompactionSize(CompactionType.MAJOR_COMPACTION)
val newcompactionModel = CompactionModel(compactionSize,
compactionType,
@@ -294,27 +294,27 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER.error("Exception in compaction thread for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
// not handling the exception. only logging as this is not the table triggered
// by user.
} finally {
// delete the compaction required file in case of failure or success also.
if (!CarbonCompactionUtil
- .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+ .deleteCompactionRequiredFile(metadataPath, compactionType)) {
// if the compaction request file is not been able to delete then
// add those tables details to the skip list so that it wont be considered next.
skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
LOGGER.error("Compaction request file can not be deleted for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
}
}
// ********* check again for all the tables.
tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
- )
+ .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
+ .tablesMeta.toArray, skipCompactionTables.asJava
+ )
}
// giving the user his error for telling in the beeline if his triggered table
// compaction is failed.
@@ -347,10 +347,10 @@ object CarbonDataRDDFactory {
// for handling of the segment Merging.
def handleSegmentMerging(tableCreationTime: Long): Unit = {
LOGGER.info(s"compaction need status is" +
- s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
+ s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
LOGGER.audit(s"Compaction request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val compactionSize = 0
val isCompactionTriggerByDDl = false
val compactionModel = CompactionModel(compactionSize,
@@ -370,10 +370,10 @@ object CarbonDataRDDFactory {
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- )
- .equalsIgnoreCase("true")
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ )
+ .equalsIgnoreCase("true")
if (!isConcurrentCompactionAllowed) {
@@ -388,9 +388,9 @@ object CarbonDataRDDFactory {
)
} else {
val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.COMPACTION_LOCK
- )
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.COMPACTION_LOCK
+ )
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the compaction lock.")
@@ -411,15 +411,15 @@ object CarbonDataRDDFactory {
}
} else {
LOGGER.audit("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
LOGGER.error("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
}
}
}
@@ -427,10 +427,10 @@ object CarbonDataRDDFactory {
try {
LOGGER.audit(s"Data load request has been received for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
if (!useKettle) {
LOGGER.audit("Data is loading with New Data Flow for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load need to be deleted before loading new data
DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
@@ -468,16 +468,16 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER
- .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
+ .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
}
// reading the start time of data load.
val loadStartTime = CarbonLoaderUtil.readCurrentTime()
carbonLoadModel.setFactTimeStamp(loadStartTime)
val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+ .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
- .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+ .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
// get partition way from configuration
// val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -528,7 +528,7 @@ object CarbonDataRDDFactory {
}
pathBuilder.append(split.getPartition.getUniqueID).append("/")
(split.getPartition.getUniqueID,
- SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+ SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
}
}
} else {
@@ -570,15 +570,15 @@ object CarbonDataRDDFactory {
// group blocks to nodes, tasks
val startTime = System.currentTimeMillis
val activeNodes = DistributionUtil
- .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+ .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
val nodeBlockMapping =
CarbonLoaderUtil
- .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
- .toSeq
+ .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+ .toSeq
val timeElapsed: Long = System.currentTimeMillis - startTime
LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
- s"No.of Nodes: ${nodeBlockMapping.size}")
+ s"No.of Nodes: ${nodeBlockMapping.size}")
var str = ""
nodeBlockMapping.foreach(entry => {
val tableBlock = entry._2
@@ -588,7 +588,7 @@ object CarbonDataRDDFactory {
hostentry.equalsIgnoreCase(entry._1)
)) {
str = str + " , mismatch locations: " + tableBlockInfo.getLocations
- .foldLeft("")((a, b) => a + "," + b)
+ .foldLeft("")((a, b) => a + "," + b)
}
)
str = str + "\n"
@@ -743,7 +743,7 @@ object CarbonDataRDDFactory {
CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
LOGGER.info("********clean up done**********")
LOGGER.audit(s"Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception(errorMessage)
} else {
@@ -754,10 +754,10 @@ object CarbonDataRDDFactory {
if (!status) {
val errorMessage = "Dataload failed due to failure in table status updation."
LOGGER.audit("Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
LOGGER.error("Dataload failed due to failure in table status updation.")
throw new Exception(errorMessage)
}
@@ -766,7 +766,7 @@ object CarbonDataRDDFactory {
LOGGER.info("********Database updated**********")
}
LOGGER.audit("Data load is successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
// compaction handling
handleSegmentMerging(tableCreationTime)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
deleted file mode 100644
index 0926e1c..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name : Carbon
- * Module Name : CARBON Data Processor
- * Author : R00903928
- * Created Date : 21-Sep-2015
- * FileName : DeleteLoadFromMetadata.java
- * Description : Kettle step to generate MD Key
- * Class Version : 1.0
- */
-package org.apache.carbondata.spark.load;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DeleteLoadFromMetadata {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DeleteLoadFromMetadata.class.getName());
-
- private DeleteLoadFromMetadata() {
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
deleted file mode 100644
index 04ef665..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
-import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
-import org.apache.carbondata.spark.splits.TableSplit;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * This utilty parses the Carbon query plan to actual query model object.
- */
-public final class CarbonQueryUtil {
-
- private CarbonQueryUtil() {
-
- }
-
-
- /**
- * It creates the one split for each region server.
- */
- public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
- CarbonQueryPlan queryPlan) throws IOException {
-
- //Just create splits depends on locations of region servers
- List<Partition> allPartitions = null;
- if (queryPlan == null) {
- allPartitions =
- QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
- } else {
- allPartitions =
- QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
- }
- TableSplit[] splits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < splits.length; i++) {
- splits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location =
- QueryPartitionHelper.getInstance().getLocation(partition, databaseName, tableName);
- locations.add(location);
- splits[i].setPartition(partition);
- splits[i].setLocations(locations);
- }
-
- return splits;
- }
-
- /**
- * It creates the one split for each region server.
- */
- public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
-
- //Just create splits depends on locations of region servers
- FileType fileType = FileFactory.getFileType(sourcePath);
- DefaultLoadBalancer loadBalancer = null;
- List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
- loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
- TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < tblSplits.length; i++) {
- tblSplits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location = loadBalancer.getNodeForPartitions(partition);
- locations.add(location);
- tblSplits[i].setPartition(partition);
- tblSplits[i].setLocations(locations);
- }
- return tblSplits;
- }
-
- /**
- * It creates the one split for each region server.
- */
- public static TableSplit[] getPartitionSplits(String sourcePath, String[] nodeList,
- int partitionCount) throws Exception {
-
- //Just create splits depends on locations of region servers
- FileType fileType = FileFactory.getFileType(sourcePath);
- DefaultLoadBalancer loadBalancer = null;
- List<Partition> allPartitions = getAllPartitions(sourcePath, fileType, partitionCount);
- loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
- TableSplit[] splits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < splits.length; i++) {
- splits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location = loadBalancer.getNodeForPartitions(partition);
- locations.add(location);
- splits[i].setPartition(partition);
- splits[i].setLocations(locations);
- }
- return splits;
- }
-
- public static void getAllFiles(String sourcePath, List<String> partitionsFiles, FileType fileType)
- throws Exception {
-
- if (!FileFactory.isFileExist(sourcePath, fileType, false)) {
- throw new Exception("Source file doesn't exist at path: " + sourcePath);
- }
-
- CarbonFile file = FileFactory.getCarbonFile(sourcePath, fileType);
- if (file.isDirectory()) {
- CarbonFile[] fileNames = file.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile pathname) {
- return true;
- }
- });
- for (int i = 0; i < fileNames.length; i++) {
- getAllFiles(fileNames[i].getPath(), partitionsFiles, fileType);
- }
- } else {
- // add only csv files
- if (file.getName().endsWith("csv")) {
- partitionsFiles.add(file.getPath());
- }
- }
- }
-
- /**
- * split sourcePath by comma
- */
- public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
- String separator) {
- if (StringUtils.isNotEmpty(sourcePath)) {
- String[] files = sourcePath.split(separator);
- for (String file : files) {
- partitionsFiles.add(file);
- }
- }
- }
-
- private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
- List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
- List<Partition> partitionList =
- new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
- partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
- partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
- for (int i = 0; i < files.size(); i++) {
- partitionFiles.get(i % 1).add(files.get(i));
- }
- return partitionList;
- }
-
- private static List<Partition> getAllPartitions(String sourcePath, FileType fileType,
- int partitionCount) throws Exception {
- List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
- int[] numberOfFilesPerPartition = getNumberOfFilesPerPartition(files.size(), partitionCount);
- int startIndex = 0;
- int endIndex = 0;
- List<Partition> partitionList =
- new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- if (numberOfFilesPerPartition != null) {
- for (int i = 0; i < numberOfFilesPerPartition.length; i++) {
- List<String> partitionFiles =
- new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- endIndex += numberOfFilesPerPartition[i];
- for (int j = startIndex; j < endIndex; j++) {
- partitionFiles.add(files.get(j));
- }
- startIndex += numberOfFilesPerPartition[i];
- partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles));
- }
- }
- return partitionList;
- }
-
- private static int[] getNumberOfFilesPerPartition(int numberOfFiles, int partitionCount) {
- int div = numberOfFiles / partitionCount;
- int mod = numberOfFiles % partitionCount;
- int[] numberOfNodeToScan = null;
- if (div > 0) {
- numberOfNodeToScan = new int[partitionCount];
- Arrays.fill(numberOfNodeToScan, div);
- } else if (mod > 0) {
- numberOfNodeToScan = new int[mod];
- }
- for (int i = 0; i < mod; i++) {
- numberOfNodeToScan[i] = numberOfNodeToScan[i] + 1;
- }
- return numberOfNodeToScan;
- }
-
- public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
- List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- if (null != details) {
- for (LoadMetadataDetails oneLoad : details) {
- if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
- String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
- slices.add(loadName);
- }
- }
- }
- return slices;
- }
-
- /**
- * This method will clear the dictionary cache for a given map of columns and dictionary cache
- * mapping
- *
- * @param columnToDictionaryMap
- */
- public static void clearColumnDictionaryCache(Map<String, Dictionary> columnToDictionaryMap) {
- for (Map.Entry<String, Dictionary> entry : columnToDictionaryMap.entrySet()) {
- CarbonUtil.clearDictionaryCache(entry.getValue());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 84e5c07..5ed7389 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -21,7 +21,7 @@ import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.Decimal;
-public class ColumnarVectorWrapper implements CarbonColumnVector {
+class ColumnarVectorWrapper implements CarbonColumnVector {
private ColumnVector columnVector;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index ba02bca..1beea97 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -55,7 +55,7 @@ import org.apache.spark.sql.types.StructType;
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
* carbondata column APIs and fills the data directly into columns.
*/
-public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
private int batchIdx = 0;
@@ -166,7 +166,7 @@ public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
* before any calls to nextKeyValue/nextBatch.
*/
- public void initBatch(MemoryMode memMode) {
+ private void initBatch(MemoryMode memMode) {
List<QueryDimension> queryDimension = queryModel.getQueryDimension();
List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
@@ -232,14 +232,14 @@ public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
/*
* Can be called before any rows are returned to enable returning columnar batches directly.
*/
- public void enableReturningBatches() {
+ private void enableReturningBatches() {
returnColumnarBatch = true;
}
/**
* Advances to the next batch of rows. Returns false if there are no more.
*/
- public boolean nextBatch() throws IOException {
+ private boolean nextBatch() {
columnarBatch.reset();
carbonColumnarBatch.reset();
if (iterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
index ea97bca..31bbf19 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -23,7 +23,7 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
* Carbon column validator
*/
class CarbonColumnValidator extends ColumnValidator {
- def validateColumns(allColumns: Seq[ColumnSchema]) {
+ def validateColumns(allColumns: Seq[ColumnSchema]): Unit = {
allColumns.foreach { columnSchema =>
val colWithSameId = allColumns.filter { x =>
x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 6d9fb24..0a84891 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper}
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType
@@ -151,13 +151,13 @@ object CarbonFilters {
case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
Some(sources.EqualTo(a.name, v))
- case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+ case Not(EqualTo(a: Attribute, Literal(v, t))) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+ case Not(EqualTo(Literal(v, t), a: Attribute)) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+ case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+ case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
@@ -221,7 +221,7 @@ object CarbonFilters {
None
}
}
- filters.flatMap(translate(_, false)).toArray
+ filters.flatMap(translate(_)).toArray
}
def processExpression(exprs: Seq[Expression],
@@ -231,8 +231,8 @@ object CarbonFilters {
def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
expr match {
case or@ Or(left, right) =>
- val leftFilter = transformExpression(left, true)
- val rightFilter = transformExpression(right, true)
+ val leftFilter = transformExpression(left, or = true)
+ val rightFilter = transformExpression(right, or = true)
if (leftFilter.isDefined && rightFilter.isDefined) {
Some(new OrExpression(leftFilter.get, rightFilter.get))
} else {
@@ -247,22 +247,22 @@ object CarbonFilters {
(transformExpression(left) ++ transformExpression(right)).reduceOption(new
AndExpression(_, _))
- case EqualTo(a: Attribute, l@Literal(v, t)) => new
+ case EqualTo(a: Attribute, l@Literal(v, t)) =>
Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
- case EqualTo(l@Literal(v, t), a: Attribute) => new
+ case EqualTo(l@Literal(v, t), a: Attribute) =>
Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
- case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
+ case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) =>
Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
- case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
+ case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) =>
Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
- case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
+ case Not(EqualTo(a: Attribute, l@Literal(v, t))) =>
Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
- case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
+ case Not(EqualTo(l@Literal(v, t), a: Attribute)) =>
Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
- case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
+ case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) =>
Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
- case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
+ case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) =>
Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
case IsNotNull(child: Attribute) =>
Some(new NotEqualsExpression(transformExpression(child).get,
@@ -357,7 +357,7 @@ object CarbonFilters {
None
}
}
- exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
+ exprs.flatMap(transformExpression(_)).reduceOption(new AndExpression(_, _))
}
private def isNullLiteral(exp: Expression): Boolean = {
if (null != exp
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
index 7618558..6e3a1c8 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -47,14 +47,14 @@ object CarbonSparkFactory {
/**
* @return column validator
*/
- def getCarbonColumnValidator(): ColumnValidator = {
+ def getCarbonColumnValidator: ColumnValidator = {
new CarbonColumnValidator
}
/**
* @return dictionary helper
*/
- def getDictionaryDetailService(): DictionaryDetailService = {
+ def getDictionaryDetailService: DictionaryDetailService = {
new DictionaryDetailHelper
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
deleted file mode 100644
index 254052b..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * It is just Key value class. I don't get any other alternate to make the RDD class to
- * work with my minimum knowledge in scala.
- * May be I will remove later once I gain good knowledge :)
- *
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.load.LoadMetadataDetails
-
-trait Value[V] extends Serializable {
- def getValue(value: Array[Object]): V
-}
-
-class ValueImpl extends Value[Array[Object]] {
- override def getValue(value: Array[Object]): Array[Object] = value
-}
-
-trait RawValue[V] extends Serializable {
- def getValue(value: Array[Any]): V
-}
-
-class RawValueImpl extends RawValue[Array[Any]] {
- override def getValue(value: Array[Any]): Array[Any] = value
-}
-
-trait DataLoadResult[K, V] extends Serializable {
- def getKey(key: String, value: LoadMetadataDetails): (K, V)
-}
-
-class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
- override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
- (key, value)
- }
-}
-
-
-trait PartitionResult[K, V] extends Serializable {
- def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class PartitionResultImpl extends PartitionResult[Int, Boolean] {
- override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait MergeResult[K, V] extends Serializable {
- def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class MergeResultImpl extends MergeResult[Int, Boolean] {
- override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait DeletedLoadResult[K, V] extends Serializable {
- def getKey(key: String, value: String): (K, V)
-}
-
-class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
- override def getKey(key: String, value: String): (String, String) = (key, value)
-}
-
-trait RestructureResult[K, V] extends Serializable {
- def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class RestructureResultImpl extends RestructureResult[Int, Boolean] {
- override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index de07707..f451a54 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -79,10 +79,10 @@ object CarbonDataRDDFactory {
}
LOGGER.audit(s"Compaction request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+ .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -100,10 +100,10 @@ object CarbonDataRDDFactory {
)
val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- )
- .equalsIgnoreCase("true")
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ )
+ .equalsIgnoreCase("true")
// if system level compaction is enabled then only one compaction can run in the system
// if any other request comes at this time then it will create a compaction request file.
@@ -122,13 +122,13 @@ object CarbonDataRDDFactory {
} else {
// normal flow of compaction
val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.COMPACTION_LOCK
- )
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.COMPACTION_LOCK
+ )
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -145,9 +145,9 @@ object CarbonDataRDDFactory {
}
} else {
LOGGER.audit("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error(s"Not able to acquire the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
sys.error("Table is already locked for compaction. Please try after some time.")
}
}
@@ -162,12 +162,12 @@ object CarbonDataRDDFactory {
carbonTable: CarbonTable,
compactionModel: CompactionModel): Unit = {
val lock = CarbonLockFactory
- .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
- LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
- )
+ .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+ LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
+ )
if (lock.lockWithRetries()) {
LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
- s".${ carbonLoadModel.getTableName }")
+ s".${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -188,20 +188,20 @@ object CarbonDataRDDFactory {
}
} else {
LOGGER.audit("Not able to acquire the system level compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
CarbonCompactionUtil
- .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+ .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
// do sys error only in case of DDL trigger.
if (compactionModel.isDDLTrigger) {
sys.error("Compaction is in progress, compaction request for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
- " is in queue.")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
} else {
LOGGER.error("Compaction is in progress, compaction request for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
- " is in queue.")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
}
}
}
@@ -224,7 +224,7 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
- s" ${ e.getMessage }")
+ s" ${ e.getMessage }")
}
val compactionThread = new Thread {
@@ -248,9 +248,9 @@ object CarbonDataRDDFactory {
}
// continue in case of exception also, check for all the tables.
val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- ).equalsIgnoreCase("true")
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ ).equalsIgnoreCase("true")
if (!isConcurrentCompactionAllowed) {
LOGGER.info("System level compaction lock is enabled.")
@@ -260,8 +260,8 @@ object CarbonDataRDDFactory {
skipCompactionTables.toList.asJava)
while (null != tableForCompaction) {
LOGGER.info("Compaction request has been identified for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
val table: CarbonTable = tableForCompaction.carbonTable
val metadataPath = table.getMetaDataFilepath
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -269,12 +269,12 @@ object CarbonDataRDDFactory {
val newCarbonLoadModel = new CarbonLoadModel()
DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
+ .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+ newCarbonLoadModel.getTableName
+ )
val compactionSize = CarbonDataMergerUtil
- .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+ .getCompactionSize(CompactionType.MAJOR_COMPACTION)
val newcompactionModel = CompactionModel(compactionSize,
compactionType,
@@ -292,27 +292,27 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER.error("Exception in compaction thread for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
// not handling the exception. only logging as this is not the table triggered
// by user.
} finally {
// delete the compaction required file in case of failure or success also.
if (!CarbonCompactionUtil
- .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+ .deleteCompactionRequiredFile(metadataPath, compactionType)) {
// if the compaction request file is not been able to delete then
// add those tables details to the skip list so that it wont be considered next.
skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
LOGGER.error("Compaction request file can not be deleted for table " +
- s"${ tableForCompaction.carbonTable.getDatabaseName }." +
- s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
}
}
// ********* check again for all the tables.
tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
- )
+ .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
+ .tablesMeta.toArray, skipCompactionTables.asJava
+ )
}
// giving the user his error for telling in the beeline if his triggered table
// compaction is failed.
@@ -345,10 +345,10 @@ object CarbonDataRDDFactory {
// for handling of the segment Merging.
def handleSegmentMerging(tableCreationTime: Long): Unit = {
LOGGER.info(s"compaction need status is" +
- s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
+ s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
LOGGER.audit(s"Compaction request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val compactionSize = 0
val isCompactionTriggerByDDl = false
val compactionModel = CompactionModel(compactionSize,
@@ -368,10 +368,10 @@ object CarbonDataRDDFactory {
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- )
- .equalsIgnoreCase("true")
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ )
+ .equalsIgnoreCase("true")
if (!isConcurrentCompactionAllowed) {
@@ -386,9 +386,9 @@ object CarbonDataRDDFactory {
)
} else {
val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.COMPACTION_LOCK
- )
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.COMPACTION_LOCK
+ )
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the compaction lock.")
@@ -409,15 +409,15 @@ object CarbonDataRDDFactory {
}
} else {
LOGGER.audit("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
LOGGER.error("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
}
}
}
@@ -425,10 +425,10 @@ object CarbonDataRDDFactory {
try {
LOGGER.audit(s"Data load request has been received for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
if (!useKettle) {
LOGGER.audit("Data is loading with New Data Flow for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load need to be deleted before loading new data
DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
@@ -466,16 +466,16 @@ object CarbonDataRDDFactory {
} catch {
case e: Exception =>
LOGGER
- .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
+ .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
}
// reading the start time of data load.
val loadStartTime = CarbonLoaderUtil.readCurrentTime()
carbonLoadModel.setFactTimeStamp(loadStartTime)
val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+ .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
- .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+ .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
// get partition way from configuration
// val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -526,7 +526,7 @@ object CarbonDataRDDFactory {
}
pathBuilder.append(split.getPartition.getUniqueID).append("/")
(split.getPartition.getUniqueID,
- SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+ SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
}
}
} else {
@@ -568,15 +568,15 @@ object CarbonDataRDDFactory {
// group blocks to nodes, tasks
val startTime = System.currentTimeMillis
val activeNodes = DistributionUtil
- .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+ .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
val nodeBlockMapping =
CarbonLoaderUtil
- .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
- .toSeq
+ .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+ .toSeq
val timeElapsed: Long = System.currentTimeMillis - startTime
LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
- s"No.of Nodes: ${nodeBlockMapping.size}")
+ s"No.of Nodes: ${nodeBlockMapping.size}")
var str = ""
nodeBlockMapping.foreach(entry => {
val tableBlock = entry._2
@@ -586,7 +586,7 @@ object CarbonDataRDDFactory {
hostentry.equalsIgnoreCase(entry._1)
)) {
str = str + " , mismatch locations: " + tableBlockInfo.getLocations
- .foldLeft("")((a, b) => a + "," + b)
+ .foldLeft("")((a, b) => a + "," + b)
}
)
str = str + "\n"
@@ -723,7 +723,7 @@ object CarbonDataRDDFactory {
CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
LOGGER.info("********clean up done**********")
LOGGER.audit(s"Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception(errorMessage)
} else {
@@ -734,10 +734,10 @@ object CarbonDataRDDFactory {
if (!status) {
val errorMessage = "Dataload failed due to failure in table status updation."
LOGGER.audit("Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
LOGGER.error("Dataload failed due to failure in table status updation.")
throw new Exception(errorMessage)
}
@@ -746,7 +746,7 @@ object CarbonDataRDDFactory {
LOGGER.info("********Database updated**********")
}
LOGGER.audit("Data load is successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
// compaction handling
handleSegmentMerging(tableCreationTime)
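The CarbonDataRDDFactory.scala hunks above are continuation-indent cleanups only: wrapped method chains and wrapped string-interpolation arguments move to a consistent shallow indent instead of deep alignment. A minimal standalone sketch of that wrapping convention, using a toy properties map rather than CarbonProperties/CarbonCommonConstants:

    object WrapStyleExample {
      // hypothetical stand-in for the CarbonProperties lookup in the hunks above
      private val properties = Map("enable.concurrent.compaction" -> "true")

      def isConcurrentCompactionAllowed: Boolean =
        properties
          .getOrElse("enable.concurrent.compaction", "false")
          .equalsIgnoreCase("true")

      def main(args: Array[String]): Unit =
        println(s"concurrent compaction allowed: $isConcurrentCompactionAllowed")
    }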
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index ce5962d..8deacc0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -159,10 +159,10 @@ case class CarbonDictionaryDecoder(
// add a task completion listener to clear dictionary that is a decisive factor for
// LRU eviction policy
val dictionaryTaskCleaner = TaskContext.get
- dictionaryTaskCleaner.addTaskCompletionListener(context =>
+ dictionaryTaskCleaner.addTaskCompletionListener(_ =>
dicts.foreach { dictionary =>
if (null != dictionary) {
- dictionary.clear
+ dictionary.clear()
}
}
)
@@ -312,10 +312,10 @@ class CarbonDecoderRDD(
// add a task completion listener to clear dictionary that is a decisive factor for
// LRU eviction policy
val dictionaryTaskCleaner = TaskContext.get
- dictionaryTaskCleaner.addTaskCompletionListener(context =>
+ dictionaryTaskCleaner.addTaskCompletionListener(_ =>
dicts.foreach { dictionary =>
if (null != dictionary) {
- dictionary.clear
+ dictionary.clear()
}
}
)
@@ -327,7 +327,6 @@ class CarbonDecoderRDD(
override final def hasNext: Boolean = iter.hasNext
override final def next(): InternalRow = {
- val startTime = System.currentTimeMillis()
val row: InternalRow = iter.next()
val data = row.toSeq(dataTypes).toArray
dictIndex.foreach { index =>
@@ -342,13 +341,6 @@ class CarbonDecoderRDD(
}
}
- private def isRequiredToDecode = {
- getDictionaryColumnIds.find(p => p._1 != null) match {
- case Some(value) => true
- case _ => false
- }
- }
-
private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
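The CarbonDictionaryDecoder.scala changes replace an unused lambda parameter with `_`, call the side-effecting `clear()` with parentheses, and drop dead code. A small self-contained sketch of the first two idioms, with a toy Dict class and listener standing in for the CarbonData Dictionary and Spark's TaskContext listener:

    object TaskCleanupExample {
      // toy stand-in for the CarbonData Dictionary cache entry
      class Dict { def clear(): Unit = println("dictionary cleared") }

      // toy stand-in for TaskContext.addTaskCompletionListener
      def addTaskCompletionListener(listener: Int => Unit): Unit = listener(0)

      def main(args: Array[String]): Unit = {
        val dicts: Seq[Dict] = Seq(new Dict, null)
        // unused listener parameter written as `_`; side-effecting clear() keeps its parentheses
        addTaskCompletionListener(_ =>
          dicts.foreach { dictionary =>
            if (null != dictionary) {
              dictionary.clear()
            }
          }
        )
      }
    }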
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 748d292..67ee478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -50,10 +50,10 @@ object CarbonSession {
// Get the session from current thread's active session.
var session: SparkSession = SparkSession.getActiveSession match {
- case Some(session) =>
- if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) => session.conf.set(k, v) }
- session
+ case Some(sparkSession) =>
+ if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+ options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+ sparkSession
} else {
null
}
@@ -67,10 +67,10 @@ object CarbonSession {
SparkSession.synchronized {
// If the current thread does not have an active session, get it from the global session.
session = SparkSession.getDefaultSession match {
- case Some(session) =>
- if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) => session.conf.set(k, v) }
- session
+ case Some(sparkSession) =>
+ if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+ options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+ sparkSession
} else {
null
}
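The CarbonSession.scala change renames the pattern binding from `session` to `sparkSession` so it no longer shadows the outer `var session` being assigned. A sketch of the shadowing issue the rename avoids, with a plain String standing in for SparkSession:

    object SessionLookupExample {
      // toy stand-in for SparkSession.getActiveSession
      def getActiveSession: Option[String] = Some("active-session")

      def main(args: Array[String]): Unit = {
        var session: String = null
        session = getActiveSession match {
          // a distinct binding name keeps the assignment to the outer var unambiguous
          case Some(sparkSession) => sparkSession
          case None => null
        }
        println(session)
      }
    }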
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index c78ddf3..8a946c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -130,7 +130,7 @@ class CarbonSource extends CreatableRelationProvider
}
f
}
- val map = scala.collection.mutable.Map[String, String]();
+ val map = scala.collection.mutable.Map[String, String]()
parameters.foreach { x => map.put(x._1, x._2) }
val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
CreateTable(cm, false).run(sparkSession)
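The CarbonSource.scala change only drops a redundant trailing semicolon. For illustration, the same map population without the semicolon, plus an equivalent single-expression alternative; `parameters` here is a stand-in for the createRelation options:

    import scala.collection.mutable

    object OptionsMapExample {
      def main(args: Array[String]): Unit = {
        val parameters = Map("tableName" -> "t1", "dbName" -> "default")
        val map = mutable.Map[String, String]()
        parameters.foreach { x => map.put(x._1, x._2) }
        // equivalent single expression: mutable.Map(parameters.toSeq: _*)
        println(map)
      }
    }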
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 1faaafa..362c951 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -62,9 +62,9 @@ object TableCreator {
// All excluded cols should be there in create table cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
dictExcludeCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+ tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
dictExcludeCols
- .map { dictExcludeCol =>
+ .foreach { dictExcludeCol =>
if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
" does not exist in table. Please check create table statement."
@@ -87,8 +87,8 @@ object TableCreator {
// All included cols should be there in create table cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
dictIncludeCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
- dictIncludeCols.map { distIncludeCol =>
+ tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(",").map(_.trim)
+ dictIncludeCols.foreach { distIncludeCol =>
if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
" does not exist in table. Please check create table statement."
@@ -117,9 +117,9 @@ object TableCreator {
}
dimFields += field
} else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
- dimFields += (field)
+ dimFields += field
} else if (isDetectAsDimentionDatatype(field.dataType.get)) {
- dimFields += (field)
+ dimFields += field
}
}
)
@@ -143,13 +143,13 @@ object TableCreator {
// get all included cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
dictIncludedCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
+ tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim)
}
// get all excluded cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
dictExcludedCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+ tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
}
// by default consider all non string cols as msrs. consider all include/ exclude cols as dims
@@ -264,7 +264,7 @@ object TableCreator {
if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
var splittedColGrps: Seq[String] = Seq[String]()
- val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
+ val nonSplitCols: String = tableProperties(CarbonCommonConstants.COLUMN_GROUPS)
// row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
// here first splitting the value by () . so that the above will be splitted into 2 strings.
@@ -313,9 +313,8 @@ object TableCreator {
if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
noInvertedIdxColsProps =
- tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
- noInvertedIdxColsProps
- .map { noInvertedIdxColProp =>
+ tableProperties("NO_INVERTED_INDEX").split(',').map(_.trim)
+ noInvertedIdxColsProps.foreach { noInvertedIdxColProp =>
if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
" does not exist in table. Please check create table statement."
@@ -357,11 +356,11 @@ object TableCreator {
field.storeType
)
case "array" => Field(field.column, Some("Array"), field.name,
- field.children.map(f => f.map(normalizeType(_))),
+ field.children.map(f => f.map(normalizeType)),
field.parent, field.storeType
)
case "struct" => Field(field.column, Some("Struct"), field.name,
- field.children.map(f => f.map(normalizeType(_))),
+ field.children.map(f => f.map(normalizeType)),
field.parent, field.storeType
)
case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
@@ -372,7 +371,7 @@ object TableCreator {
// checking if the nested data type contains the child type as decimal(10,0),
// if it is present then extracting the precision and scale. resetting the data type
// with Decimal.
- case _ if (dataType.startsWith("decimal")) =>
+ case _ if dataType.startsWith("decimal") =>
val (precision, scale) = getScaleAndPrecision(dataType)
Field(field.column,
Some("Decimal"),
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 51b79c5..fe8bbe7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -92,7 +92,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
rdd: RDD[InternalRow],
needDecode: ArrayBuffer[AttributeReference]):
RDD[InternalRow] = {
- if (needDecode.size > 0) {
+ if (needDecode.nonEmpty) {
rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
getDecoderRDD(relation, needDecode, rdd, output)
} else {
@@ -249,7 +249,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
needDecoder: ArrayBuffer[AttributeReference],
updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
- needDecoder.length == 0) {
+ needDecoder.isEmpty) {
BatchedDataSourceScanExec(
output,
scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
@@ -362,13 +362,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
Some(sources.EqualTo(a.name, v))
- case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+ case Not(EqualTo(a: Attribute, Literal(v, t))) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+ case Not(EqualTo(Literal(v, t), a: Attribute)) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+ case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+ case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
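The CarbonLateDecodeStrategy.scala hunks prefer `nonEmpty`/`isEmpty` over size comparisons and remove the stray `new` before `Some(...)` in the filter-translation cases. A simplified sketch of both idioms, with toy Filter types standing in for org.apache.spark.sql.sources:

    object FilterTranslateExample {
      sealed trait Filter
      case class EqualTo(attribute: String, value: Any) extends Filter
      case class Not(child: Filter) extends Filter

      def translate(attribute: String, value: Any, negated: Boolean): Option[Filter] =
        if (negated) {
          // Some is a case class: its apply method is used, not `new Some(...)`
          Some(Not(EqualTo(attribute, value)))
        } else {
          Some(EqualTo(attribute, value))
        }

      def main(args: Array[String]): Unit = {
        val needDecode = Seq.empty[String]
        // nonEmpty / isEmpty instead of size > 0 / length == 0
        println(needDecode.nonEmpty)
        println(translate("c1", 5, negated = true))
      }
    }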
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8f97961..86bd92b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -168,7 +168,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
s"""CREATE TABLE $dbName.$tbName
|(${(cm.dimCols ++ cm.msrCols).map(f => f.rawSchema).mkString(",")})
|USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "${dbName}", tablePath "$tablePath") """)
+ s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """)
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -214,14 +214,6 @@ case class DeleteLoadsById(
}
- // validates load ids
- private def validateLoadIds: Unit = {
- if (loadids.isEmpty) {
- val errorMessage = "Error: Segment id(s) should not be empty."
- throw new MalformedCarbonCommandException(errorMessage)
-
- }
- }
}
case class DeleteLoadsByLoadDate(
@@ -293,8 +285,8 @@ case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: Lo
relation.carbonRelation.tableName,
null,
Seq(),
- scala.collection.immutable.Map(("fileheader" -> header)),
- false,
+ scala.collection.immutable.Map("fileheader" -> header),
+ isOverwriteExist = false,
null,
Some(df)).run(sparkSession)
// updating relation metadata. This is in case of auto detect high cardinality
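The hunk above removes the redundant parentheses around the `Map("fileheader" -> header)` argument and names the bare boolean at the call site. A toy sketch of that call-site style; LoadTable here is a simplified case class, not the real command:

    object LoadCallExample {
      case class LoadTable(dbName: String, tableName: String,
          options: Map[String, String], isOverwriteExist: Boolean)

      def main(args: Array[String]): Unit = {
        val header = "id,name,city"
        // bare `false` given a parameter name so the intent is readable
        val load = LoadTable("default", "t1",
          Map("fileheader" -> header), isOverwriteExist = false)
        println(load)
      }
    }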
@@ -326,7 +318,6 @@ case class LoadTable(
}
val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- val identifier = TableIdentifier(tableName, Option(dbName))
if (isOverwriteExist) {
sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
}
@@ -378,9 +369,9 @@ case class LoadTable(
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- var partitionLocation = relation.tableMeta.storePath + "/partition/" +
- relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
- relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+ val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+ relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+ relation.tableMeta.carbonTableIdentifier.getTableName + "/"
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
@@ -412,15 +403,6 @@ case class LoadTable(
val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
val dateFormat = options.getOrElse("dateformat", null)
validateDateFormat(dateFormat, table)
- val multiLine = options.getOrElse("multiline", "false").trim.toLowerCase match {
- case "true" => true
- case "false" => false
- case illegal =>
- val errorMessage = "Illegal syntax found: [" + illegal + "] .The value multiline in " +
- "load DDL which you set can only be 'true' or 'false', please check " +
- "your input DDL."
- throw new MalformedCarbonCommandException(errorMessage)
- }
val maxColumns = options.getOrElse("maxcolumns", null)
carbonLoadModel.setMaxColumns(maxColumns)
carbonLoadModel.setEscapeChar(escapeChar)
@@ -451,7 +433,7 @@ case class LoadTable(
// set local dictionary path, and dictionary file extension
carbonLoadModel.setAllDictPath(allDictionaryPath)
- var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+ val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
try {
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -521,7 +503,7 @@ case class LoadTable(
throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
"string.")
} else {
- var dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
+ val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
for (singleDateFormat <- dateFormats) {
val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
val columnName = dateFormatSplits(0).trim.toLowerCase
@@ -667,7 +649,6 @@ private[sql] case class DescribeCommandFormatted(
relation.tableMeta.carbonTableIdentifier.getTableName,
field.name)
if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
- val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
colProps.append(field.name).append(".")
.append(mapper.writeValueAsString(dimension.getColumnProperties))
.append(",")
@@ -679,7 +660,7 @@ private[sql] case class DescribeCommandFormatted(
"KEY COLUMN"
}
} else {
- ("MEASURE")
+ "MEASURE"
}
(field.name, field.dataType.simpleString, comment)
}
|