carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: Rebased with metadata [Forced Update!]
Date Thu, 27 Jul 2017 11:04:15 GMT
Repository: carbondata
Updated Branches:
  refs/heads/datamap f4ab1ff69 -> 15036b1d5 (forced update)


Rebased with metadata


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/15036b1d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/15036b1d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/15036b1d

Branch: refs/heads/datamap
Commit: 15036b1d513378bdc392c5bd0f688446ee3bd17c
Parents: 2015a3e
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Thu Jul 13 14:18:02 2017 +0530
Committer: Ravindra Pesala <ravi.pesala@gmail.com>
Committed: Thu Jul 27 09:10:21 2017 +0530

----------------------------------------------------------------------
 .../core/indexstore/UnsafeMemoryDMStore.java    | 27 +++----
 .../blockletindex/BlockletDataMap.java          | 11 +--
 .../core/indexstore/row/DataMapRow.java         |  4 +-
 .../core/indexstore/row/DataMapRowImpl.java     |  4 +
 .../core/indexstore/row/UnsafeDataMapRow.java   | 40 +++++++++-
 .../core/memory/UnsafeMemoryManager.java        | 16 ++--
 .../hadoop/api/CarbonTableInputFormat.java      | 81 ++++++++++----------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  2 +-
 8 files changed, 113 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 8246f99..737586e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -19,9 +19,9 @@ package org.apache.carbondata.core.indexstore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
 import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
-import org.apache.carbondata.core.memory.MemoryAllocator;
-import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
@@ -39,8 +39,6 @@ public class UnsafeMemoryDMStore {
 
   private int runningLength;
 
-  private MemoryAllocator memoryAllocator;
-
   private boolean isMemoryFreed;
 
   private DataMapSchema[] schema;
@@ -49,11 +47,10 @@ public class UnsafeMemoryDMStore {
 
   private int rowCount;
 
-  public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+  public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException {
     this.schema = schema;
-    this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
     this.allocatedSize = capacity;
-    this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+    this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(allocatedSize);
     this.pointers = new int[1000];
   }
 
@@ -63,13 +60,13 @@ public class UnsafeMemoryDMStore {
    *
    * @param rowSize
    */
-  private void ensureSize(int rowSize) {
+  private void ensureSize(int rowSize) throws MemoryException {
     if (runningLength + rowSize >= allocatedSize) {
       MemoryBlock allocate =
-          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+          UnsafeMemoryManager.allocateMemoryWithRetry(allocatedSize + capacity);
       unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
       allocatedSize = allocatedSize + capacity;
       memoryBlock = allocate;
     }
@@ -86,7 +83,7 @@ public class UnsafeMemoryDMStore {
    * @param indexRow
    * @return
    */
-  public void addIndexRowToUnsafe(DataMapRow indexRow) {
+  public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
     int rowSize = indexRow.getTotalSizeInBytes();
     // Check whether allocated memory is sufficient or not.
@@ -168,13 +165,13 @@ public class UnsafeMemoryDMStore {
     return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
   }
 
-  public void finishWriting() {
+  public void finishWriting() throws MemoryException {
     if (runningLength < allocatedSize) {
       MemoryBlock allocate =
-          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+          UnsafeMemoryManager.allocateMemoryWithRetry(runningLength);
       unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
       memoryBlock = allocate;
     }
     // Compact pointers.
@@ -187,7 +184,7 @@ public class UnsafeMemoryDMStore {
 
   public void freeMemory() {
     if (!isMemoryFreed) {
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
       isMemoryFreed = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 79aa091..680852d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -110,7 +111,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       if (unsafeMemoryDMStore != null) {
         unsafeMemoryDMStore.finishWriting();
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
@@ -156,11 +157,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
         DataOutput dataOutput = new DataOutputStream(stream);
         blockletInfo.write(dataOutput);
         serializedData = stream.toByteArray();
-      } catch (IOException e) {
+        row.setByteArray(serializedData, ordinal);
+        unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+      } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      row.setByteArray(serializedData, ordinal);
-      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
     }
   }
 
@@ -176,7 +177,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return minRow;
   }
 
-  private void createSchema(SegmentProperties segmentProperties) {
+  private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
     List<DataMapSchema> indexSchemas = new ArrayList<>();
 
     // Index key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index defe766..631e0ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -62,6 +62,8 @@ public abstract class DataMapRow {
 
   public abstract double getDouble(int ordinal);
 
+  public abstract int getLengthInBytes(int ordinal);
+
   public int getTotalSizeInBytes() {
     int len = 0;
     for (int i = 0; i < schemas.length; i++) {
@@ -75,7 +77,7 @@ public abstract class DataMapRow {
       case FIXED:
         return schemas[ordinal].getLength();
       case VARIABLE:
-        return getByteArray(ordinal).length + 2;
+        return getLengthInBytes(ordinal) + 2;
       case STRUCT:
         return getRow(ordinal).getTotalSizeInBytes();
       default:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index adec346..32d15d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -35,6 +35,10 @@ public class DataMapRowImpl extends DataMapRow {
     return (byte[]) data[ordinal];
   }
 
+  @Override public int getLengthInBytes(int ordinal) {
+    return ((byte[]) data[ordinal]).length;
+  }
+
   @Override public DataMapRow getRow(int ordinal) {
     return (DataMapRow) data[ordinal];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index ef78514..c398115 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -55,6 +55,31 @@ public class UnsafeDataMapRow extends DataMapRow {
     return data;
   }
 
+  @Override public int getLengthInBytes(int ordinal) {
+    int length;
+    int position = getPosition(ordinal);
+    switch (schemas[ordinal].getSchemaType()) {
+      case VARIABLE:
+        length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+        break;
+      default:
+        length = schemas[ordinal].getLength();
+    }
+    return length;
+  }
+
+  private int getLengthInBytes(int ordinal, int position) {
+    int length;
+    switch (schemas[ordinal].getSchemaType()) {
+      case VARIABLE:
+        length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+        break;
+      default:
+        length = schemas[ordinal].getLength();
+    }
+    return length;
+  }
+
   @Override public DataMapRow getRow(int ordinal) {
     DataMapSchema[] childSchemas =
         ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
@@ -123,10 +148,23 @@ public class UnsafeDataMapRow extends DataMapRow {
     throw new UnsupportedOperationException("Not supported to set on unsafe row");
   }
 
+  private int getSizeInBytes(int ordinal, int position) {
+    switch (schemas[ordinal].getSchemaType()) {
+      case FIXED:
+        return schemas[ordinal].getLength();
+      case VARIABLE:
+        return getLengthInBytes(ordinal, position) + 2;
+      case STRUCT:
+        return getRow(ordinal).getTotalSizeInBytes();
+      default:
+        throw new UnsupportedOperationException("wrong type");
+    }
+  }
+
   private int getPosition(int ordinal) {
     int position = 0;
     for (int i = 0; i < ordinal; i++) {
-      position += getSizeInBytes(i);
+      position += getSizeInBytes(i, position);
     }
     return position;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 28e63a9..c491908 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -101,11 +101,9 @@ public class UnsafeMemoryManager {
     if (memoryUsed + memoryRequested <= totalMemory) {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
-      if (LOGGER.isDebugEnabled()) {
-        set.add(allocate);
-        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
-            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
-      }
+      set.add(allocate);
+      LOGGER.info("Memory block (" + allocate + ") is created with size "  + allocate.size() +
+          ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
       return allocate;
     }
     return null;
@@ -115,11 +113,9 @@ public class UnsafeMemoryManager {
     allocator.free(memoryBlock);
     memoryUsed -= memoryBlock.size();
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
-    if (LOGGER.isDebugEnabled()) {
-      set.remove(memoryBlock);
-      LOGGER.error("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed +
-          "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size());
-    }
+    set.remove(memoryBlock);
+    LOGGER.info("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed +
+        "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size());
   }
 
   private synchronized long getAvailableMemory() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e73c04a..8938699 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -99,59 +100,56 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
-  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
+  // a cache for carbon table, it will be used in task side
+  private CarbonTable carbonTable;
+
   /**
-   * It is optional, if user does not set then it reads from store
-   *
-   * @param configuration
-   * @param carbonTable
-   * @throws IOException
+   * Set the `tableInfo` in `configuration`
    */
-  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
       throws IOException {
-    if (null != carbonTable) {
-      configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+    if (null != tableInfo) {
+      configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo));
     }
   }
 
-  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
-    String carbonTableStr = configuration.get(CARBON_TABLE);
-    if (carbonTableStr == null) {
-      populateCarbonTable(configuration);
-      // read it from schema file in the store
-      carbonTableStr = configuration.get(CARBON_TABLE);
-      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
-    }
-    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+  /**
+   * Get TableInfo object from `configuration`
+   */
+  private TableInfo getTableInfo(Configuration configuration) throws IOException {
+    String tableInfoStr = configuration.get(TABLE_INFO);
+    return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr);
   }
 
   /**
-   * this method will read the schema from the physical file and populate into CARBON_TABLE
-   *
-   * @param configuration
-   * @throws IOException
+   * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  private static void populateCarbonTable(Configuration configuration) throws IOException {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
+  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    if (carbonTable == null) {
+      // carbon table should be created either from deserialized table info (schema saved in
+      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable carbonTable;
+      if (tableInfo != null) {
+        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        carbonTable = SchemaReader.readCarbonTableFromStore(
+            getAbsoluteTableIdentifier(configuration));
+      }
+      this.carbonTable = carbonTable;
+      return carbonTable;
+    } else {
+      return this.carbonTable;
     }
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
-    // read the schema file to get the absoluteTableIdentifier having the correct table id
-    // persisted in the schema
-    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
-    setCarbonTable(configuration, carbonTable);
   }
 
   public static void setTablePath(Configuration configuration, String tablePath)
       throws IOException {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
-
   /**
    * It sets unresolved filter expression.
    *
@@ -213,9 +211,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
   }
 
-  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+  private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
-    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    }
+    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
   }
 
   /**
@@ -262,7 +265,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
-    CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     // this will be null in case of corrupt schema file.
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
@@ -320,7 +323,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Boolean isIUDTable = false;
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
-        getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(absoluteTableIdentifier);
 
@@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    CarbonTable carbonTable = getCarbonTable(configuration);
+    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
     // getting the table absoluteTableIdentifier from the carbonTable
     // to avoid unnecessary deserialization
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 6bc7564..2e737ab 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -265,7 +265,7 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
-    CarbonInputFormat.setTableInfo(job.getConfiguration,
+    CarbonTableInputFormat.setTableInfo(job.getConfiguration,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
     var updateDetails: UpdateVO = null
     // initialise query_id for job


Mime
View raw message