Repository: incubator-carbondata
Updated Branches:
refs/heads/master cffcb998a -> d5f409840

fix spark2 compilation

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/223cf9aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/223cf9aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/223cf9aa
Branch: refs/heads/master
Commit: 223cf9aa7f226705cf947b972f128d0c16604fc8
Parents: cffcb99
Author: jackylk <jacky.likun@huawei.com>
Authored: Fri Dec 2 22:09:33 2016 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sat Dec 3 08:37:53 2016 +0800
----------------------------------------------------------------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 39 ++++++++++++--------
1 file changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/223cf9aa/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 914cdab..05ba3ac 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,11 +32,12 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.hadoop.csv.CSVInputFormat
import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -168,9 +169,13 @@ class NewCarbonDataLoadRDD[K, V](
throw e
}
- def getInputIterators: Array[util.Iterator[Array[AnyRef]]] = {
+ def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index,
0)
- val configuration: Configuration = confBroadcast.value.value
+ var configuration: Configuration = confBroadcast.value.value
+ // Broadcast fails in some cases
+ if (configuration == null) {
+ configuration = new Configuration()
+ }
configureCSVInputFormat(configuration)
val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
val format = new CSVInputFormat
@@ -195,10 +200,11 @@ class NewCarbonDataLoadRDD[K, V](
partitionID, split.partitionBlocksDetail.length)
val readers =
split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.foreach { case (reader, index) =>
- reader.initialize(split.partitionBlocksDetail(index), hadoopAttemptContext)
+ readers.zipWithIndex.map { case (reader, index) =>
+ new RecordReaderIterator(reader,
+ split.partitionBlocksDetail(index),
+ hadoopAttemptContext)
}
- readers.map(new RecordReaderIterator(_))
} else {
// for node partition
val split = theSplit.asInstanceOf[CarbonNodePartition]
@@ -220,21 +226,22 @@ class NewCarbonDataLoadRDD[K, V](
StandardLogService.setThreadName(blocksID, null)
val readers =
split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.foreach { case (reader, index) =>
- reader.initialize(split.nodeBlocksDetail(index), hadoopAttemptContext)
+ readers.zipWithIndex.map { case (reader, index) =>
+ new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
}
- readers.map(new RecordReaderIterator(_))
}
}
def configureCSVInputFormat(configuration: Configuration): Unit = {
- CSVInputFormat.setCommentCharacter(carbonLoadModel.getCommentChar, configuration)
- CSVInputFormat.setCSVDelimiter(carbonLoadModel.getCsvDelimiter, configuration)
- CSVInputFormat.setEscapeCharacter(carbonLoadModel.getEscapeChar, configuration)
- CSVInputFormat.setHeaderExtractionEnabled(
- carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty,
- configuration)
- CSVInputFormat.setQuoteCharacter(carbonLoadModel.getQuoteChar, configuration)
+ CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
+ CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
+ CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+ CSVInputFormat.setHeaderExtractionEnabled(configuration,
+ carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
+ CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
+ CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
}
/**
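For readers following the getInputIterators change above, a minimal standalone sketch of the same null-broadcast fallback, assuming only Hadoop's Configuration; the ConfigurationFallback object and its resolve method are hypothetical and not part of this commit:

import org.apache.hadoop.conf.Configuration

// Hypothetical helper mirroring the fallback in the diff: prefer the
// Configuration recovered from the broadcast; when it comes back null on the
// executor, fall back to a fresh default Configuration instead of failing.
object ConfigurationFallback {
  def resolve(fromBroadcast: Configuration): Configuration =
    if (fromBroadcast == null) new Configuration() else fromBroadcast
}

In the commit itself, whichever Configuration instance is resolved is then passed to configureCSVInputFormat before the TaskAttemptContextImpl is built.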