carbondata-commits mailing list archives

From jack...@apache.org
Subject [40/50] [abbrv] carbondata git commit: [CARBONDATA-1249] Wrong order of columns in redirected csv for bad records
Date Sun, 07 Jan 2018 03:05:48 GMT
[CARBONDATA-1249] Wrong order of columns in redirected csv for bad records

Problem:
Wrong order of columns in the redirected CSV for bad records.
The RowParser rearranges the raw CSV data based on the inputMapping and outputMapping,
so the converter step no longer has the original raw CSV record to log or redirect
when it encounters a bad record.
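
A minimal sketch of the failure mode, using toy stand-in types (the parse method,
the mapping, and the sample schema below are illustrative, not the actual CarbonData
classes): once the parser has rearranged the fields into its internal order, the
original CSV column order cannot be recovered, so the redirect file has to be written
from the unparsed row instead.

    import java.util.Arrays;

    // Illustrative only: a toy parser that reorders fields the way an
    // input/output mapping might, losing the original CSV column order.
    public class RawOrderDemo {
      // pretend mapping: CSV order (Name, age, project) -> internal order (project, Name, age)
      static Object[] parse(String[] csvRow) {
        return new Object[] { csvRow[2], csvRow[0], csvRow[1] };
      }

      public static void main(String[] args) {
        String[] raw = { "Ruhi", "23x", "Hadoop" };
        Object[] parsed = parse(raw);
        // Redirecting the parsed row writes the wrong column order:
        System.out.println(Arrays.toString(parsed)); // [Hadoop, Ruhi, 23x]
        // Keeping the raw row alongside (as this fix does) preserves it:
        System.out.println(Arrays.toString(raw));    // [Ruhi, 23x, Hadoop]
      }
    }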

Steps to reproduce:

CREATE TABLE employee(Name string, age int, project string) STORED BY 'carbondata'

LOAD DATA LOCAL INPATH '' INTO TABLE employee OPTIONS('BAD_RECORDS_ACTION'='REDIRECT')
Data:

Name,age,Project
Sam,27,Carbon
Ruhi,23x,Hadoop

The second record is a bad record, so it is written to a CSV file at the configured
bad-record location. Before this fix, the redirected line could come out with its
columns rearranged into the parser's internal order instead of the original CSV order.

Expected:

Ruhi,23x,Hadoop

This closes #1116


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e40b34b0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e40b34b0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e40b34b0

Branch: refs/heads/carbonstore
Commit: e40b34b08210981cbe3bffe73cacff2577fbd8cb
Parents: 1f54c47
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Wed Jun 28 19:22:45 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Jan 4 20:45:40 2018 +0530

----------------------------------------------------------------------
 .../core/datastore/row/CarbonRow.java           | 24 +++++--
 .../badrecordloger/BadRecordLoggerTest.scala    | 68 +++++++++++++++++++-
 .../TestDataLoadWithColumnsMoreThanSchema.scala |  4 ++
 .../load/DataLoadProcessorStepOnSpark.scala     | 14 +++-
 .../spark/sql/test/TestQueryExecutor.scala      |  8 ++-
 .../converter/impl/RowConverterImpl.java        |  6 +-
 .../loading/model/CarbonLoadModel.java          |  1 +
 .../loading/steps/InputProcessorStepImpl.java   | 26 ++++++--
 .../util/CarbonDataProcessorUtil.java           | 28 ++++++++
 9 files changed, 159 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index d981fa4..8702421 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -27,12 +27,24 @@ public class CarbonRow implements Serializable {
 
   private Object[] data;
 
+  private Object[] rawData;
+
   public short bucketNumber;
 
   public CarbonRow(Object[] data) {
     this.data = data;
   }
 
+  /**
+   * @param data contains column values for the schema columns only
+   * @param rawData contains the complete raw row as read from the input
+   */
+  public CarbonRow(Object[] data, Object[] rawData) {
+    this.data = data;
+    this.rawData = rawData;
+  }
+
   public Object[] getData() {
     return data;
   }
@@ -61,14 +73,14 @@ public class CarbonRow implements Serializable {
     data[ordinal] = value;
   }
 
-  public CarbonRow getCopy() {
-    Object[] copy = new Object[data.length];
-    System.arraycopy(data, 0, copy, 0, copy.length);
-    return new CarbonRow(copy);
-  }
-
   @Override public String toString() {
     return Arrays.toString(data);
   }
 
+  public Object[] getRawData() {
+    return rawData;
+  }
+  public void setRawData(Object[] rawData) {
+    this.rawData = rawData;
+  }
 }
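
For reference, a hedged usage sketch of the new CarbonRow raw-data API (the class
name and field values are made up for illustration; only the two-argument
constructor, getRawData and setRawData come from this patch):

    import java.util.Arrays;

    import org.apache.carbondata.core.datastore.row.CarbonRow;

    public class CarbonRowRawDataExample {
      public static void main(String[] args) {
        Object[] parsed = { "Ruhi", null, "Hadoop" };  // values in internal schema order
        Object[] raw = { "Ruhi", "23x", "Hadoop" };    // values in original CSV order
        CarbonRow row = new CarbonRow(parsed, raw);
        // a converter can redirect the original values when it hits a bad record ...
        System.out.println(Arrays.toString(row.getRawData()));
        // ... and drop the raw copy once it is no longer needed
        row.setRawData(null);
      }
    }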

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 3c7f5e2..463ddbf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -17,15 +17,19 @@
 
 package org.apache.carbondata.spark.testsuite.badrecordloger
 
-import java.io.File
+import java.io.{File, FileFilter}
 
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
 /**
  * Test Class for detailed query on timestamp datatypes
  *
@@ -46,6 +50,7 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
       sql("drop table IF EXISTS empty_timestamp")
       sql("drop table IF EXISTS empty_timestamp_false")
       sql("drop table IF EXISTS dataloadOptionTests")
+      sql("drop table IF EXISTS sales_test")
       sql(
         """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
@@ -247,8 +252,69 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("validate redirected data") {
+    cleanBadRecordPath("default", "sales_test")
+    val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales_test(ID BigInt, date long, country int,
+          actual_price Double, Quantity String, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    try {
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_test OPTIONS" +
+          "('bad_records_logger_enable'='false','bad_records_action'='redirect', 'DELIMITER'="
+
+          " ',', 'QUOTECHAR'= '\"')");
+    } catch {
+      case e: Exception => {
+        assert(true)
+      }
+    }
+    val redirectCsvPath = getRedirectCsvPath("default", "sales_test", "0", "0")
+    assert(checkRedirectedCsvContentAvailableInSource(csvFilePath, redirectCsvPath))
+  }
+
+  def getRedirectCsvPath(dbName: String, tableName: String, segment: String, task: String) = {
+    var badRecordLocation = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
+    badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName + "/" + segment + "/" +
+                        task
+    val listFiles = new File(badRecordLocation).listFiles(new FileFilter {
+      override def accept(pathname: File): Boolean = {
+        pathname.getPath.endsWith(".csv")
+      }
+    })
+    listFiles(0)
+  }
+
+  /**
+   * @param csvFilePath path of the source CSV used for the load
+   * @param redirectCsvPath redirected bad-record CSV file to verify
+   */
+  def checkRedirectedCsvContentAvailableInSource(csvFilePath: String,
+      redirectCsvPath: File): Boolean = {
+    val origFileLineList = FileUtils.readLines(new File(csvFilePath))
+    val redirectedFileLineList = FileUtils.readLines(redirectCsvPath)
+    val iterator = redirectedFileLineList.iterator()
+    while (iterator.hasNext) {
+      if (!origFileLineList.contains(iterator.next())) {
+        return false;
+      }
+    }
+    return true
+  }
+
+  def cleanBadRecordPath(dbName: String, tableName: String) = {
+    var badRecordLocation = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
+    badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(badRecordLocation))
+  }
+
   override def afterAll {
     sql("drop table sales")
+    sql("drop table sales_test")
     sql("drop table serializable_values")
     sql("drop table serializable_values_false")
     sql("drop table insufficientColumn")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
index c09d285..1e34ec8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
@@ -30,6 +30,10 @@ class TestDataLoadWithColumnsMoreThanSchema extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     sql("DROP TABLE IF EXISTS char_test")
     sql("DROP TABLE IF EXISTS hive_char_test")
+    sql("DROP TABLE IF EXISTS max_columns_value_test")
+    sql("DROP TABLE IF EXISTS boundary_max_columns_test")
+    sql("DROP TABLE IF EXISTS valid_max_columns_test")
+    sql("DROP TABLE IF EXISTS max_columns_test")
     sql("DROP TABLE IF EXISTS smart_500_DE")
     sql("CREATE TABLE char_test (imei string,age int,task bigint,num double,level decimal(10,3),productdate
timestamp,mark int,name string)STORED BY 'org.apache.carbondata.format'")
     sql("CREATE TABLE hive_char_test (imei string,age int,task bigint,num double,level decimal(10,3),productdate
timestamp,mark int,name string)row format delimited fields terminated by ','")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 154d3ed..2c74657 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
 import org.apache.carbondata.processing.loading.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
 import org.apache.carbondata.spark.util.Util
 
@@ -71,7 +71,7 @@ object DataLoadProcessorStepOnSpark {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
     val rowParser = new RowParserImpl(conf.getDataFields, conf)
-
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
     TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
       wrapException(e, model)
     }
@@ -79,8 +79,16 @@ object DataLoadProcessorStepOnSpark {
     new Iterator[CarbonRow] {
       override def hasNext: Boolean = rows.hasNext
 
+
+
       override def next(): CarbonRow = {
-        val row = new CarbonRow(rowParser.parseRow(rows.next()))
+        var row: CarbonRow = null
+        if (isRawDataRequired) {
+          val rawRow = rows.next()
+          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          row = new CarbonRow(rowParser.parseRow(rows.next()))
+        }
         rowCounter.add(1)
         row
       }
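
A note on the shape of this change: whether the raw row must be retained is decided
once per task via isRawDataRequired, not re-read from the configuration for every
record. A minimal sketch of the same pattern under illustrative names (ParsingIterator
and parse are not CarbonData classes):

    import java.util.Iterator;

    // Illustrative pattern: compute the flag once, then branch per record
    // without re-evaluating the load configuration.
    final class ParsingIterator implements Iterator<Object[]> {
      private final Iterator<String[]> input;
      private final boolean keepRaw; // computed once from the load configuration

      ParsingIterator(Iterator<String[]> input, boolean keepRaw) {
        this.input = input;
        this.keepRaw = keepRaw;
      }

      @Override public boolean hasNext() { return input.hasNext(); }

      @Override public Object[] next() {
        String[] raw = input.next();
        Object[] parsed = parse(raw);
        // pair the parsed row with the raw row only when it will be needed
        return keepRaw ? new Object[] { parsed, raw } : new Object[] { parsed };
      }

      private static Object[] parse(String[] raw) {
        return raw.clone(); // stand-in for RowParserImpl.parseRow
      }
    }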

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index 9e30b02..78214ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -62,7 +62,13 @@ object TestQueryExecutor {
       property
     }
   }
-
+  val badStorePath = s"$integrationPath/spark-common-test/target/badrecord"
+  try {
+    FileFactory.mkdirs(badStorePath, FileFactory.getFileType(badStorePath))
+  } catch {
+    case e: Exception =>
+      throw e
+  }
   val hdfsUrl = {
     val property = System.getProperty("hdfs.url")
     if (property == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 7fc8ed3..c5313cb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -156,14 +156,12 @@ public class RowConverterImpl implements RowConverter {
 
   @Override
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
-    //TODO: only copy if it is bad record
-    CarbonRow copy = row.getCopy();
     logHolder.setLogged(false);
     logHolder.clear();
     for (int i = 0; i < fieldConverters.length; i++) {
       fieldConverters[i].convert(row, logHolder);
       if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
-        badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
+        badRecordLogger.addBadRecordsToBuilder(row.getRawData(), logHolder.getReason());
         if (badRecordLogger.isDataLoadFail()) {
           String error = "Data load failed due to bad record: " + logHolder.getReason();
           if (!badRecordLogger.isBadRecordLoggerEnable()) {
@@ -178,6 +176,8 @@ public class RowConverterImpl implements RowConverter {
         }
       }
     }
+    // rawData is not required beyond this point, so release the reference.
+    row.setRawData(null);
     return row;
   }
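
The converter now hands row.getRawData() to the bad-record logger instead of a
pre-parsed copy. The BadRecordsLogger internals are not part of this diff; as a
hypothetical sketch, redirecting the raw row amounts to joining the original values
back into a CSV line (class and method names here are illustrative):

    import java.util.StringJoiner;

    // Hypothetical sketch: turning the retained raw row back into a CSV line
    // for the redirect file. The real BadRecordsLogger may differ.
    public class RedirectLineDemo {
      static String toCsvLine(Object[] rawData, char delimiter) {
        StringJoiner joiner = new StringJoiner(String.valueOf(delimiter));
        for (Object value : rawData) {
          joiner.add(value == null ? "" : value.toString());
        }
        return joiner.toString();
      }

      public static void main(String[] args) {
        Object[] raw = { "Ruhi", "23x", "Hadoop" };
        System.out.println(toCsvLine(raw, ',')); // Ruhi,23x,Hadoop
      }
    }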
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 8a295d9..d41455f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -453,6 +453,7 @@ public class CarbonLoadModel implements Serializable {
     copy.sortScope = sortScope;
     copy.batchSortSizeInMb = batchSortSizeInMb;
     copy.isAggLoadRequest = isAggLoadRequest;
+    copy.badRecordsLocation = badRecordsLocation;
     return copy;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index ae7ece1..4078a13 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.parser.RowParser;
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * It reads data from record reader and sends data to next step.
@@ -51,7 +52,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    * executor service to execute the query
    */
   public ExecutorService executorService;
-
+  boolean isRawDataRequired = false;
   public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       CarbonIterator<Object[]>[] inputIterators) {
     super(configuration, null);
@@ -68,6 +69,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
         "InputProcessorPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
             .getTableName()));
+    // raw data is required when the bad-record logger is enabled or the action is REDIRECT.
+    this.isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(configuration);
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() {
@@ -77,7 +80,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =
           new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
-              configuration.isPreFetch(), executorService, rowCounter);
+              configuration.isPreFetch(), executorService, rowCounter, isRawDataRequired);
     }
     return outIterators;
   }
@@ -150,9 +153,11 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
     private AtomicLong rowCounter;
 
+    private boolean isRawDataRequired = false;
+
     public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
         RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService,
-        AtomicLong rowCounter) {
+        AtomicLong rowCounter, boolean isRawDataRequired) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
       this.rowParser = rowParser;
@@ -164,6 +169,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
       this.preFetch = preFetch;
       this.nextBatch = false;
       this.firstTime = true;
+      this.isRawDataRequired = isRawDataRequired;
     }
 
     @Override
@@ -235,9 +241,17 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
       // Create batch and fill it.
       CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
       int count = 0;
-      while (internalHasNext() && count < batchSize) {
-        carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-        count++;
+      if (isRawDataRequired) {
+        while (internalHasNext() && count < batchSize) {
+          Object[] rawRow = currentIterator.next();
+          carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(rawRow), rawRow));
+          count++;
+        }
+      } else {
+        while (internalHasNext() && count < batchSize) {
+          carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
+          count++;
+        }
       }
       rowCounter.getAndAdd(carbonRowBatch.getSize());
       return carbonRowBatch;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index a18147a..beb1ad1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -56,6 +57,7 @@ import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
 
@@ -619,4 +621,30 @@ public final class CarbonDataProcessorUtil {
     }
     return errorMessage;
   }
+  /**
+   * Returns true if either the bad-record logger is enabled or the bad-record
+   * action is REDIRECT.
+   * @param configuration data load configuration
+   * @return true when the raw input row must be retained
+   */
+  public static boolean isRawDataRequired(CarbonDataLoadConfiguration configuration) {
+    boolean isRawDataRequired = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    // if the logger is disabled, raw data is still required when the action is REDIRECT.
+    if (!isRawDataRequired) {
+      Object badRecordsAction =
+          configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION);
+      if (null != badRecordsAction) {
+        LoggerAction loggerAction = null;
+        try {
+          loggerAction = LoggerAction.valueOf(badRecordsAction.toString().toUpperCase());
+        } catch (IllegalArgumentException e) {
+          loggerAction = LoggerAction.FORCE;
+        }
+        isRawDataRequired = loggerAction == LoggerAction.REDIRECT;
+      }
+    }
+    return isRawDataRequired;
+  }
+
 }
\ No newline at end of file
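
Restating the decision implemented by isRawDataRequired above: the raw row is kept
when the bad-record logger is enabled, or when the logger is disabled but the
configured action is REDIRECT (an unparseable action string falls back to FORCE).
A standalone sketch of that truth table, with a locally declared enum standing in
for org.apache.carbondata.common.constants.LoggerAction (IGNORE and FAIL are
assumed members):

    // Simplified restatement of the decision above; not the CarbonData class.
    enum LoggerAction { FORCE, REDIRECT, IGNORE, FAIL }

    public class RawDataDecision {
      static boolean isRawDataRequired(boolean loggerEnabled, String action) {
        if (loggerEnabled) {
          return true;
        }
        if (action == null) {
          return false;
        }
        LoggerAction loggerAction;
        try {
          loggerAction = LoggerAction.valueOf(action.toUpperCase());
        } catch (IllegalArgumentException e) {
          loggerAction = LoggerAction.FORCE; // unknown action falls back to FORCE
        }
        return loggerAction == LoggerAction.REDIRECT;
      }

      public static void main(String[] args) {
        System.out.println(isRawDataRequired(true, null));        // true
        System.out.println(isRawDataRequired(false, "redirect")); // true
        System.out.println(isRawDataRequired(false, "ignore"));   // false
      }
    }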

