carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [37/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors
Date Mon, 15 Aug 2016 07:09:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
new file mode 100644
index 0000000..05c76ef
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
+
+import org.apache.thrift.TBase;
+
+/**
+ * This class performs the functionality of reading the dictionary metadata file
+ * of a single column.
+ */
+public class CarbonDictionaryMetadataReaderImpl implements CarbonDictionaryMetadataReader {
+
+  /**
+   * carbon table identifier
+   */
+  protected CarbonTableIdentifier carbonTableIdentifier;
+
+  /**
+   * HDFS store path
+   */
+  protected String hdfsStorePath;
+
+  /**
+   * column identifier
+   */
+  protected ColumnIdentifier columnIdentifier;
+
+  /**
+   * dictionary metadata file path, resolved once by {@link #initFileLocation()}
+   */
+  protected String columnDictionaryMetadataFilePath;
+
+  /**
+   * dictionary metadata thrift file reader; created lazily by the read methods
+   * and released (set back to null) by {@link #close()}
+   */
+  private ThriftReader dictionaryMetadataFileReader;
+
+  /**
+   * Constructor
+   *
+   * @param hdfsStorePath         HDFS store path
+   * @param carbonTableIdentifier table identifier which will give table name and database name
+   * @param columnIdentifier      column unique identifier
+   */
+  public CarbonDictionaryMetadataReaderImpl(String hdfsStorePath,
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) {
+    this.hdfsStorePath = hdfsStorePath;
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.columnIdentifier = columnIdentifier;
+    initFileLocation();
+  }
+
+  /**
+   * This method will be used to read complete metadata file.
+   * Applicable scenarios:
+   * 1. Query execution. Whenever a query is executed then to read the dictionary file
+   * and define the query scope first dictionary metadata has to be read first.
+   * 2. If dictionary file is read using start and end offset then using this meta list
+   * we can count the total number of dictionary chunks present between the 2 offsets
+   *
+   * NOTE(review): the thrift reader is only created when it is null and is not rewound
+   * here, so a second read on the same instance without close() presumably resumes at
+   * the end of the file — confirm against ThriftReader.
+   *
+   * @return list of all dictionary meta chunks which contains information for each segment;
+   * empty if the metadata file has no entries
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public List<CarbonDictionaryColumnMetaChunk> read() throws IOException {
+    List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunks =
+        new ArrayList<CarbonDictionaryColumnMetaChunk>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // open dictionary meta thrift reader (no-op if it is already open)
+    openThriftReader();
+    // read every chunk meta entry present in the metadata file
+    while (dictionaryMetadataFileReader.hasNext()) {
+      ColumnDictionaryChunkMeta dictionaryChunkMeta =
+          (ColumnDictionaryChunkMeta) dictionaryMetadataFileReader.read();
+      // wrap the thrift object so callers do not depend on the generated thrift class
+      dictionaryMetaChunks.add(
+          getNewInstanceOfCarbonDictionaryColumnMetaChunk(dictionaryChunkMeta));
+    }
+    return dictionaryMetaChunks;
+  }
+
+  /**
+   * This method will be used to read only the last entry of dictionary meta chunk.
+   * Applicable scenarios :
+   * 1. Global dictionary generation for incremental load. In this case only the
+   * last dictionary chunk meta entry has to be read to calculate min, max surrogate
+   * key and start and end offset for the new dictionary chunk.
+   * 2. Truncate operation. While writing dictionary file in case of incremental load
+   * dictionary file needs to be validated for any inconsistency. Here end offset of last
+   * dictionary chunk meta is validated with file size.
+   *
+   * @return last segment entry for dictionary chunk
+   * @throws IOException if an I/O error occurs or the metadata file contains no entry
+   */
+  @Override public CarbonDictionaryColumnMetaChunk readLastEntryOfDictionaryMetaChunk()
+      throws IOException {
+    ColumnDictionaryChunkMeta dictionaryChunkMeta = null;
+    // open dictionary meta thrift reader (no-op if it is already open)
+    openThriftReader();
+    // at the completion of while loop we will get the last dictionary chunk entry
+    while (dictionaryMetadataFileReader.hasNext()) {
+      dictionaryChunkMeta = (ColumnDictionaryChunkMeta) dictionaryMetadataFileReader.read();
+    }
+    // an empty metadata file has no last entry; fail with a clear message instead of
+    // letting the wrapper factory dereference a null thrift object
+    if (null == dictionaryChunkMeta) {
+      throw new IOException(
+          "Last dictionary chunk does not exist: " + columnDictionaryMetadataFilePath);
+    }
+    return getNewInstanceOfCarbonDictionaryColumnMetaChunk(dictionaryChunkMeta);
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void close() throws IOException {
+    if (null != dictionaryMetadataFileReader) {
+      dictionaryMetadataFileReader.close();
+      dictionaryMetadataFileReader = null;
+    }
+  }
+
+  /**
+   * This method will form the path for dictionary metadata file for a given column
+   * using the path service returned by {@link CarbonCommonFactory#getPathService()}.
+   */
+  protected void initFileLocation() {
+    PathService pathService = CarbonCommonFactory.getPathService();
+    CarbonTablePath carbonTablePath =
+        pathService.getCarbonTablePath(columnIdentifier, this.hdfsStorePath, carbonTableIdentifier);
+    this.columnDictionaryMetadataFilePath =
+        carbonTablePath.getDictionaryMetaFilePath(columnIdentifier.getColumnId());
+  }
+
+  /**
+   * This method will open the dictionary metadata file stream for reading.
+   * It does nothing if the reader has already been opened.
+   *
+   * @throws IOException thrift reader open method throws IOException
+   */
+  private void openThriftReader() throws IOException {
+    if (null == dictionaryMetadataFileReader) {
+      // every object in the metadata file is deserialized as a ColumnDictionaryChunkMeta
+      dictionaryMetadataFileReader =
+          new ThriftReader(this.columnDictionaryMetadataFilePath, new ThriftReader.TBaseCreator() {
+            @Override public TBase create() {
+              return new ColumnDictionaryChunkMeta();
+            }
+          });
+      dictionaryMetadataFileReader.open();
+    }
+  }
+
+  /**
+   * Given a thrift object this method will create a new wrapper class object
+   * for dictionary chunk
+   *
+   * @param dictionaryChunkMeta reference for chunk meta thrift object; must not be null
+   * @return wrapper object of dictionary chunk meta
+   */
+  private CarbonDictionaryColumnMetaChunk getNewInstanceOfCarbonDictionaryColumnMetaChunk(
+      ColumnDictionaryChunkMeta dictionaryChunkMeta) {
+    return new CarbonDictionaryColumnMetaChunk(dictionaryChunkMeta.getMin_surrogate_key(),
+        dictionaryChunkMeta.getMax_surrogate_key(), dictionaryChunkMeta.getStart_offset(),
+        dictionaryChunkMeta.getEnd_offset(), dictionaryChunkMeta.getChunk_count());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java
new file mode 100644
index 0000000..dded6c2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Contract for readers of a carbon column dictionary file. Every read method returns
+ * the dictionary values in file order, one byte array per unique value. Implementations
+ * hold file resources, so callers must close them when done.
+ */
+public interface CarbonDictionaryReader extends Closeable {
+
+  /**
+   * Reads the whole dictionary file.
+   * Typical callers:
+   * 1. global dictionary generation during an incremental load;
+   * 2. the first query that needs the dictionary of a column;
+   * 3. on-demand loading of a column dictionary into memory when the carbon column
+   * cache feature is enabled and the column appears in a query.
+   *
+   * @return every unique dictionary value as a byte array
+   * @throws IOException if the file cannot be read
+   */
+  List<byte[]> read() throws IOException;
+
+  /**
+   * Reads the dictionary file starting at the given offset.
+   * Typical caller: an incremental load after a column dictionary is already cached in
+   * memory, where only the newly appended dictionary data has to be read from the file.
+   *
+   * @param startOffset offset in the dictionary file to start reading from
+   * @return the unique dictionary values found from the offset onwards, as byte arrays
+   * @throws IOException if the file cannot be read
+   */
+  List<byte[]> read(long startOffset) throws IOException;
+
+  /**
+   * Reads the dictionary data lying between two offsets.
+   * Typical caller: a truncate operation that must keep only the data between the given
+   * start and end offsets after an inconsistent dictionary write.
+   *
+   * @param startOffset offset in the dictionary file to start reading from
+   * @param endOffset   offset in the dictionary file at which reading stops
+   * @return the unique dictionary values between the offsets, as byte arrays
+   * @throws IOException if the file cannot be read
+   */
+  List<byte[]> read(long startOffset, long endOffset) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
new file mode 100644
index 0000000..a843701
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.format.ColumnDictionaryChunk;
+
+import org.apache.thrift.TBase;
+
+/**
+ * This class performs the functionality of reading a carbon dictionary file.
+ * It implements various overloaded methods for read functionality.
+ */
+public class CarbonDictionaryReaderImpl implements CarbonDictionaryReader {
+
+  /**
+   * carbon table identifier
+   */
+  protected CarbonTableIdentifier carbonTableIdentifier;
+
+  /**
+   * HDFS store path
+   */
+  protected String hdfsStorePath;
+
+  /**
+   * column identifier
+   */
+  protected ColumnIdentifier columnIdentifier;
+
+  /**
+   * dictionary file path, resolved once by {@link #initFileLocation()}
+   */
+  protected String columnDictionaryFilePath;
+
+  /**
+   * dictionary thrift file reader; created lazily and released by {@link #close()}
+   */
+  private ThriftReader dictionaryFileReader;
+
+  /**
+   * Constructor
+   *
+   * @param hdfsStorePath         HDFS store path
+   * @param carbonTableIdentifier table identifier which will give table name and database name
+   * @param columnIdentifier      column unique identifier
+   */
+  public CarbonDictionaryReaderImpl(String hdfsStorePath,
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) {
+    this.hdfsStorePath = hdfsStorePath;
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.columnIdentifier = columnIdentifier;
+    initFileLocation();
+  }
+
+  /**
+   * This method should be used when complete dictionary data needs to be read.
+   * Applicable scenarios :
+   * 1. Global dictionary generation in case of incremental load
+   * 2. Reading dictionary file on first time query
+   * 3. Loading a dictionary column in memory based on query requirement.
+   * This is a case where carbon column cache feature is enabled in which a
+   * column dictionary is read if it is present in the query.
+   *
+   * @return list of byte array. Each byte array is unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public List<byte[]> read() throws IOException {
+    return read(0L);
+  }
+
+  /**
+   * This method should be used when data has to be read from a given offset.
+   * Applicable scenarios :
+   * 1. Incremental data load. If column dictionary is already loaded in memory
+   * and incremental load is done, then for the new query only new dictionary data
+   * has to be read from the dictionary file.
+   *
+   * @param startOffset start offset of dictionary file
+   * @return list of byte array. Each byte array is unique dictionary value;
+   * empty when the dictionary metadata file has no entries
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public List<byte[]> read(long startOffset) throws IOException {
+    List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks =
+        readDictionaryMetadataFile();
+    // no metadata entry means nothing has been written yet; avoid get(-1) below
+    if (carbonDictionaryColumnMetaChunks.isEmpty()) {
+      return new ArrayList<byte[]>();
+    }
+    // the end offset of the last meta chunk is where the dictionary file data ends
+    CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk =
+        carbonDictionaryColumnMetaChunks.get(carbonDictionaryColumnMetaChunks.size() - 1);
+    long endOffset = carbonDictionaryColumnMetaChunk.getEnd_offset();
+    return read(carbonDictionaryColumnMetaChunks, startOffset, endOffset);
+  }
+
+  /**
+   * This method will be used to read data between given start and end offset.
+   * Applicable scenarios:
+   * 1. Truncate operation. If there is any inconsistency while writing the dictionary file
+   * then we can give the start and end offset till where the data has to be retained.
+   *
+   * @param startOffset start offset of dictionary file
+   * @param endOffset   end offset of dictionary file
+   * @return list of byte array. Each byte array is unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public List<byte[]> read(long startOffset, long endOffset) throws IOException {
+    List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks =
+        readDictionaryMetadataFile();
+    return read(carbonDictionaryColumnMetaChunks, startOffset, endOffset);
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void close() throws IOException {
+    if (null != dictionaryFileReader) {
+      dictionaryFileReader.close();
+      dictionaryFileReader = null;
+    }
+  }
+
+  /**
+   * Reads the dictionary chunks lying between the given offsets and flattens their
+   * values into a single list.
+   *
+   * @param carbonDictionaryColumnMetaChunks dictionary meta chunk list
+   * @param startOffset                      start offset for dictionary data file
+   * @param endOffset                        end offset till where data has
+   *                                         to be read from dictionary data file
+   * @return list of byte array dictionary values
+   * @throws IOException readDictionary file method throws IO exception
+   */
+  private List<byte[]> read(List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks,
+      long startOffset, long endOffset) throws IOException {
+    // calculate the number of chunks to be read from dictionary file from start offset
+    int dictionaryChunkCountsToBeRead =
+        calculateTotalDictionaryChunkCountsToBeRead(carbonDictionaryColumnMetaChunks, startOffset,
+            endOffset);
+    // open dictionary file thrift reader (no-op if it is already open)
+    openThriftReader();
+    // read the required number of chunks from dictionary file
+    List<ColumnDictionaryChunk> columnDictionaryChunks =
+        readDictionaryFile(startOffset, dictionaryChunkCountsToBeRead);
+    // convert byte buffer list to byte array list of dictionary values
+    List<byte[]> dictionaryValues =
+        new ArrayList<byte[]>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (ColumnDictionaryChunk dictionaryChunk : columnDictionaryChunks) {
+      convertAndFillByteBufferListToByteArrayList(dictionaryValues, dictionaryChunk.getValues());
+    }
+    return dictionaryValues;
+  }
+
+  /**
+   * This method will convert and fill list of byte buffer to list of byte array
+   *
+   * @param dictionaryValues          list of byte array. Each byte array is
+   *                                  unique dictionary value
+   * @param dictionaryValueBufferList dictionary thrift object which is a list of byte buffer.
+   *                                  Each dictionary value is wrapped in a byte buffer before
+   *                                  writing to file
+   */
+  private void convertAndFillByteBufferListToByteArrayList(List<byte[]> dictionaryValues,
+      List<ByteBuffer> dictionaryValueBufferList) {
+    for (ByteBuffer buffer : dictionaryValueBufferList) {
+      // the value is the readable region [position, limit); sizing the copy with limit()
+      // would over-read whenever position is not 0. Copy through a duplicate so the
+      // source buffer's position is left untouched.
+      byte[] value = new byte[buffer.remaining()];
+      buffer.duplicate().get(value);
+      dictionaryValues.add(value);
+    }
+  }
+
+  /**
+   * This method will form the path for dictionary file for a given column
+   * using the path service returned by {@link CarbonCommonFactory#getPathService()}.
+   */
+  protected void initFileLocation() {
+    PathService pathService = CarbonCommonFactory.getPathService();
+    CarbonTablePath carbonTablePath =
+        pathService.getCarbonTablePath(columnIdentifier, this.hdfsStorePath, carbonTableIdentifier);
+    this.columnDictionaryFilePath =
+        carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
+  }
+
+  /**
+   * This method will read the dictionary file and return the list of dictionary thrift object
+   *
+   * @param dictionaryStartOffset        start offset for dictionary file
+   * @param dictionaryChunkCountToBeRead number of dictionary chunks to be read
+   * @return list of dictionary chunks; may be shorter than requested if the file ends first
+   * @throws IOException setReadOffset method throws I/O exception
+   */
+  private List<ColumnDictionaryChunk> readDictionaryFile(long dictionaryStartOffset,
+      int dictionaryChunkCountToBeRead) throws IOException {
+    List<ColumnDictionaryChunk> dictionaryChunks =
+        new ArrayList<ColumnDictionaryChunk>(dictionaryChunkCountToBeRead);
+    // skip the number of bytes if a start offset is given
+    dictionaryFileReader.setReadOffset(dictionaryStartOffset);
+    // read till dictionary chunk count
+    while (dictionaryFileReader.hasNext()
+        && dictionaryChunks.size() != dictionaryChunkCountToBeRead) {
+      dictionaryChunks.add((ColumnDictionaryChunk) dictionaryFileReader.read());
+    }
+    return dictionaryChunks;
+  }
+
+  /**
+   * This method calculates the number of chunks to be read from the dictionary file
+   * using the dictionary metadata of the column. The offsets are matched exactly against
+   * the chunk boundaries recorded in the metadata: data is written in thrift format, so
+   * a thrift object can only be decoded from its exact start position.
+   *
+   * @param dictionaryChunkMetaList    list of dictionary chunk metadata
+   * @param dictionaryChunkStartOffset start offset for a dictionary chunk
+   * @param dictionaryChunkEndOffset   end offset for a dictionary chunk
+   * @return total chunk count from the meta chunk starting at the start offset up to and
+   * including the one ending at the end offset; 0 if no meta chunk starts at the start offset
+   * before the end offset is reached
+   */
+  private int calculateTotalDictionaryChunkCountsToBeRead(
+      List<CarbonDictionaryColumnMetaChunk> dictionaryChunkMetaList,
+      long dictionaryChunkStartOffset, long dictionaryChunkEndOffset) {
+    boolean chunkWithStartOffsetFound = false;
+    int dictionaryChunkCount = 0;
+    for (CarbonDictionaryColumnMetaChunk metaChunk : dictionaryChunkMetaList) {
+      // find the column meta chunk whose start offset value matches
+      // with the given dictionary start offset
+      if (!chunkWithStartOffsetFound && dictionaryChunkStartOffset == metaChunk.getStart_offset()) {
+        chunkWithStartOffsetFound = true;
+      }
+      // start offset is found then keep adding the chunk count to be read
+      if (chunkWithStartOffsetFound) {
+        dictionaryChunkCount = dictionaryChunkCount + metaChunk.getChunk_count();
+      }
+      // when end offset is reached then break the loop
+      if (dictionaryChunkEndOffset == metaChunk.getEnd_offset()) {
+        break;
+      }
+    }
+    return dictionaryChunkCount;
+  }
+
+  /**
+   * This method will read dictionary metadata file and return the dictionary meta chunks.
+   * The metadata reader is always closed, even if reading fails.
+   *
+   * @return list of dictionary metadata chunks
+   * @throws IOException read and close method throws IO exception
+   */
+  private List<CarbonDictionaryColumnMetaChunk> readDictionaryMetadataFile() throws IOException {
+    CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
+    List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunkList = null;
+    try {
+      dictionaryMetaChunkList = columnMetadataReaderImpl.read();
+    } finally {
+      columnMetadataReaderImpl.close();
+    }
+    return dictionaryMetaChunkList;
+  }
+
+  /**
+   * Creates the reader used to load this column's dictionary metadata. Subclasses can
+   * override this to supply a different metadata reader.
+   *
+   * @return a new metadata reader for the same store path, table and column
+   */
+  protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
+    return new CarbonDictionaryMetadataReaderImpl(this.hdfsStorePath, carbonTableIdentifier,
+        this.columnIdentifier);
+  }
+
+  /**
+   * This method will open the dictionary file stream for reading.
+   * It does nothing if the reader has already been opened.
+   *
+   * @throws IOException thrift reader open method throws IOException
+   */
+  private void openThriftReader() throws IOException {
+    if (null == dictionaryFileReader) {
+      // every object in the dictionary file is deserialized as a ColumnDictionaryChunk,
+      // which holds a list of byte buffers
+      dictionaryFileReader =
+          new ThriftReader(this.columnDictionaryFilePath, new ThriftReader.TBaseCreator() {
+            @Override public TBase create() {
+              return new ColumnDictionaryChunk();
+            }
+          });
+      dictionaryFileReader.open();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java
new file mode 100644
index 0000000..b9c3ae1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+
+import org.apache.carbondata.format.FileFooter;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Reads the metadata from fact file in org.apache.carbondata.format.FileFooter thrift object
+ */
+public class CarbonFooterReader {
+
+  /**
+   * fact file path
+   */
+  private final String filePath;
+
+  /**
+   * offset in the fact file from which the footer is read
+   */
+  private final long offset;
+
+  /**
+   * @param filePath fact file path
+   * @param offset   offset in the file at which the footer starts
+   */
+  public CarbonFooterReader(String filePath, long offset) {
+    this.filePath = filePath;
+    this.offset = offset;
+  }
+
+  /**
+   * It reads the metadata in FileFooter thrift object format.
+   * The underlying reader is closed before returning, even if reading fails.
+   *
+   * @return footer thrift object read from the configured offset
+   * @throws IOException if the file cannot be opened, positioned or read
+   */
+  public FileFooter readFooter() throws IOException {
+    ThriftReader thriftReader = openThriftReader(filePath);
+    thriftReader.open();
+    try {
+      // position the stream at the footer before decoding it
+      thriftReader.setReadOffset(offset);
+      return (FileFooter) thriftReader.read();
+    } finally {
+      // previously the reader leaked when setReadOffset/read threw
+      thriftReader.close();
+    }
+  }
+
+  /**
+   * Creates (but does not open) a thrift reader that decodes FileFooter objects.
+   *
+   * @param filePath fact file path
+   * @return thrift reader for the given file
+   * @throws IOException declared for callers; construction itself does no I/O here
+   */
+  private ThriftReader openThriftReader(String filePath) throws IOException {
+    return new ThriftReader(filePath, new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new FileFooter();
+      }
+    });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
new file mode 100644
index 0000000..7f9a984
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.IndexHeader;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Reader class which will be used to read the index file.
+ * Call {@link #openThriftReader(String)} before any read method and
+ * {@link #closeThriftReader()} when done.
+ */
+public class CarbonIndexFileReader {
+
+  /**
+   * reader; null until {@link #openThriftReader(String)} is called and after
+   * {@link #closeThriftReader()}
+   */
+  private ThriftReader thriftReader;
+
+  /**
+   * Below method will be used to read the index header
+   *
+   * @return index header
+   * @throws IOException if any problem while reading the header
+   */
+  public IndexHeader readIndexHeader() throws IOException {
+    IndexHeader indexHeader = (IndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new IndexHeader();
+      }
+    });
+    return indexHeader;
+  }
+
+  /**
+   * Below method will be used to close the reader. It is safe to call when the
+   * reader was never opened or has already been closed.
+   */
+  public void closeThriftReader() {
+    if (null != thriftReader) {
+      thriftReader.close();
+      thriftReader = null;
+    }
+  }
+
+  /**
+   * Below method will be used to read the block index from file
+   *
+   * @return block index info
+   * @throws IOException if problem while reading the block index
+   */
+  public BlockIndex readBlockIndexInfo() throws IOException {
+    BlockIndex blockInfo = (BlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new BlockIndex();
+      }
+    });
+    return blockInfo;
+  }
+
+  /**
+   * Open the thrift reader for the given index file. Any reader previously opened by
+   * this instance is closed first.
+   *
+   * @param filePath index file path
+   * @throws IOException if the file cannot be opened
+   */
+  public void openThriftReader(String filePath) throws IOException {
+    // re-opening used to overwrite (and leak) the previous reader
+    closeThriftReader();
+    thriftReader = new ThriftReader(filePath);
+    thriftReader.open();
+  }
+
+  /**
+   * check if any more object is present
+   *
+   * @return true if any more object can be read
+   * @throws IOException if the underlying stream cannot be queried
+   */
+  public boolean hasNext() throws IOException {
+    return thriftReader.hasNext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
new file mode 100644
index 0000000..0958349
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+/**
+ * A simple class for reading Thrift objects from a file using the compact protocol.
+ * Objects are created either by the {@link TBaseCreator} supplied at construction time
+ * (see {@link #read()}) or by one passed per call (see {@link #read(TBaseCreator)}).
+ */
+public class ThriftReader {
+  /**
+   * Buffer size, in bytes, requested when opening the input stream.
+   */
+  private static final int BUFFER_SIZE = 2048;
+  /**
+   * File containing the objects.
+   */
+  private final String fileName;
+  /**
+   * Default factory for the objects populated by {@link #read()}; null when the reader was
+   * built with {@link #ThriftReader(String)}.
+   */
+  private final TBaseCreator creator;
+  /**
+   * Stream over the file; set by {@link #open()}.
+   */
+  private DataInputStream dataInputStream;
+  /**
+   * Compact-protocol decoder wrapping {@link #dataInputStream}.
+   */
+  private TProtocol binaryIn;
+
+  /**
+   * Creates a reader whose {@link #read()} populates objects produced by {@code creator}.
+   *
+   * @param fileName path of the file to read
+   * @param creator  factory for the objects returned by {@link #read()}
+   */
+  public ThriftReader(String fileName, TBaseCreator creator) {
+    this.fileName = fileName;
+    this.creator = creator;
+  }
+
+  /**
+   * Creates a reader without a default creator; objects must be read with
+   * {@link #read(TBaseCreator)}.
+   *
+   * @param fileName path of the file to read
+   */
+  public ThriftReader(String fileName) {
+    this(fileName, null);
+  }
+
+  /**
+   * Opens the file for reading.
+   *
+   * @throws IOException if the file cannot be opened
+   */
+  public void open() throws IOException {
+    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+    dataInputStream = FileFactory.getDataInputStream(fileName, fileType, BUFFER_SIZE);
+    binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
+  }
+
+  /**
+   * Advances the stream by {@code bytesToSkip} bytes so reading continues from there.
+   * <p>
+   * {@link java.io.InputStream#skip(long)} may skip fewer bytes than requested even when the
+   * stream is not exhausted, so this keeps skipping until the full amount has been consumed
+   * and fails only when the end of the stream is reached first.
+   *
+   * @param bytesToSkip number of bytes to skip; must not be negative
+   * @throws IOException if {@code bytesToSkip} is negative, if the stream ends before the
+   *                     requested offset, or on any I/O error
+   */
+  public void setReadOffset(long bytesToSkip) throws IOException {
+    if (bytesToSkip < 0) {
+      throw new IOException("It doesn't set the offset properly");
+    }
+    long remaining = bytesToSkip;
+    while (remaining > 0) {
+      long skipped = dataInputStream.skip(remaining);
+      if (skipped > 0) {
+        remaining -= skipped;
+      } else if (dataInputStream.read() != -1) {
+        // skip() made no progress: consume one byte to tell a slow stream apart from EOF
+        remaining--;
+      } else {
+        throw new IOException("It doesn't set the offset properly");
+      }
+    }
+  }
+
+  /**
+   * Checks if another object is available by peeking one byte from the stream.
+   * NOTE(review): relies on the stream returned by FileFactory supporting mark/reset —
+   * confirm for every file type in use.
+   *
+   * @return true when at least one more byte can be read
+   * @throws IOException on any I/O error
+   */
+  public boolean hasNext() throws IOException {
+    dataInputStream.mark(1);
+    int val = dataInputStream.read();
+    dataInputStream.reset();
+    return val != -1;
+  }
+
+  /**
+   * Reads the next object using the creator supplied at construction time.
+   *
+   * @return the decoded object
+   * @throws IOException           any problem while reading
+   * @throws IllegalStateException if this reader was created without a creator
+   */
+  public TBase read() throws IOException {
+    if (creator == null) {
+      throw new IllegalStateException(
+          "No TBaseCreator was supplied for " + fileName + "; use read(TBaseCreator) instead");
+    }
+    return read(creator);
+  }
+
+  /**
+   * Reads the next object from the file.
+   *
+   * @param creator type of object which will be returned
+   * @return the decoded object
+   * @throws IOException any problem while reading
+   */
+  public TBase read(TBaseCreator creator) throws IOException {
+    TBase t = creator.create();
+    try {
+      t.read(binaryIn);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    return t;
+  }
+
+  /**
+   * Closes the underlying stream.
+   */
+  public void close() {
+    CarbonUtil.closeStreams(dataInputStream);
+  }
+
+  /**
+   * Thrift deserializes by taking an existing object and populating it. ThriftReader
+   * needs a way of obtaining instances of the class to be populated and this interface
+   * defines the mechanism by which a client provides these instances.
+   */
+  public interface TBaseCreator {
+    TBase create();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java
new file mode 100644
index 0000000..e0bb413
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.reader.sortindex;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for reading the dictionary sort index and the inverted sort index of a column.
+ */
+public interface CarbonDictionarySortIndexReader extends Closeable {
+
+  /**
+   * Reads the dictionary sort index from the column's sort index file.
+   *
+   * @return the list of dictionary sort index values
+   * @throws IOException in case any I/O error occurs
+   */
+  List<Integer> readSortIndex() throws IOException;
+
+  /**
+   * Reads the inverted dictionary sort index from the column's sort index file.
+   *
+   * @return the list of inverted dictionary sort index values
+   * @throws IOException in case any I/O error occurs
+   */
+  List<Integer> readInvertedSortIndex() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
new file mode 100644
index 0000000..70628b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.reader.sortindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.format.ColumnSortInfo;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Implementation for reading the dictionary sort index and inverted sort index .
+ */
+public class CarbonDictionarySortIndexReaderImpl implements CarbonDictionarySortIndexReader {
+
+  /**
+   * carbonTable Identifier holding the info of databaseName and tableName
+   */
+  protected CarbonTableIdentifier carbonTableIdentifier;
+
+  /**
+   * column name
+   */
+  protected ColumnIdentifier columnIdentifier;
+
+  /**
+   * hdfs store location
+   */
+  protected String carbonStorePath;
+
+  /**
+   * the path of the dictionary Sort Index file
+   */
+  protected String sortIndexFilePath;
+
+  /**
+   * Column sort info thrift instance.
+   */
+  ColumnSortInfo columnSortInfo = null;
+
+  /**
+   * Comment for <code>LOGGER</code>
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDictionarySortIndexReaderImpl.class.getName());
+
+  /**
+   * dictionary sortIndex file Reader
+   */
+  private ThriftReader dictionarySortIndexThriftReader;
+
+  /**
+   * @param carbonTableIdentifier Carbon Table identifier holding the database name and table name
+   * @param columnIdentifier      column name
+   * @param carbonStorePath       carbon store path
+   */
+  public CarbonDictionarySortIndexReaderImpl(final CarbonTableIdentifier carbonTableIdentifier,
+      final ColumnIdentifier columnIdentifier, final String carbonStorePath) {
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.columnIdentifier = columnIdentifier;
+    this.carbonStorePath = carbonStorePath;
+  }
+
+  /**
+   * method for reading the carbon dictionary sort index data
+   * from columns sortIndex file.
+   *
+   * @return The method return's the list of dictionary sort Index and sort Index reverse
+   * In case of no member for column empty list will be return
+   * @throws IOException In case any I/O error occurs
+   */
+  @Override public List<Integer> readSortIndex() throws IOException {
+    if (null == columnSortInfo) {
+      readColumnSortInfo();
+    }
+    return columnSortInfo.getSort_index();
+  }
+
+  /**
+   * method for reading the carbon dictionary sort index data
+   * from columns sortIndex file.
+   * In case of no member empty list will be return
+   *
+   * @throws IOException In case any I/O error occurs
+   */
+  private void readColumnSortInfo() throws IOException {
+    init();
+    try {
+      columnSortInfo = (ColumnSortInfo) dictionarySortIndexThriftReader.read();
+    } catch (IOException ie) {
+      LOGGER.error(ie, "problem while reading the column sort info.");
+      throw new IOException("problem while reading the column sort info.", ie);
+    } finally {
+      if (null != dictionarySortIndexThriftReader) {
+        dictionarySortIndexThriftReader.close();
+      }
+    }
+  }
+
+  /**
+   * method for reading the carbon dictionary inverted sort index data
+   * from columns sortIndex file.
+   *
+   * @return The method return's the list of dictionary inverted sort Index
+   * @throws IOException In case any I/O error occurs
+   */
+  @Override public List<Integer> readInvertedSortIndex() throws IOException {
+    if (null == columnSortInfo) {
+      readColumnSortInfo();
+    }
+    return columnSortInfo.getSort_index_inverted();
+  }
+
+  /**
+   * The method initializes the dictionary Sort Index file path
+   * and initialize and opens the thrift reader for dictionary sortIndex file.
+   *
+   * @throws IOException if any I/O errors occurs
+   */
+  private void init() throws IOException {
+    initPath();
+    openThriftReader();
+  }
+
+  protected void initPath() {
+    PathService pathService = CarbonCommonFactory.getPathService();
+    CarbonTablePath carbonTablePath =
+        pathService.getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
+    try {
+      CarbonDictionaryColumnMetaChunk chunkMetaObjectForLastSegmentEntry =
+          getChunkMetaObjectForLastSegmentEntry();
+      long dictOffset = chunkMetaObjectForLastSegmentEntry.getEnd_offset();
+      this.sortIndexFilePath =
+          carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
+      if (!FileFactory
+          .isFileExist(this.sortIndexFilePath, FileFactory.getFileType(this.sortIndexFilePath))) {
+        this.sortIndexFilePath =
+            carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+      }
+    } catch (IOException e) {
+      this.sortIndexFilePath = carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+    }
+
+  }
+
+  /**
+   * This method will read the dictionary chunk metadata thrift object for last entry
+   *
+   * @return last entry of dictionary meta chunk
+   * @throws IOException if an I/O error occurs
+   */
+  private CarbonDictionaryColumnMetaChunk getChunkMetaObjectForLastSegmentEntry()
+      throws IOException {
+    CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
+    try {
+      // read the last segment entry for dictionary metadata
+      return columnMetadataReaderImpl.readLastEntryOfDictionaryMetaChunk();
+    } finally {
+      // Close metadata reader
+      columnMetadataReaderImpl.close();
+    }
+  }
+
+  /**
+   * @return
+   */
+  protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
+    return new CarbonDictionaryMetadataReaderImpl(carbonStorePath, carbonTableIdentifier,
+        columnIdentifier);
+  }
+
+  /**
+   * This method will open the dictionary sort index file stream for reading
+   *
+   * @throws IOException in case any I/O errors occurs
+   */
+  private void openThriftReader() throws IOException {
+    this.dictionarySortIndexThriftReader =
+        new ThriftReader(this.sortIndexFilePath, new ThriftReader.TBaseCreator() {
+          @Override public TBase create() {
+            return new ColumnSortInfo();
+          }
+        });
+    dictionarySortIndexThriftReader.open();
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void close() throws IOException {
+    if (null != dictionarySortIndexThriftReader) {
+      dictionarySortIndexThriftReader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
new file mode 100644
index 0000000..1c97082
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Service that produces unique identifiers for columns.
+ */
+public interface ColumnUniqueIdService {
+
+  /**
+   * Generates a unique id for a column.
+   *
+   * @param databaseName database name
+   * @param columnSchema schema of the column to generate the id for
+   * @return the generated unique id
+   */
+  String generateUniqueId(String databaseName, ColumnSchema columnSchema);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java b/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
new file mode 100644
index 0000000..9b9ade6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryReader;
+import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
+
+/**
+ * Factory service providing the dictionary writers and readers for a column.
+ * Every method takes the same three arguments: the table identifier, the column identifier
+ * and the carbon store path.
+ */
+public interface DictionaryService {
+
+  // ---------------------------------------------------------------- writers
+
+  /**
+   * Returns a writer for the column's dictionary.
+   *
+   * @param carbonTableIdentifier identifier of the table
+   * @param columnIdentifier      identifier of the column
+   * @param carbonStorePath       carbon store path
+   * @return dictionary writer
+   */
+  CarbonDictionaryWriter getDictionaryWriter(CarbonTableIdentifier carbonTableIdentifier,
+      ColumnIdentifier columnIdentifier, String carbonStorePath);
+
+  /**
+   * Returns a writer for the column's dictionary sort index.
+   *
+   * @param carbonTableIdentifier identifier of the table
+   * @param columnIdentifier      identifier of the column
+   * @param carbonStorePath       carbon store path
+   * @return dictionary sort index writer
+   */
+  CarbonDictionarySortIndexWriter getDictionarySortIndexWriter(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath);
+
+  // ---------------------------------------------------------------- readers
+
+  /**
+   * Returns a reader for the column's dictionary.
+   *
+   * @param carbonTableIdentifier identifier of the table
+   * @param columnIdentifier      identifier of the column
+   * @param carbonStorePath       carbon store path
+   * @return dictionary reader
+   */
+  CarbonDictionaryReader getDictionaryReader(CarbonTableIdentifier carbonTableIdentifier,
+      ColumnIdentifier columnIdentifier, String carbonStorePath);
+
+  /**
+   * Returns a reader for the column's dictionary metadata.
+   *
+   * @param carbonTableIdentifier identifier of the table
+   * @param columnIdentifier      identifier of the column
+   * @param carbonStorePath       carbon store path
+   * @return dictionary metadata reader
+   */
+  CarbonDictionaryMetadataReader getDictionaryMetadataReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath);
+
+  /**
+   * Returns a reader for the column's dictionary sort index.
+   *
+   * @param carbonTableIdentifier identifier of the table
+   * @param columnIdentifier      identifier of the column
+   * @param carbonStorePath       carbon store path
+   * @return dictionary sort index reader
+   */
+  CarbonDictionarySortIndexReader getDictionarySortIndexReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+      String carbonStorePath);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/service/PathService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/PathService.java b/core/src/main/java/org/apache/carbondata/core/service/PathService.java
new file mode 100644
index 0000000..d3295f5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/PathService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+
+/**
+ * Service for resolving the storage paths of a carbon table.
+ */
+public interface PathService {
+
+  /**
+   * Builds the {@link CarbonTablePath} for a table under the given store location.
+   *
+   * @param columnIdentifier identifier of the column the path is requested for
+   * @param storeLocation    carbon store location
+   * @param tableIdentifier  identifier of the table
+   * @return store path related to tables
+   */
+  CarbonTablePath getCarbonTablePath(
+      ColumnIdentifier columnIdentifier,
+      String storeLocation,
+      CarbonTableIdentifier tableIdentifier);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
new file mode 100644
index 0000000..2f91d1e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Util class for byte comparision
+ */
+public final class ByteUtil {
+
+  private static final int SIZEOF_LONG = 8;
+
+  private ByteUtil() {
+
+  }
+
+  /**
+   * Compare method for bytes
+   *
+   * @param buffer1
+   * @param buffer2
+   * @return
+   */
+  public static int compare(byte[] buffer1, byte[] buffer2) {
+    // Short circuit equal case
+    if (buffer1 == buffer2) {
+      return 0;
+    }
+    // Bring WritableComparator code local
+    int i = 0;
+    int j = 0;
+    for (; i < buffer1.length && j < buffer2.length; i++, j++) {
+      int a = (buffer1[i] & 0xff);
+      int b = (buffer2[j] & 0xff);
+      if (a != b) {
+        return a - b;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * covert the long[] to int[]
+   *
+   * @param longArray
+   * @return
+   */
+  public static int[] convertToIntArray(long[] longArray) {
+    int[] intArray = new int[longArray.length];
+    for (int i = 0; i < longArray.length; i++) {
+      intArray[i] = (int) longArray[i];
+
+    }
+    return intArray;
+  }
+
+  /**
+   * Unsafe comparator
+   */
+  public enum UnsafeComparer {
+    /**
+     * instance.
+     */
+    INSTANCE;
+
+    /**
+     * unsafe .
+     */
+    static final sun.misc.Unsafe THEUNSAFE;
+
+    /**
+     * The offset to the first element in a byte array.
+     */
+    static final int BYTE_ARRAY_BASE_OFFSET;
+    static final boolean LITTLEENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+    static {
+      THEUNSAFE = (sun.misc.Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() {
+        @Override public Object run() {
+          try {
+            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+            f.setAccessible(true);
+            return f.get(null);
+          } catch (NoSuchFieldException e) {
+            // It doesn't matter what we throw;
+            // it's swallowed in getBestComparer().
+            throw new Error();
+          } catch (IllegalAccessException e) {
+            throw new Error();
+          }
+        }
+      });
+
+      BYTE_ARRAY_BASE_OFFSET = THEUNSAFE.arrayBaseOffset(byte[].class);
+
+      // sanity check - this should never fail
+      if (THEUNSAFE.arrayIndexScale(byte[].class) != 1) {
+        throw new AssertionError();
+      }
+
+    }
+
+    /**
+     * Returns true if x1 is less than x2, when both values are treated as
+     * unsigned.
+     */
+    static boolean lessThanUnsigned(long x1, long x2) {
+      return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
+    }
+
+    /**
+     * Lexicographically compare two arrays.
+     *
+     * @param buffer1 left operand
+     * @param buffer2 right operand
+     * @param offset1 Where to start comparing in the left buffer
+     * @param offset2 Where to start comparing in the right buffer
+     * @param length1 How much to compare from the left buffer
+     * @param length2 How much to compare from the right buffer
+     * @return 0 if equal, < 0 if left is less than right, etc.
+     */
+    public int compareTo(byte[] buffer1, int offset1, int length1, byte[] buffer2, int offset2,
+        int length2) {
+      // Short circuit equal case
+      if (buffer1 == buffer2 && offset1 == offset2 && length1 == length2) {
+        return 0;
+      }
+      int minLength = Math.min(length1, length2);
+      int minWords = minLength / SIZEOF_LONG;
+      int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
+      int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
+
+      /*
+       * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes
+       * at a time is no slower than comparing 4 bytes at a time even on
+       * 32-bit. On the other hand, it is substantially faster on 64-bit.
+       */
+      for (int i = 0; i < minWords * SIZEOF_LONG; i += SIZEOF_LONG) {
+        long lw = THEUNSAFE.getLong(buffer1, offset1Adj + (long) i);
+        long rw = THEUNSAFE.getLong(buffer2, offset2Adj + (long) i);
+        long diff = lw ^ rw;
+
+        if (diff != 0) {
+          if (!LITTLEENDIAN) {
+            return lessThanUnsigned(lw, rw) ? -1 : 1;
+          }
+
+          // Use binary search
+          int n = 0;
+          int y;
+          int x = (int) diff;
+          if (x == 0) {
+            x = (int) (diff >>> 32);
+            n = 32;
+          }
+
+          y = x << 16;
+          if (y == 0) {
+            n += 16;
+          } else {
+            x = y;
+          }
+
+          y = x << 8;
+          if (y == 0) {
+            n += 8;
+          }
+          return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+        }
+      }
+
+      // The epilogue to cover the last (minLength % 8) elements.
+      for (int i = minWords * SIZEOF_LONG; i < minLength; i++) {
+        int a = (buffer1[offset1 + i] & 0xff);
+        int b = (buffer2[offset2 + i] & 0xff);
+        if (a != b) {
+          return a - b;
+        }
+      }
+      return length1 - length2;
+    }
+
+    public int compareTo(byte[] buffer1, byte[] buffer2) {
+
+      // Short circuit equal case
+      if (buffer1 == buffer2) {
+        return 0;
+      }
+      int len1 = buffer1.length;
+      int len2 = buffer2.length;
+      int minLength = (len1 <= len2) ? len1 : len2;
+      int minWords = 0;
+
+      /*
+       * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes
+       * at a time is no slower than comparing 4 bytes at a time even on
+       * 32-bit. On the other hand, it is substantially faster on 64-bit.
+       */
+      if (minLength > 7) {
+        minWords = minLength / SIZEOF_LONG;
+        for (int i = 0; i < minWords * SIZEOF_LONG; i += SIZEOF_LONG) {
+          long lw = THEUNSAFE.getLong(buffer1, BYTE_ARRAY_BASE_OFFSET + (long) i);
+          long rw = THEUNSAFE.getLong(buffer2, BYTE_ARRAY_BASE_OFFSET + (long) i);
+          long diff = lw ^ rw;
+
+          if (diff != 0) {
+            if (!LITTLEENDIAN) {
+              return lessThanUnsigned(lw, rw) ? -1 : 1;
+            }
+
+            // Use binary search
+            int k = 0;
+            int y;
+            int x = (int) diff;
+            if (x == 0) {
+              x = (int) (diff >>> 32);
+              k = 32;
+            }
+            y = x << 16;
+            if (y == 0) {
+              k += 16;
+            } else {
+              x = y;
+            }
+
+            y = x << 8;
+            if (y == 0) {
+              k += 8;
+            }
+            return (int) (((lw >>> k) & 0xFFL) - ((rw >>> k) & 0xFFL));
+          }
+        }
+      }
+
+      // The epilogue to cover the last (minLength % 8) elements.
+      for (int i = minWords * SIZEOF_LONG; i < minLength; i++) {
+        int a = (buffer1[i] & 0xff);
+        int b = (buffer2[i] & 0xff);
+        if (a != b) {
+          return a - b;
+        }
+      }
+      return len1 - len2;
+    }
+
+    public boolean equals(byte[] buffer1, byte[] buffer2) {
+      if (buffer1.length != buffer2.length) {
+        return false;
+      }
+      int len = buffer1.length / 8;
+      long currentOffset = BYTE_ARRAY_BASE_OFFSET;
+      for (int i = 0; i < len; i++) {
+        long lw = THEUNSAFE.getLong(buffer1, currentOffset);
+        long rw = THEUNSAFE.getLong(buffer2, currentOffset);
+        if (lw != rw) {
+          return false;
+        }
+        currentOffset += 8;
+      }
+      len = buffer1.length % 8;
+      if (len > 0) {
+        for (int i = 0; i < len; i += 1) {
+          long lw = THEUNSAFE.getByte(buffer1, currentOffset);
+          long rw = THEUNSAFE.getByte(buffer2, currentOffset);
+          if (lw != rw) {
+            return false;
+          }
+          currentOffset += 1;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Comparing the 2 byte buffers. This is used in case of data load sorting step.
+     *
+     * @param byteBuffer1
+     * @param byteBuffer2
+     * @return
+     */
+    public int compareTo(ByteBuffer byteBuffer1, ByteBuffer byteBuffer2) {
+
+      // Short circuit equal case
+      if (byteBuffer1 == byteBuffer2) {
+        return 0;
+      }
+      int len1 = byteBuffer1.remaining();
+      int len2 = byteBuffer2.remaining();
+      byte[] buffer1 = new byte[len1];
+      byte[] buffer2 = new byte[len2];
+      byteBuffer1.get(buffer1);
+      byteBuffer2.get(buffer2);
+      return compareTo(buffer1, buffer2);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java
new file mode 100644
index 0000000..c60865d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+
+/**
+ * Orders {@link CarbonFile}s by the integer that follows the last '_' in their names, e.g. a
+ * name ending in "_2" sorts before one ending in "_10".
+ */
+public class CarbonFileFolderComparator implements Comparator<CarbonFile> {
+
+  /**
+   * Below method will be used to compare two files by the numeric suffix after the last '_'
+   * of their names (the whole name is used when it contains no '_').
+   *
+   * <p>Names whose suffix is not a valid int sort before names that have one, and two such
+   * names are ordered lexicographically. This keeps the ordering consistent (reflexive and
+   * antisymmetric) as {@link Comparator} requires; an inconsistent comparator can make sorting
+   * throw {@code IllegalArgumentException}.
+   *
+   * @param o1 first file
+   * @param o2 Second file
+   * @return negative, zero or positive as o1 orders before, equal to, or after o2
+   */
+  @Override public int compare(CarbonFile o1, CarbonFile o2) {
+    String firstFileName = o1.getName();
+    String secondFileName = o2.getName();
+    Integer file1 = parseSuffix(firstFileName);
+    Integer file2 = parseSuffix(secondFileName);
+    if (file1 != null && file2 != null) {
+      return Integer.compare(file1, file2);
+    }
+    if (file1 == null && file2 == null) {
+      return firstFileName.compareTo(secondFileName);
+    }
+    // exactly one name has an unparsable suffix: that one sorts first
+    return file1 == null ? -1 : 1;
+  }
+
+  /**
+   * Parses the text after the last '_' in the given name (the whole name if there is no '_').
+   *
+   * @param name file name
+   * @return the parsed value, or null when that text is not a valid int
+   */
+  private static Integer parseSuffix(String name) {
+    try {
+      return Integer.parseInt(name.substring(name.lastIndexOf('_') + 1));
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java
new file mode 100644
index 0000000..ac504f0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+/**
+ * {@link LoadStatistics} implementation that ignores every call. It holds no state, so one
+ * shared instance is handed out by {@link #getInstance()}.
+ */
+public class CarbonLoadStatisticsDummy implements LoadStatistics {
+
+  /** The shared, stateless instance. */
+  private static final CarbonLoadStatisticsDummy INSTANCE = new CarbonLoadStatisticsDummy();
+
+  /** Not instantiable from outside; use {@link #getInstance()}. */
+  private CarbonLoadStatisticsDummy() {
+    // nothing to initialise
+  }
+
+  /**
+   * @return the shared no-op statistics recorder
+   */
+  public static CarbonLoadStatisticsDummy getInstance() {
+    return INSTANCE;
+  }
+
+  // ---- load-wide recorders (all no-ops) ----
+
+  @Override
+  public void initPartitonInfo(String PartitionId) {
+    // no-op
+  }
+
+  @Override
+  public void recordDicShuffleAndWriteTime() {
+    // no-op
+  }
+
+  @Override
+  public void recordLoadCsvfilesToDfTime() {
+    // no-op
+  }
+
+  @Override
+  public void recordLruCacheLoadTime(double lruCacheLoadTime) {
+    // no-op
+  }
+
+  @Override
+  public void recordTotalRecords(long totalRecords) {
+    // no-op
+  }
+
+  // ---- per-partition recorders (all no-ops) ----
+
+  @Override
+  public void recordDictionaryValuesTotalTime(String partitionID,
+      Long dictionaryValuesTotalTimeTimePoint) {
+    // no-op
+  }
+
+  @Override
+  public void recordCsvInputStepTime(String partitionID, Long csvInputStepTimePoint) {
+    // no-op
+  }
+
+  @Override
+  public void recordGeneratingDictionaryValuesTime(String partitionID,
+      Long generatingDictionaryValuesTimePoint) {
+    // no-op
+  }
+
+  @Override
+  public void recordSortRowsStepTotalTime(String partitionID, Long sortRowsStepTotalTimePoint) {
+    // no-op
+  }
+
+  @Override
+  public void recordMdkGenerateTotalTime(String partitionID, Long mdkGenerateTotalTimePoint) {
+    // no-op
+  }
+
+  @Override
+  public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+      Long dictionaryValue2MdkAdd2FileTimePoint) {
+    // no-op
+  }
+
+  // ---- block distribution and reporting (all no-ops) ----
+
+  @Override
+  public void recordHostBlockMap(String host, Integer numBlocks) {
+    // no-op
+  }
+
+  @Override
+  public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
+    // no-op
+  }
+
+  @Override
+  public void printStatisticsInfo(String partitionID) {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
new file mode 100644
index 0000000..c9fc8ba
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * A util which provide methods used to record time information druing data loading.
+ */
+public class CarbonLoadStatisticsImpl implements LoadStatistics {
+  private CarbonLoadStatisticsImpl() {
+
+  }
+
+  private static CarbonLoadStatisticsImpl carbonLoadStatisticsImplInstance =
+          new CarbonLoadStatisticsImpl();
+
+  public static CarbonLoadStatisticsImpl getInstance() {
+    return carbonLoadStatisticsImplInstance;
+  }
+
+  private final LogService LOGGER =
+          LogServiceFactory.getLogService(CarbonLoadStatisticsImpl.class.getName());
+
+  /*
+   *We only care about the earliest start time(EST) and the latest end time(LET) of different
+   *threads, who does the same thing, LET - EST is the cost time of doing one thing using
+   *multiple thread.
+ */
+  private long loadCsvfilesToDfStartTime = 0;
+  private long loadCsvfilesToDfCostTime = 0;
+  private long dicShuffleAndWriteFileTotalStartTime = 0;
+  private long dicShuffleAndWriteFileTotalCostTime = 0;
+
+  //LRU cache load one time
+  private double lruCacheLoadTime = 0;
+
+  //Generate surrogate keys total time for each partition:
+  private ConcurrentHashMap<String, Long[]> parDictionaryValuesTotalTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+  private ConcurrentHashMap<String, Long[]> parCsvInputStepTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+  private ConcurrentHashMap<String, Long[]> parGeneratingDictionaryValuesTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+
+  //Sort rows step total time for each partition:
+  private ConcurrentHashMap<String, Long[]> parSortRowsStepTotalTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+
+  //MDK generate total time for each partition:
+  private ConcurrentHashMap<String, Long[]> parMdkGenerateTotalTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+  private ConcurrentHashMap<String, Long[]> parDictionaryValue2MdkAdd2FileTime =
+          new ConcurrentHashMap<String, Long[]>();
+
+  //Node block process information
+  private ConcurrentHashMap<String, Integer> hostBlockMap =
+          new ConcurrentHashMap<String, Integer>();
+
+  //Partition block process information
+  private ConcurrentHashMap<String, Integer> partitionBlockMap =
+          new ConcurrentHashMap<String, Integer>();
+
+  private long totalRecords = 0;
+  private double totalTime = 0;
+
+  @Override
+  public void initPartitonInfo(String PartitionId) {
+    parDictionaryValuesTotalTimeMap.put(PartitionId, new Long[2]);
+    parCsvInputStepTimeMap.put(PartitionId, new Long[2]);
+    parSortRowsStepTotalTimeMap.put(PartitionId, new Long[2]);
+    parGeneratingDictionaryValuesTimeMap.put(PartitionId, new Long[2]);
+    parMdkGenerateTotalTimeMap.put(PartitionId, new Long[2]);
+    parDictionaryValue2MdkAdd2FileTime.put(PartitionId, new Long[2]);
+  }
+
+  //Record the time
+  public void recordDicShuffleAndWriteTime() {
+    Long dicShuffleAndWriteTimePoint = System.currentTimeMillis();
+    if (0 == dicShuffleAndWriteFileTotalStartTime) {
+      dicShuffleAndWriteFileTotalStartTime = dicShuffleAndWriteTimePoint;
+    }
+    if (dicShuffleAndWriteTimePoint - dicShuffleAndWriteFileTotalStartTime >
+            dicShuffleAndWriteFileTotalCostTime) {
+      dicShuffleAndWriteFileTotalCostTime =
+          dicShuffleAndWriteTimePoint - dicShuffleAndWriteFileTotalStartTime;
+    }
+  }
+
+  public void recordLoadCsvfilesToDfTime() {
+    Long loadCsvfilesToDfTimePoint = System.currentTimeMillis();
+    if (0 == loadCsvfilesToDfStartTime) {
+      loadCsvfilesToDfStartTime = loadCsvfilesToDfTimePoint;
+    }
+    if (loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime > loadCsvfilesToDfCostTime) {
+      loadCsvfilesToDfCostTime = loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime;
+    }
+  }
+
+  public double getLruCacheLoadTime() {
+    return lruCacheLoadTime;
+  }
+
+  public void recordDictionaryValuesTotalTime(String partitionID,
+      Long dictionaryValuesTotalTimeTimePoint) {
+    if (null != parDictionaryValuesTotalTimeMap.get(partitionID)) {
+      if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[0]) {
+        parDictionaryValuesTotalTimeMap.get(partitionID)[0] = dictionaryValuesTotalTimeTimePoint;
+      }
+      if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[1] ||
+          dictionaryValuesTotalTimeTimePoint - parDictionaryValuesTotalTimeMap.get(partitionID)[0] >
+              parDictionaryValuesTotalTimeMap.get(partitionID)[1]) {
+        parDictionaryValuesTotalTimeMap.get(partitionID)[1] = dictionaryValuesTotalTimeTimePoint -
+            parDictionaryValuesTotalTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordCsvInputStepTime(String partitionID,
+      Long csvInputStepTimePoint) {
+    if (null != parCsvInputStepTimeMap.get(partitionID)) {
+      if (null == parCsvInputStepTimeMap.get(partitionID)[0]) {
+        parCsvInputStepTimeMap.get(partitionID)[0] = csvInputStepTimePoint;
+      }
+      if (null == parCsvInputStepTimeMap.get(partitionID)[1] ||
+              csvInputStepTimePoint - parCsvInputStepTimeMap.get(partitionID)[0] >
+                      parCsvInputStepTimeMap.get(partitionID)[1]) {
+        parCsvInputStepTimeMap.get(partitionID)[1] = csvInputStepTimePoint -
+                parCsvInputStepTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordLruCacheLoadTime(double lruCacheLoadTime) {
+    this.lruCacheLoadTime = lruCacheLoadTime;
+  }
+
+  public void recordGeneratingDictionaryValuesTime(String partitionID,
+      Long generatingDictionaryValuesTimePoint) {
+    if (null != parGeneratingDictionaryValuesTimeMap.get(partitionID)) {
+      if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[0]) {
+        parGeneratingDictionaryValuesTimeMap.get(partitionID)[0] =
+                generatingDictionaryValuesTimePoint;
+      }
+      if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] ||
+              generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
+                      .get(partitionID)[0] > parGeneratingDictionaryValuesTimeMap
+                      .get(partitionID)[1]) {
+        parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] =
+                generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
+                        .get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordSortRowsStepTotalTime(String partitionID,
+                                          Long sortRowsStepTotalTimePoint) {
+    if (null != parSortRowsStepTotalTimeMap.get(partitionID)) {
+      if (null == parSortRowsStepTotalTimeMap.get(partitionID)[0]) {
+        parSortRowsStepTotalTimeMap.get(partitionID)[0] = sortRowsStepTotalTimePoint;
+      }
+      if (null == parSortRowsStepTotalTimeMap.get(partitionID)[1] ||
+              sortRowsStepTotalTimePoint - parSortRowsStepTotalTimeMap.get(partitionID)[0] >
+                      parSortRowsStepTotalTimeMap.get(partitionID)[1]) {
+        parSortRowsStepTotalTimeMap.get(partitionID)[1] = sortRowsStepTotalTimePoint -
+                parSortRowsStepTotalTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordMdkGenerateTotalTime(String partitionID,
+                                         Long mdkGenerateTotalTimePoint) {
+    if (null != parMdkGenerateTotalTimeMap.get(partitionID)) {
+      if (null == parMdkGenerateTotalTimeMap.get(partitionID)[0]) {
+        parMdkGenerateTotalTimeMap.get(partitionID)[0] = mdkGenerateTotalTimePoint;
+      }
+      if (null == parMdkGenerateTotalTimeMap.get(partitionID)[1] ||
+              mdkGenerateTotalTimePoint - parMdkGenerateTotalTimeMap.get(partitionID)[0] >
+                      parMdkGenerateTotalTimeMap.get(partitionID)[1]) {
+        parMdkGenerateTotalTimeMap.get(partitionID)[1] = mdkGenerateTotalTimePoint -
+                parMdkGenerateTotalTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+      Long dictionaryValue2MdkAdd2FileTimePoint) {
+    if (null != parDictionaryValue2MdkAdd2FileTime.get(partitionID)) {
+      if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0]) {
+        parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0] =
+                dictionaryValue2MdkAdd2FileTimePoint;
+      }
+      if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] ||
+              dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
+                      .get(partitionID)[0] > parDictionaryValue2MdkAdd2FileTime
+                      .get(partitionID)[1]) {
+        parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] =
+                dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
+                        .get(partitionID)[0];
+      }
+    }
+  }
+
+  //Record the node blocks information map
+  public void recordHostBlockMap(String host, Integer numBlocks) {
+    hostBlockMap.put(host, numBlocks);
+  }
+
+  //Record the partition blocks information map
+  public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
+    partitionBlockMap.put(partitionID, numBlocks);
+  }
+
+  public void recordTotalRecords(long totalRecords) {
+    this.totalRecords = totalRecords;
+  }
+
+  //Get the time
+  private double getDicShuffleAndWriteFileTotalTime() {
+    return dicShuffleAndWriteFileTotalCostTime / 1000.0;
+  }
+
+  private double getLoadCsvfilesToDfTime() {
+    return loadCsvfilesToDfCostTime / 1000.0;
+  }
+
+  private double getDictionaryValuesTotalTime(String partitionID) {
+    return parDictionaryValuesTotalTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getCsvInputStepTime(String partitionID) {
+    return parCsvInputStepTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getGeneratingDictionaryValuesTime(String partitionID) {
+    return parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getSortRowsStepTotalTime(String partitionID) {
+    return parSortRowsStepTotalTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getDictionaryValue2MdkAdd2FileTime(String partitionID) {
+    return parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] / 1000.0;
+  }
+
+  //Get the hostBlockMap
+  private ConcurrentHashMap<String, Integer> getHostBlockMap() {
+    return hostBlockMap;
+  }
+
+  //Get the partitionBlockMap
+  private ConcurrentHashMap<String, Integer> getPartitionBlockMap() {
+    return partitionBlockMap;
+  }
+
+  //Speed calculate
+  private long getTotalRecords() {
+    return this.totalRecords;
+  }
+
+  private int getLoadSpeed() {
+    return (int)(totalRecords / totalTime);
+  }
+
+  private int getGenDicSpeed() {
+    return (int)(totalRecords / getLoadCsvfilesToDfTime() + getDicShuffleAndWriteFileTotalTime());
+  }
+
+  private int getReadCSVSpeed(String partitionID) {
+    return (int)(totalRecords / getCsvInputStepTime(partitionID));
+  }
+
+  private int getGenSurKeySpeed(String partitionID) {
+    return (int)(totalRecords / getGeneratingDictionaryValuesTime(partitionID));
+  }
+
+  private int getSortKeySpeed(String partitionID) {
+    return (int)(totalRecords / getSortRowsStepTotalTime(partitionID));
+  }
+
+  private int getMDKSpeed(String partitionID) {
+    return (int)(totalRecords / getDictionaryValue2MdkAdd2FileTime(partitionID));
+  }
+
+  private double getTotalTime(String partitionID) {
+    this.totalTime = getLoadCsvfilesToDfTime() + getDicShuffleAndWriteFileTotalTime() +
+        getLruCacheLoadTime() + getDictionaryValuesTotalTime(partitionID) +
+        getDictionaryValue2MdkAdd2FileTime(partitionID);
+    return totalTime;
+  }
+
+  //Print the statistics information
+  private void printDicGenStatisticsInfo() {
+    double loadCsvfilesToDfTime = getLoadCsvfilesToDfTime();
+    LOGGER.audit("STAGE 1 ->Load csv to DataFrame and generate" +
+            " block distinct values: " + loadCsvfilesToDfTime + "(s)");
+    double dicShuffleAndWriteFileTotalTime = getDicShuffleAndWriteFileTotalTime();
+    LOGGER.audit("STAGE 2 ->Global dict shuffle and write dict file: " +
+            + dicShuffleAndWriteFileTotalTime + "(s)");
+  }
+
+  private void printLruCacheLoadTimeInfo() {
+    LOGGER.audit("STAGE 3 ->LRU cache load: " + getLruCacheLoadTime() + "(s)");
+  }
+
+  private void printDictionaryValuesGenStatisticsInfo(String partitionID) {
+    double dictionaryValuesTotalTime = getDictionaryValuesTotalTime(partitionID);
+    LOGGER.audit("STAGE 4 ->Total cost of gen dictionary values, sort and write to temp files: "
+            + dictionaryValuesTotalTime + "(s)");
+    double csvInputStepTime = getCsvInputStepTime(partitionID);
+    double generatingDictionaryValuesTime = getGeneratingDictionaryValuesTime(partitionID);
+    LOGGER.audit("STAGE 4.1 ->  |_read csv file: " + csvInputStepTime + "(s)");
+    LOGGER.audit("STAGE 4.2 ->  |_transform to surrogate key: "
+            + generatingDictionaryValuesTime + "(s)");
+  }
+
+  private void printSortRowsStepStatisticsInfo(String partitionID) {
+    double sortRowsStepTotalTime = getSortRowsStepTotalTime(partitionID);
+    LOGGER.audit("STAGE 4.3 ->  |_sort rows and write to temp file: "
+            + sortRowsStepTotalTime + "(s)");
+  }
+
+  private void printGenMdkStatisticsInfo(String partitionID) {
+    double dictionaryValue2MdkAdd2FileTime = getDictionaryValue2MdkAdd2FileTime(partitionID);
+    LOGGER.audit("STAGE 5 ->Transform to MDK, compress and write fact files: "
+            + dictionaryValue2MdkAdd2FileTime + "(s)");
+  }
+
+  //Print the node blocks information
+  private void printHostBlockMapInfo() {
+    LOGGER.audit("========== BLOCK_INFO ==========");
+    if (getHostBlockMap().size() > 0) {
+      for (String host: getHostBlockMap().keySet()) {
+        LOGGER.audit("BLOCK_INFO ->Node host: " + host);
+        LOGGER.audit("BLOCK_INFO ->The block count in this node: " + getHostBlockMap().get(host));
+      }
+    } else if (getPartitionBlockMap().size() > 0) {
+      for (String parID: getPartitionBlockMap().keySet()) {
+        LOGGER.audit("BLOCK_INFO ->Partition ID: " + parID);
+        LOGGER.audit("BLOCK_INFO ->The block count in this partition: " +
+                getPartitionBlockMap().get(parID));
+      }
+    }
+  }
+
+  //Print the speed information
+  private void printLoadSpeedInfo(String partitionID) {
+    LOGGER.audit("===============Load_Speed_Info===============");
+    LOGGER.audit("Total Num of Records Processed: " + getTotalRecords());
+    LOGGER.audit("Total Time Cost: " + getTotalTime(partitionID) + "(s)");
+    LOGGER.audit("Total Load Speed: " + getLoadSpeed() + "records/s");
+    LOGGER.audit("Generate Dictionaries Speed: " + getGenDicSpeed() + "records/s");
+    LOGGER.audit("Read CSV Speed: " + getReadCSVSpeed(partitionID) + " records/s");
+    LOGGER.audit("Generate Surrogate Key Speed: " + getGenSurKeySpeed(partitionID) + " records/s");
+    LOGGER.audit("Sort Key/Write Temp Files Speed: " + getSortKeySpeed(partitionID) + " records/s");
+    LOGGER.audit("MDK Step Speed: " + getMDKSpeed(partitionID) + " records/s");
+    LOGGER.audit("=============================================");
+  }
+
+  public void printStatisticsInfo(String partitionID) {
+    try {
+      LOGGER.audit("========== TIME_STATISTICS PartitionID: " + partitionID + "==========");
+      printDicGenStatisticsInfo();
+      printLruCacheLoadTimeInfo();
+      printDictionaryValuesGenStatisticsInfo(partitionID);
+      printSortRowsStepStatisticsInfo(partitionID);
+      printGenMdkStatisticsInfo(partitionID);
+      printHostBlockMapInfo();
+      printLoadSpeedInfo(partitionID);
+    } catch (Exception e) {
+      LOGGER.audit("Can't print Statistics Information");
+    } finally {
+      resetLoadStatistics();
+    }
+  }
+
+  //Reset the load statistics values
+  private void resetLoadStatistics() {
+    loadCsvfilesToDfStartTime = 0;
+    loadCsvfilesToDfCostTime = 0;
+    dicShuffleAndWriteFileTotalStartTime = 0;
+    dicShuffleAndWriteFileTotalCostTime = 0;
+    lruCacheLoadTime = 0;
+    totalRecords = 0;
+    totalTime = 0;
+    parDictionaryValuesTotalTimeMap.clear();
+    parCsvInputStepTimeMap.clear();
+    parSortRowsStepTotalTimeMap.clear();
+    parGeneratingDictionaryValuesTimeMap.clear();
+    parMdkGenerateTotalTimeMap.clear();
+    parDictionaryValue2MdkAdd2FileTime.clear();
+  }
+
+}



Mime
View raw message