carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: readfileheader
Date Wed, 11 Jan 2017 09:47:46 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master b1fe03eeb -> 30033605b


readfileheader

fix comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/cbc33c4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/cbc33c4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/cbc33c4b

Branch: refs/heads/master
Commit: cbc33c4b3893f7e1d8d820f89c0dccccf6d3ef5a
Parents: b1fe03e
Author: QiangCai <qiangcai@qq.com>
Authored: Tue Jan 10 21:32:51 2017 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Wed Jan 11 15:57:41 2017 +0800

----------------------------------------------------------------------
 .../TestLoadDataWithFileHeaderException.scala   |   6 +-
 .../TestLoadDataWithNotProperInputFile.scala    |  19 ++--
 .../carbondata/spark/util/CommonUtil.scala      |  46 +++++++-
 .../spark/util/GlobalDictionaryUtil.scala       |  64 +----------
 .../execution/command/carbonTableSchema.scala   |   3 +-
 .../spark/util/AllDictionaryTestCase.scala      |   1 +
 .../AutoHighCardinalityIdentifyTestCase.scala   |   1 +
 .../util/ExternalColumnDictionaryTestCase.scala |   5 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |   1 +
 .../util/GlobalDictionaryUtilTestCase.scala     |   1 +
 .../execution/command/carbonTableSchema.scala   |   3 +-
 .../processing/model/CarbonLoadModel.java       |  10 ++
 .../newflow/DataLoadProcessBuilder.java         |  40 +------
 .../util/CarbonDataProcessorUtil.java           | 109 ++++---------------
 14 files changed, 99 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
index 7717112..78b9a22 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
@@ -41,8 +41,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterAll {
       assert(false)
     } catch {
       case e: Exception =>
-        assert(e.getMessage.equals("DataLoad failure: CSV File provided is not proper. "
+
-          "Column names in schema and csv header are not same. CSVFile Name : windows.csv"))
+        assert(e.getMessage.contains("CSV header in input file is not proper. Column names
in schema and csv header are not the same."))
     }
   }
 
@@ -55,8 +54,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterAll {
       assert(false)
     } catch {
       case e: Exception =>
-        assert(e.getMessage.equals("DataLoad failure: CSV header provided in DDL is not proper.
" +
-          "Column names in schema and CSV header are not the same."))
+        assert(e.getMessage.contains("CSV header in DDL is not proper. Column names in schema
and CSV header are not the same"))
     }
   }
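
Note: the assertions above switch from exact equals matches to contains, so the tests no
longer depend on whatever prefix the loader wraps around the message, such as
"DataLoad failure: ". A minimal illustration with a hypothetical message:

    // Hypothetical message; a contains-check keeps passing if a prefix is added later.
    val msg = "DataLoad failure: CSV header in DDL is not proper. " +
      "Column names in schema and CSV header are not the same"
    assert(msg.contains("CSV header in DDL is not proper"))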
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
index 5fd52dc..7fb194f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
@@ -35,10 +35,9 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
 
   test("test loading data with input path exists but has nothing") {
     try {
-      val carbonLoadModel: CarbonLoadModel = new CarbonLoadModel
       val dataPath = s"$resourcesPath/nullSample.csv"
-      carbonLoadModel.setFactFilePath(FileUtils.getPaths(dataPath))
-      GlobalDictionaryUtil.loadDataFrame(sqlContext, carbonLoadModel)
+      FileUtils.getPaths(dataPath)
+      assert(false)
     } catch {
       case e: Throwable =>
         assert(e.getMessage.contains("Please check your input path and make sure " +
@@ -48,24 +47,20 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
 
   test("test loading data with input file not ends with '.csv'") {
     try {
-      val carbonLoadModel: CarbonLoadModel = new CarbonLoadModel
       val dataPath = s"$resourcesPath/noneCsvFormat.cs"
-      carbonLoadModel.setFactFilePath(FileUtils.getPaths(dataPath))
-      GlobalDictionaryUtil.loadDataFrame(sqlContext, carbonLoadModel)
+      FileUtils.getPaths(dataPath)
+      assert(true)
     } catch {
       case e: Throwable =>
-        e.printStackTrace()
-        assert(e.getMessage.contains("Please check your input path and make sure " +
-          "that files end with '.csv' and content is not empty"))
+        assert(false)
     }
   }
 
   test("test loading data with input file does not exist") {
     try {
-      val carbonLoadModel: CarbonLoadModel = new CarbonLoadModel
       val dataPath = s"$resourcesPath/input_file_does_not_exist.csv"
-      carbonLoadModel.setFactFilePath(FileUtils.getPaths(dataPath))
-      GlobalDictionaryUtil.loadDataFrame(sqlContext, carbonLoadModel)
+      FileUtils.getPaths(dataPath)
+      assert(false)
     } catch {
       case e: Throwable =>
         assert(e.getMessage.contains("The input file does not exist"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 410e4bd..4951acf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -22,6 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.SparkContext
@@ -31,9 +32,11 @@ import org.apache.spark.util.FileUtils
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.updatestatus.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.hadoop.csv.CSVInputFormat
 import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object CommonUtil {
@@ -301,4 +304,45 @@ object CommonUtil {
       LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString
}")
     }
   }
+
+  def getCsvHeaderColumns(carbonLoadModel: CarbonLoadModel): Array[String] = {
+    val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
+      CarbonCommonConstants.COMMA
+    } else {
+      CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter)
+    }
+    var csvFile: String = null
+    var csvHeader: String = carbonLoadModel.getCsvHeader
+    val csvColumns = if (StringUtils.isBlank(csvHeader)) {
+      // read header from csv file
+      csvFile = carbonLoadModel.getFactFilePath.split(",")(0)
+      csvHeader = CarbonUtil.readHeader(csvFile)
+      if (StringUtils.isBlank(csvHeader)) {
+        throw new CarbonDataLoadingException("First line of the csv is not valid.")
+      }
+      csvHeader.toLowerCase().split(delimiter).map(_.replaceAll("\"", "").trim)
+    } else {
+      csvHeader.toLowerCase.split(CarbonCommonConstants.COMMA).map(_.trim)
+    }
+
+    if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName, csvColumns,
+        carbonLoadModel.getCarbonDataLoadSchema)) {
+      if (csvFile == null) {
+        LOGGER.error("CSV header in DDL is not proper."
+                     + " Column names in schema and CSV header are not the same.")
+        throw new CarbonDataLoadingException(
+          "CSV header in DDL is not proper. Column names in schema and CSV header are "
+          + "not the same.")
+      } else {
+        LOGGER.error(
+          "CSV header in input file is not proper. Column names in schema and csv header
are not "
+          + "the same. Input file : " + csvFile)
+        throw new CarbonDataLoadingException(
+          "CSV header in input file is not proper. Column names in schema and csv header
are not "
+          + "the same. Input file : " + csvFile)
+      }
+    }
+
+    csvColumns
+  }
 }
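
Note: getCsvHeaderColumns centralizes header resolution. An explicit DDL header (the
FILEHEADER option) is split on commas; otherwise the first line of the first fact file
is read and split on the configured delimiter, with quotes stripped. Either way the
result is validated against the table schema before the load starts. A usage sketch,
assuming a setCsvDelimiter setter matching the getter used above and a model already
wired with its CarbonDataLoadSchema (wiring elided):

    // Sketch only: resolve and cache the header columns once, up front.
    val model = new CarbonLoadModel
    model.setFactFilePath("/tmp/data/t1.csv")   // hypothetical path
    model.setCsvHeader("")                      // blank => fall back to the file's first line
    model.setCsvDelimiter(",")                  // assumed setter
    // Throws CarbonDataLoadingException when the header does not cover the schema.
    model.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(model))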

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 2c8340c..25302c1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -367,32 +367,7 @@ object GlobalDictionaryUtil {
       val hadoopConfiguration = new Configuration()
       CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
       hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-      val header = if (StringUtils.isBlank(carbonLoadModel.getCsvHeader)) {
-        val fileHeader = CarbonUtil.readHeader(carbonLoadModel.getFactFilePath.split(",")(0))
-        if (StringUtils.isBlank(fileHeader)) {
-          throw new CarbonDataLoadingException("First line of the csv is not valid.");
-        }
-        fileHeader
-      } else {
-        carbonLoadModel.getCsvHeader
-      }
-      val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-        CarbonCommonConstants.COMMA
-      } else {
-        carbonLoadModel.getCsvDelimiter
-      }
-      val quote = if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
-        "\""
-      } else {
-        carbonLoadModel.getQuoteChar
-      }
-      val columnNames = header.split(delimiter).map { column =>
-        if ( column.startsWith(quote) && column.endsWith(quote)) {
-          column.substring(1, column.length - 1)
-        } else {
-          column
-        }
-      }
+      val columnNames = carbonLoadModel.getCsvHeaderColumns
       val schema = StructType(columnNames.map[StructField, Array[StructField]]{ column =>
         StructField(column, StringType)
       })
@@ -704,31 +679,6 @@ object GlobalDictionaryUtil {
   }
 
   /**
-   * get file headers from fact file
-   *
-   * @param carbonLoadModel
-   * @return headers
-   */
-  private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = {
-    var headers: Array[String] = null
-    val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0)
-    val readLine = CarbonUtil.readHeader(factFile)
-
-    if (null != readLine) {
-      val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-        "" + DEFAULT_SEPARATOR
-      } else {
-        carbonLoadModel.getCsvDelimiter
-      }
-      headers = readLine.toLowerCase().split(delimiter)
-    } else {
-      LOGGER.error("Not found file header! Please set fileheader")
-      throw new IOException("Failed to get file header")
-    }
-    headers
-  }
-
-  /**
    * generate global dictionary with SQLContext and CarbonLoadModel
    *
    * @param sqlContext      sql context
@@ -755,11 +705,7 @@ object GlobalDictionaryUtil {
         LOGGER.info("Generate global dictionary from source data files!")
         // load data by using dataSource com.databricks.spark.csv
         var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
-        var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-          df.columns
-        } else {
-          carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
-        }
+        var headers = carbonLoadModel.getCsvHeaderColumns
         headers = headers.map(headerName => headerName.trim)
         val colDictFilePath = carbonLoadModel.getColDictFilePath
         if (colDictFilePath != null) {
@@ -796,11 +742,7 @@ object GlobalDictionaryUtil {
         LOGGER.info("Generate global dictionary from dictionary files!")
         val isNonempty = validateAllDictionaryPath(allDictionaryPath)
         if (isNonempty) {
-          var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-            getHeaderFormFactFile(carbonLoadModel)
-          } else {
-            carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
-          }
+          var headers = carbonLoadModel.getCsvHeaderColumns
           headers = headers.map(headerName => headerName.trim)
           // prune columns according to the CSV file header, dimension columns
          val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
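
Note: with the columns resolved once on the load model, loadDataFrame (first hunk above)
builds its all-string DataFrame schema directly from them. A self-contained restatement
of that construction, with illustrative column names:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val columnNames = Array("id", "name", "city")  // stand-in for carbonLoadModel.getCsvHeaderColumns
    val schema = StructType(columnNames.map(StructField(_, StringType)))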

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index a26e0e1..0f70aae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -57,7 +57,7 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
 
 object Checker {
   def validateTableExists(
@@ -491,6 +491,7 @@ case class LoadTable(
         carbonLoadModel.setCsvHeader(fileHeader)
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
+        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
 
         if (carbonLoadModel.getUseOnePass) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 2b1b615..e96d8a2 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -64,6 +64,7 @@ class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
index 49585ee..69b6e66 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
@@ -60,6 +60,7 @@ class AutoHighCardinalityIdentifyTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index e029572..eece9c9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -62,7 +62,7 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
     extColDictFilePath3 = s"channelsId:${resourcesPath}/channelsId.csv"
     header = "deviceInformationId,channelsId,ROMSize,purchasedate,mobile,MAC," +
       "locationinfo,proddate,gamePointId,contractNumber"
-    header2 = "deviceInformationId|channelsId|contractNumber"
+    header2 = "deviceInformationId,channelsId,contractNumber"
   }
 
   def buildTable() = {
@@ -145,6 +145,7 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
     carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
     carbonLoadModel
   }
 
@@ -230,7 +231,7 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
     try {
       sql(s"""
       LOAD DATA LOCAL INPATH "$complexFilePath1" INTO TABLE loadSqlTest
-      OPTIONS('COLUMNDICT'='gamePointId:$filePath')
+      OPTIONS('FILEHEADER'='$header', 'COLUMNDICT'='gamePointId:$filePath')
       """)
       assert(false)
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index 5a5547d..7fff8d3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -63,6 +63,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index 458764f..9f58180 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -69,6 +69,7 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index e943d61..0818307 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -54,7 +54,7 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}
 
 object Checker {
   def validateTableExists(
@@ -454,6 +454,7 @@ case class LoadTable(
         carbonLoadModel.setCsvHeader(fileHeader)
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
+        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
         GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
 
         val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 2d66038..9071dbe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -64,6 +64,7 @@ public class CarbonLoadModel implements Serializable {
 
   private List<String> factFilesToProcess;
   private String csvHeader;
+  private String[] csvHeaderColumns;
   private String csvDelimiter;
   private String complexDelimiterLevel1;
   private String complexDelimiterLevel2;
@@ -250,6 +251,14 @@ public class CarbonLoadModel implements Serializable {
     this.csvHeader = csvHeader;
   }
 
+  public String[] getCsvHeaderColumns() {
+    return csvHeaderColumns;
+  }
+
+  public void setCsvHeaderColumns(String[] csvHeaderColumns) {
+    this.csvHeaderColumns = csvHeaderColumns;
+  }
+
   public void initPredefDictMap() {
     predefDictMap = new HashMap<>();
   }
@@ -398,6 +407,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.isRetentionRequest = isRetentionRequest;
     copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
     copyObj.csvHeader = header;
+    copyObj.csvHeaderColumns = csvHeaderColumns;
     copyObj.factFilesToProcess = filesForPartition;
     copyObj.isDirectLoad = true;
     copyObj.csvDelimiter = delimiter;
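
Note: CarbonLoadModel is Serializable and the copy path above propagates csvHeaderColumns,
so the header is resolved once on the driver and travels with every partition-level copy.
A small round trip using the new accessor pair (values hypothetical):

    val model = new CarbonLoadModel
    model.setCsvHeaderColumns(Array("id", "name", "city"))
    assert(model.getCsvHeaderColumns.sameElements(Array("id", "name", "city")))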

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index f87b4bb..2158219 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -31,11 +31,9 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
@@ -43,8 +41,6 @@ import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-import org.apache.commons.lang3.StringUtils;
-
 /**
  * It builds the pipe line of steps for loading data to carbon.
  */
@@ -117,41 +113,7 @@ public final class DataLoadProcessBuilder {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
     configuration.setTableIdentifier(identifier);
-    String csvHeader = loadModel.getCsvHeader();
-    String csvFileName = null;
-    if (csvHeader != null && !csvHeader.isEmpty()) {
-      configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ","));
-    } else {
-      CarbonFile csvFile =
-          CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
-      csvFileName = csvFile.getName();
-      csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile);
-      // if firstRow = " ", then throw exception
-      if (StringUtils.isNotEmpty(csvHeader) && StringUtils.isBlank(csvHeader)) {
-        throw new CarbonDataLoadingException("First line of the csv is not valid.");
-      }
-      configuration.setHeader(
-          CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter()));
-    }
-    if (!CarbonDataProcessorUtil
-        .isHeaderValid(loadModel.getTableName(), csvHeader, loadModel.getCarbonDataLoadSchema(),
-            loadModel.getCsvDelimiter())) {
-      if (csvFileName == null) {
-        LOGGER.error("CSV header provided in DDL is not proper."
-            + " Column names in schema and CSV header are not the same.");
-        throw new CarbonDataLoadingException(
-            "CSV header provided in DDL is not proper. Column names in schema and CSV header
are "
-                + "not the same.");
-      } else {
-        LOGGER.error(
-            "CSV File provided is not proper. Column names in schema and csv header are not
same. "
-                + "CSVFile Name : " + csvFileName);
-        throw new CarbonDataLoadingException(
-            "CSV File provided is not proper. Column names in schema and csv header are not
same. "
-                + "CSVFile Name : " + csvFileName);
-      }
-    }
-
+    configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setPartitionId(loadModel.getPartitionId());
     configuration.setSegmentId(loadModel.getSegmentId());
     configuration.setTaskNo(loadModel.getTaskNo());
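
Note: the builder no longer reads or validates the CSV header itself; it assumes
csvHeaderColumns was populated before the model reaches it, which is exactly what the
LoadTable hunks above arrange. The required ordering, restated as a sketch:

    // Caller contract after this commit:
    carbonLoadModel.setCsvHeader(fileHeader)
    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
    // Only now is the model safe to hand to DataLoadProcessBuilder, whose build path
    // simply calls configuration.setHeader(loadModel.getCsvHeaderColumns()).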

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbc33c4b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index cc142e7..5956792 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -19,19 +19,16 @@
 
 package org.apache.carbondata.processing.util;
 
-import java.io.BufferedReader;
-import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +56,6 @@ import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
-import org.apache.carbondata.processing.etl.DataLoadingException;
 import org.apache.carbondata.processing.newflow.DataField;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -373,93 +369,28 @@ public final class CarbonDataProcessorUtil {
     return complexTypesMap;
   }
 
-  /**
-   * Get the csv file to read if it the path is file otherwise get the first file of directory.
-   *
-   * @param csvFilePath
-   * @return File
-   */
-  public static CarbonFile getCsvFileToRead(String csvFilePath) {
-    CarbonFile csvFile =
-        FileFactory.getCarbonFile(csvFilePath, FileFactory.getFileType(csvFilePath));
-
-    CarbonFile[] listFiles = null;
-    if (csvFile.isDirectory()) {
-      listFiles = csvFile.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile pathname) {
-          if (!pathname.isDirectory()) {
-            if (pathname.getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || pathname
-                .getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
-                    + CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
-              return true;
-            }
-          }
-          return false;
-        }
-      });
-    } else {
-      listFiles = new CarbonFile[1];
-      listFiles[0] = csvFile;
-    }
-    return listFiles[0];
-  }
-
-  /**
-   * Get the file header from csv file.
-   */
-  public static String getFileHeader(CarbonFile csvFile)
-      throws DataLoadingException {
-    DataInputStream fileReader = null;
-    BufferedReader bufferedReader = null;
-    String readLine = null;
-
-    FileType fileType = FileFactory.getFileType(csvFile.getAbsolutePath());
-
-    if (!csvFile.exists()) {
-      csvFile = FileFactory
-          .getCarbonFile(csvFile.getAbsolutePath() + CarbonCommonConstants.FILE_INPROGRESS_STATUS,
-              fileType);
-    }
-
-    try {
-      fileReader = FileFactory.getDataInputStream(csvFile.getAbsolutePath(), fileType);
-      bufferedReader =
-          new BufferedReader(new InputStreamReader(fileReader, Charset.defaultCharset()));
-      readLine = bufferedReader.readLine();
-    } catch (FileNotFoundException e) {
-      LOGGER.error(e, "CSV Input File not found  " + e.getMessage());
-      throw new DataLoadingException("CSV Input File not found ", e);
-    } catch (IOException e) {
-      LOGGER.error(e, "Not able to read CSV input File  " + e.getMessage());
-      throw new DataLoadingException("Not able to read CSV input File ", e);
-    } finally {
-      CarbonUtil.closeStreams(fileReader, bufferedReader);
+  public static boolean isHeaderValid(String tableName, String[] csvHeader,
+      CarbonDataLoadSchema schema) {
+    Iterator<String> columnIterator =
+        CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName).iterator();
+    Set<String> csvColumns = new HashSet<String>(csvHeader.length);
+    Collections.addAll(csvColumns, csvHeader);
+
+    // file header should contain all columns of carbon table.
+    // So csvColumns should contain all elements of columnIterator.
+    while (columnIterator.hasNext()) {
+      if (!csvColumns.contains(columnIterator.next().toLowerCase())) {
+        return false;
+      }
     }
-
-    return readLine;
+    return true;
   }
 
   public static boolean isHeaderValid(String tableName, String header,
-      CarbonDataLoadSchema schema, String delimiter) throws DataLoadingException {
-    delimiter = CarbonUtil.delimiterConverter(delimiter);
-    String[] columnNames =
-        CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName).toArray(new String[0]);
-    String[] csvHeader = header.toLowerCase().split(delimiter);
-
-    List<String> csvColumnsList = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-    for (String column : csvHeader) {
-      csvColumnsList.add(column.replaceAll("\"", "").trim());
-    }
-
-    int count = 0;
-
-    for (String columns : columnNames) {
-      if (csvColumnsList.contains(columns.toLowerCase())) {
-        count++;
-      }
-    }
-    return count == columnNames.length;
+      CarbonDataLoadSchema schema, String delimiter) {
+    String convertedDelimiter = CarbonUtil.delimiterConverter(delimiter);
+    String[] csvHeader = getColumnFields(header.toLowerCase(), convertedDelimiter);
+    return isHeaderValid(tableName, csvHeader, schema);
   }
 
   /**
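
Note: the new isHeaderValid overload reduces validation to set containment: every schema
column must appear (lower-cased) in the CSV header, while extra CSV-only columns are
tolerated, matching the semantics of the count-based loop it replaces. A self-contained
Scala restatement of the check, not the Java method itself:

    // Callers pass header names already lower-cased and trimmed.
    def headerCoversSchema(schemaColumns: Seq[String], csvColumns: Array[String]): Boolean = {
      val csvSet = csvColumns.toSet
      schemaColumns.forall(col => csvSet.contains(col.toLowerCase))
    }

    assert(headerCoversSchema(Seq("id", "name"), Array("id", "name", "extra")))  // extra column ok
    assert(!headerCoversSchema(Seq("id", "name"), Array("id")))                  // missing column fails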

