carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [08/56] [abbrv] incubator-carbondata git commit: [issue-644]Update CarbonFile to support Federation (#661)
Date Thu, 23 Jun 2016 14:15:56 GMT
[issue-644]Update CarbonFile to support Federation (#661)

* Update CarbonFile to support Federation

* Set DefaultFileType to VIEWFS when the store-path scheme is viewfs://

* fix checkstyle error

* Fix the wrong instance type returned by CarbonFile#getParentFile

* Update ViewFSCarbonFile to follow the optimization of NN (NameNode) RPC requests


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/25cd9e5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/25cd9e5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/25cd9e5d

Branch: refs/heads/master
Commit: 25cd9e5de13e2b9c561308be2a376ec458be5168
Parents: dbefb7b
Author: Hexiaoqiao <xq.he2009@gmail.com>
Authored: Fri Jun 17 11:39:13 2016 +0800
Committer: Ravindra Pesala <ravi.pesala@gmail.com>
Committed: Fri Jun 17 09:09:13 2016 +0530

----------------------------------------------------------------------
 .../fileperations/AtomicFileOperationsImpl.java |   3 +-
 .../store/filesystem/AbstractDFSCarbonFile.java | 217 +++++++++++++++++
 .../store/filesystem/HDFSCarbonFile.java        | 243 +++----------------
 .../store/filesystem/ViewFSCarbonFile.java      | 126 ++++++++++
 .../store/impl/DFSFileHolderImpl.java           | 183 ++++++++++++++
 .../datastorage/store/impl/FileFactory.java     |  36 ++-
 .../store/impl/HDFSFileHolderImpl.java          | 186 --------------
 .../org/carbondata/core/util/CarbonUtil.java    |  23 +-
 .../carbondata/hadoop/util/SchemaReader.java    |   3 +-
 .../csvreaderstep/BlockDataHandler.java         |   3 +-
 .../util/CarbonDataProcessorUtil.java           |  12 +-
 11 files changed, 613 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
index fe7a6f3..41f4580 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
@@ -77,7 +77,8 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations {
       CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
 
       if (!tempFile.renameForce(filePath)) {
-        throw new IOException("temporary file renaming failed");
+        throw new IOException("temporary file renaming failed, src="
+            + tempFile.getPath() + ", dest=" + filePath);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
new file mode 100644
index 0000000..b04cd47
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.core.datastorage.store.filesystem;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public abstract  class AbstractDFSCarbonFile implements CarbonFile {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName());
+  protected FileStatus fileStatus;
+  protected FileSystem fs;
+
+  public AbstractDFSCarbonFile(String filePath) {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    try {
+      fs = path.getFileSystem(FileFactory.getConfiguration());
+      fileStatus = fs.getFileStatus(path);
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+    }
+  }
+
+  public AbstractDFSCarbonFile(Path path) {
+    try {
+      fs = path.getFileSystem(FileFactory.getConfiguration());
+      fileStatus = fs.getFileStatus(path);
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+    }
+  }
+
+  public AbstractDFSCarbonFile(FileStatus fileStatus) {
+    this.fileStatus = fileStatus;
+  }
+
+  @Override public boolean createNewFile() {
+    Path path = fileStatus.getPath();
+    try {
+      return fs.createNewFile(path);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override public String getAbsolutePath() {
+    return fileStatus.getPath().toString();
+  }
+
+  @Override public String getName() {
+    return fileStatus.getPath().getName();
+  }
+
+  @Override public boolean isDirectory() {
+    return fileStatus.isDirectory();
+  }
+
+  @Override public boolean exists() {
+    try {
+      if (null != fileStatus) {
+        fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+        return fs.exists(fileStatus.getPath());
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+    }
+    return false;
+  }
+
+  @Override public String getCanonicalPath() {
+    return getAbsolutePath();
+  }
+
+  @Override public String getPath() {
+    return getAbsolutePath();
+  }
+
+  @Override public long getSize() {
+    return fileStatus.getLen();
+  }
+
+  public boolean renameTo(String changetoName) {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      return fs.rename(fileStatus.getPath(), new Path(changetoName));
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+      return false;
+    }
+  }
+
+  public boolean delete() {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      return fs.delete(fileStatus.getPath(), true);
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+      return false;
+    }
+  }
+
+  @Override public long getLastModifiedTime() {
+    return fileStatus.getModificationTime();
+  }
+
+  @Override public boolean setLastModifiedTime(long timestamp) {
+    try {
+      fs.setTimes(fileStatus.getPath(), timestamp, timestamp);
+    } catch (IOException e) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * This method will delete the data in the file beyond the given offset
+   */
+  @Override public boolean truncate(String fileName, long validDataEndOffset) {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    boolean fileTruncatedSuccessfully = false;
+    // if bytes to read less than 1024 then buffer size should be equal to the given offset
+    int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
+        CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
+        (int) validDataEndOffset;
+    // temporary file name
+    String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+    try {
+      CarbonFile tempFile = null;
+      // delete temporary file if it already exists at a given path
+      if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
+        tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+        tempFile.delete();
+      }
+      // create new temporary file
+      FileFactory.createNewFile(tempWriteFilePath, fileType);
+      tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+      byte[] buff = new byte[bufferSize];
+      dataInputStream = FileFactory.getDataInputStream(fileName, fileType);
+      // read the data
+      int read = dataInputStream.read(buff, 0, buff.length);
+      dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
+      dataOutputStream.write(buff, 0, read);
+      long remaining = validDataEndOffset - read;
+      // anytime we should not cross the offset to be read
+      while (remaining > 0) {
+        if (remaining > bufferSize) {
+          buff = new byte[bufferSize];
+        } else {
+          buff = new byte[(int) remaining];
+        }
+        read = dataInputStream.read(buff, 0, buff.length);
+        dataOutputStream.write(buff, 0, read);
+        remaining = remaining - read;
+      }
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+      // rename the temp file to original file
+      tempFile.renameForce(fileName);
+      fileTruncatedSuccessfully = true;
+    } catch (IOException e) {
+      LOGGER.error("Exception occured while truncating the file " + e.getMessage());
+    } finally {
+      CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
+    }
+    return fileTruncatedSuccessfully;
+  }
+
+  /**
+   * This method will be used to check whether a file has been modified or not
+   *
+   * @param fileTimeStamp time to be compared with latest timestamp of file
+   * @param endOffset     file length to be compared with current length of file
+   * @return
+   */
+  @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
+    boolean isFileModified = false;
+    if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
+      isFileModified = true;
+    }
+    return isFileModified;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
index a4127f4..98e40b4 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
@@ -19,164 +19,36 @@
 
 package org.carbondata.core.datastorage.store.filesystem;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.util.CarbonUtil;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
-public class HDFSCarbonFile implements CarbonFile {
+public class HDFSCarbonFile extends AbstractDFSCarbonFile {
   /**
    * LOGGER
    */
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());
-  private FileStatus fileStatus;
-  private FileSystem fs;
 
   public HDFSCarbonFile(String filePath) {
-    filePath = filePath.replace("\\", "/");
-    Path path = new Path(filePath);
-    try {
-      fs = path.getFileSystem(FileFactory.getConfiguration());
-      fileStatus = fs.getFileStatus(path);
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-    }
+    super(filePath);
   }
 
   public HDFSCarbonFile(Path path) {
-    try {
-      fs = path.getFileSystem(FileFactory.getConfiguration());
-      fileStatus = fs.getFileStatus(path);
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-    }
+    super(path);
   }
 
   public HDFSCarbonFile(FileStatus fileStatus) {
-    this.fileStatus = fileStatus;
-  }
-
-  @Override public boolean createNewFile() {
-    Path path = fileStatus.getPath();
-    try {
-      return fs.createNewFile(path);
-    } catch (IOException e) {
-      return false;
-    }
-
-  }
-
-  @Override public String getAbsolutePath() {
-    return fileStatus.getPath().toString();
-  }
-
-  @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
-    CarbonFile[] files = listFiles();
-    if (files != null && files.length >= 1) {
-
-      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
-      for (int i = 0; i < files.length; i++) {
-        if (fileFilter.accept(files[i])) {
-          fileList.add(files[i]);
-        }
-      }
-
-      if (fileList.size() >= 1) {
-        return fileList.toArray(new CarbonFile[fileList.size()]);
-      } else {
-        return new CarbonFile[0];
-      }
-    }
-    return files;
-  }
-
-  @Override public String getName() {
-    return fileStatus.getPath().getName();
-  }
-
-  @Override public boolean isDirectory() {
-    return fileStatus.isDirectory();
-  }
-
-  @Override public boolean exists() {
-    try {
-      if (null != fileStatus) {
-        fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
-        return fs.exists(fileStatus.getPath());
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-    }
-    return false;
-  }
-
-  @Override public String getCanonicalPath() {
-    return getAbsolutePath();
-  }
-
-  @Override public CarbonFile getParentFile() {
-    return new HDFSCarbonFile(fileStatus.getPath().getParent());
-  }
-
-  @Override public String getPath() {
-    return getAbsolutePath();
-  }
-
-  @Override public long getSize() {
-    return fileStatus.getLen();
-  }
-
-  public boolean renameTo(String changetoName) {
-    FileSystem fs;
-    try {
-      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
-      return fs.rename(fileStatus.getPath(), new Path(changetoName));
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-      return false;
-    }
-  }
-
-  public boolean delete() {
-    FileSystem fs;
-    try {
-      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
-      return fs.delete(fileStatus.getPath(), true);
-    } catch (IOException e) {
-      LOGGER.error("Exception occured" + e.getMessage());
-      return false;
-    }
-  }
-
-  @Override public CarbonFile[] listFiles() {
-
-    FileStatus[] listStatus = null;
-    try {
-      if (null != fileStatus && fileStatus.isDirectory()) {
-        Path path = fileStatus.getPath();
-        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
-      } else {
-        return null;
-      }
-    } catch (IOException ex) {
-      LOGGER.error("Exception occured: " + ex.getMessage());
-      return new CarbonFile[0];
-    }
-
-    return getFiles(listStatus);
+    super(fileStatus);
   }
 
   /**
@@ -187,98 +59,56 @@ public class HDFSCarbonFile implements CarbonFile {
     if (listStatus == null) {
       return new CarbonFile[0];
     }
-
     CarbonFile[] files = new CarbonFile[listStatus.length];
-
     for (int i = 0; i < files.length; i++) {
       files[i] = new HDFSCarbonFile(listStatus[i]);
     }
     return files;
   }
 
-  @Override public long getLastModifiedTime() {
-    return fileStatus.getModificationTime();
-  }
-
-  @Override public boolean setLastModifiedTime(long timestamp) {
+  @Override
+  public CarbonFile[] listFiles() {
+    FileStatus[] listStatus = null;
     try {
-      fs.setTimes(fileStatus.getPath(), timestamp, timestamp);
+      if (null != fileStatus && fileStatus.isDirectory()) {
+        Path path = fileStatus.getPath();
+        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+      } else {
+        return null;
+      }
     } catch (IOException e) {
-      return false;
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return new CarbonFile[0];
     }
-    return true;
+    return getFiles(listStatus);
   }
 
-  /**
-   * This method will delete the data in file data from a given offset
-   */
-  @Override public boolean truncate(String fileName, long validDataEndOffset) {
-    DataOutputStream dataOutputStream = null;
-    DataInputStream dataInputStream = null;
-    boolean fileTruncatedSuccessfully = false;
-    // if bytes to read less than 1024 then buffer size should be equal to the given offset
-    int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
-        CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
-        (int) validDataEndOffset;
-    // temporary file name
-    String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
-    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-    try {
-      CarbonFile tempFile = null;
-      // delete temporary file if it already exists at a given path
-      if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
-        tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-        tempFile.delete();
-      }
-      // create new temporary file
-      FileFactory.createNewFile(tempWriteFilePath, fileType);
-      tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-      byte[] buff = new byte[bufferSize];
-      dataInputStream = FileFactory.getDataInputStream(fileName, fileType);
-      // read the data
-      int read = dataInputStream.read(buff, 0, buff.length);
-      dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
-      dataOutputStream.write(buff, 0, read);
-      long remaining = validDataEndOffset - read;
-      // anytime we should not cross the offset to be read
-      while (remaining > 0) {
-        if (remaining > bufferSize) {
-          buff = new byte[bufferSize];
-        } else {
-          buff = new byte[(int) remaining];
+  @Override
+  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+    CarbonFile[] files = listFiles();
+    if (files != null && files.length >= 1) {
+      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+      for (int i = 0; i < files.length; i++) {
+        if (fileFilter.accept(files[i])) {
+          fileList.add(files[i]);
         }
-        read = dataInputStream.read(buff, 0, buff.length);
-        dataOutputStream.write(buff, 0, read);
-        remaining = remaining - read;
       }
-      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
-      // rename the temp file to original file
-      tempFile.renameForce(fileName);
-      fileTruncatedSuccessfully = true;
-    } catch (IOException e) {
-      LOGGER.error("Exception occured while truncating the file " + e.getMessage());
-    } finally {
-      CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
+      if (fileList.size() >= 1) {
+        return fileList.toArray(new CarbonFile[fileList.size()]);
+      } else {
+        return new CarbonFile[0];
+      }
     }
-    return fileTruncatedSuccessfully;
+    return files;
   }
 
-  /**
-   * This method will be used to check whether a file has been modified or not
-   *
-   * @param fileTimeStamp time to be compared with latest timestamp of file
-   * @param endOffset     file length to be compared with current length of file
-   * @return
-   */
-  @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
-    boolean isFileModified = false;
-    if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
-      isFileModified = true;
-    }
-    return isFileModified;
+  @Override
+  public CarbonFile getParentFile() {
+    return new HDFSCarbonFile(fileStatus.getPath().getParent());
   }
 
-  @Override public boolean renameForce(String changetoName) {
+  @Override
+  public boolean renameForce(String changetoName) {
     FileSystem fs;
     try {
       fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
@@ -294,5 +124,4 @@ public class HDFSCarbonFile implements CarbonFile {
       return false;
     }
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
new file mode 100644
index 0000000..c7e4497
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.datastorage.store.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+
+public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName());
+
+  public ViewFSCarbonFile(String filePath) {
+    super(filePath);
+  }
+
+  public ViewFSCarbonFile(Path path) {
+    super(path);
+  }
+
+  public ViewFSCarbonFile(FileStatus fileStatus) {
+    super(fileStatus);
+  }
+
+  /**
+   * @param listStatus
+   * @return
+   */
+  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+    if (listStatus == null) {
+      return new CarbonFile[0];
+    }
+    CarbonFile[] files = new CarbonFile[listStatus.length];
+    for (int i = 0; i < files.length; i++) {
+      files[i] = new ViewFSCarbonFile(listStatus[i]);
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile[] listFiles() {
+    FileStatus[] listStatus = null;
+    try {
+      if (null != fileStatus && fileStatus.isDirectory()) {
+        Path path = fileStatus.getPath();
+        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+      } else {
+        return null;
+      }
+    } catch (IOException ex) {
+      LOGGER.error("Exception occured" + ex.getMessage());
+      return new CarbonFile[0];
+    }
+    return getFiles(listStatus);
+  }
+
+  @Override
+  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+    CarbonFile[] files = listFiles();
+    if (files != null && files.length >= 1) {
+      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+      for (int i = 0; i < files.length; i++) {
+        if (fileFilter.accept(files[i])) {
+          fileList.add(files[i]);
+        }
+      }
+      if (fileList.size() >= 1) {
+        return fileList.toArray(new CarbonFile[fileList.size()]);
+      } else {
+        return new CarbonFile[0];
+      }
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile getParentFile() {
+    return new ViewFSCarbonFile(fileStatus.getPath().getParent());
+  }
+
+  @Override
+  public boolean renameForce(String changetoName) {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      if (fs instanceof ViewFileSystem) {
+        fs.delete(new Path(changetoName), true);
+        fs.rename(fileStatus.getPath(), new Path(changetoName));
+        return true;
+      } else {
+        return false;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured" + e.getMessage());
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
new file mode 100644
index 0000000..653c243
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.datastorage.store.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.FileHolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class DFSFileHolderImpl implements FileHolder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DFSFileHolderImpl.class.getName());
+  /**
+   * cache to hold filename and its stream
+   */
+  private Map<String, FSDataInputStream> fileNameAndStreamCache;
+
+  public DFSFileHolderImpl() {
+    this.fileNameAndStreamCache =
+        new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  @Override public byte[] readByteArray(String filePath, long offset, int length) {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    byte[] byteBffer = read(fileChannel, length, offset);
+    return byteBffer;
+  }
+
+  /**
+   * This method checks whether a stream for the given file path is already
+   * present in the cache; if not, it creates one and adds it to the cache,
+   * otherwise it returns the cached stream
+   *
+   * @param filePath fully qualified file path
+   * @return channel
+   */
+  private FSDataInputStream updateCache(String filePath) {
+    FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
+    try {
+      if (null == fileChannel) {
+        Path pt = new Path(filePath);
+        FileSystem fs = pt.getFileSystem(new Configuration());
+        fileChannel = fs.open(pt);
+        fileNameAndStreamCache.put(filePath, fileChannel);
+      }
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    return fileChannel;
+  }
+
+  /**
+   * This method reads the given number of bytes from the file at the given position
+   *
+   * @param channel file channel
+   * @param size    number of bytes
+   * @param offset  position
+   * @return byte buffer
+   */
+  private byte[] read(FSDataInputStream channel, int size, long offset) {
+    byte[] byteBffer = new byte[size];
+    try {
+      channel.seek(offset);
+      channel.readFully(byteBffer);
+    } catch (Exception e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    return byteBffer;
+  }
+
+  /**
+   * This method reads the given number of bytes from the stream's current position
+   *
+   * @param channel file channel
+   * @param size    number of bytes
+   * @return byte buffer
+   */
+  private byte[] read(FSDataInputStream channel, int size) {
+    byte[] byteBffer = new byte[size];
+    try {
+      channel.readFully(byteBffer);
+    } catch (Exception e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    return byteBffer;
+  }
+
+  @Override public int readInt(String filePath, long offset) {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    int i = -1;
+    try {
+      fileChannel.seek(offset);
+      i = fileChannel.readInt();
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+
+    return i;
+  }
+
+  @Override public long readDouble(String filePath, long offset) {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    long i = -1;
+    try {
+      fileChannel.seek(offset);
+      i = fileChannel.readLong();
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+
+    return i;
+  }
+
+  @Override public void finish() {
+    for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) {
+      try {
+        FSDataInputStream channel = entry.getValue();
+        if (null != channel) {
+          channel.close();
+        }
+      } catch (IOException exception) {
+        LOGGER.error(exception, exception.getMessage());
+      }
+    }
+
+  }
+
+  @Override public byte[] readByteArray(String filePath, int length) {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    byte[] byteBffer = read(fileChannel, length);
+    return byteBffer;
+  }
+
+  @Override public long readLong(String filePath, long offset) {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    long i = -1;
+    try {
+      fileChannel.seek(offset);
+      i = fileChannel.readLong();
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    return i;
+  }
+
+  @Override public int readInt(String filePath) {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    int i = -1;
+    try {
+      i = fileChannel.readInt();
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    return i;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
index 11645fc..c88ade8 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -32,6 +32,7 @@ import org.carbondata.core.datastorage.store.FileHolder;
 import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
 import org.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
+import org.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile;
 import org.carbondata.core.util.CarbonUtil;
 
 import org.apache.commons.io.FileUtils;
@@ -49,8 +50,10 @@ public final class FileFactory {
   static {
     String property = CarbonUtil.getCarbonStorePath(null, null);
     if (property != null) {
-      if (property.startsWith("hdfs://")) {
+      if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
         storeDefaultFileType = FileType.HDFS;
+      } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+        storeDefaultFileType = FileType.VIEWFS;
       }
     }
 
@@ -71,7 +74,8 @@ public final class FileFactory {
       case LOCAL:
         return new FileHolderImpl();
       case HDFS:
-        return new HDFSFileHolderImpl();
+      case VIEWFS:
+        return new DFSFileHolderImpl();
       default:
         return new FileHolderImpl();
     }
@@ -80,16 +84,20 @@ public final class FileFactory {
   public static FileType getFileType() {
     String property = CarbonUtil.getCarbonStorePath(null, null);
     if (property != null) {
-      if (property.startsWith("hdfs://")) {
+      if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
         storeDefaultFileType = FileType.HDFS;
+      } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+        storeDefaultFileType = FileType.VIEWFS;
       }
     }
     return storeDefaultFileType;
   }
 
   public static FileType getFileType(String path) {
-    if (path.startsWith("hdfs://")) {
+    if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
       return FileType.HDFS;
+    } else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+      return FileType.VIEWFS;
     }
     return FileType.LOCAL;
   }
@@ -100,6 +108,8 @@ public final class FileFactory {
         return new LocalCarbonFile(path);
       case HDFS:
         return new HDFSCarbonFile(path);
+      case VIEWFS:
+        return new ViewFSCarbonFile(path);
       default:
         return new LocalCarbonFile(path);
     }
@@ -112,6 +122,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataInputStream stream = fs.open(pt);
@@ -128,6 +139,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataInputStream stream = fs.open(pt, bufferSize);
@@ -152,6 +164,7 @@ public final class FileFactory {
     path = path.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataInputStream stream = fs.open(pt, bufferSize);
@@ -176,6 +189,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataOutputStream stream = fs.create(pt, true);
@@ -192,6 +206,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataOutputStream stream = fs.create(pt, replicationFactor);
@@ -209,6 +224,7 @@ public final class FileFactory {
         return new DataOutputStream(
             new BufferedOutputStream(new FileOutputStream(path), bufferSize));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataOutputStream stream = fs.create(pt, true, bufferSize);
@@ -227,6 +243,7 @@ public final class FileFactory {
         return new DataOutputStream(
             new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataOutputStream stream = null;
@@ -256,6 +273,7 @@ public final class FileFactory {
         return new DataOutputStream(
             new BufferedOutputStream(new FileOutputStream(path), bufferSize));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataOutputStream stream =
@@ -280,6 +298,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         if (performFileCheck) {
@@ -311,6 +330,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.exists(path);
@@ -326,6 +346,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.createNewFile(path);
@@ -341,6 +362,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.mkdirs(path);
@@ -366,6 +388,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
       case HDFS:
+      case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
         FSDataOutputStream stream = fs.append(pt);
@@ -388,6 +411,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         if (fs.createNewFile(path)) {
@@ -403,7 +427,7 @@ public final class FileFactory {
   }
 
   public enum FileType {
-    LOCAL, HDFS
+    LOCAL, HDFS, VIEWFS
   }
 
   /**
@@ -418,6 +442,7 @@ public final class FileFactory {
     FileType fileType = getFileType(filePath);
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         return filePath;
       case LOCAL:
       default:
@@ -438,6 +463,7 @@ public final class FileFactory {
     FileType fileType = getFileType(filePath);
     switch (fileType) {
       case HDFS:
+      case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
         return fs.getContentSummary(path).getLength();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java
deleted file mode 100644
index 6e753e6..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.impl;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.FileHolder;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class HDFSFileHolderImpl implements FileHolder {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(HDFSFileHolderImpl.class.getName());
-  /**
-   * cache to hold filename and its stream
-   */
-  private Map<String, FSDataInputStream> fileNameAndStreamCache;
-
-  public HDFSFileHolderImpl() {
-    this.fileNameAndStreamCache =
-        new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  }
-
-  @Override public byte[] readByteArray(String filePath, long offset, int length) {
-
-    FSDataInputStream fileChannel = updateCache(filePath);
-    byte[] byteBffer = read(fileChannel, length, offset);
-    return byteBffer;
-  }
-
-  /**
-   * This method will be used to check whether stream is already present in
-   * cache or not for filepath if not present then create it and then add to
-   * cache, other wise get from cache
-   *
-   * @param filePath fully qualified file path
-   * @return channel
-   */
-  private FSDataInputStream updateCache(String filePath) {
-    FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
-    try {
-      if (null == fileChannel) {
-        Path pt = new Path(filePath);
-        FileSystem fs = pt.getFileSystem(new Configuration());
-        fileChannel = fs.open(pt);
-        fileNameAndStreamCache.put(filePath, fileChannel);
-      }
-    } catch (IOException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return fileChannel;
-  }
-
-  /**
-   * This method will be used to read from file based on number of bytes to be read and positon
-   *
-   * @param channel file channel
-   * @param size    number of bytes
-   * @param offset  position
-   * @return byte buffer
-   */
-  private byte[] read(FSDataInputStream channel, int size, long offset) {
-    byte[] byteBffer = new byte[size];
-    try {
-      channel.seek(offset);
-      channel.readFully(byteBffer);
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return byteBffer;
-  }
-
-  /**
-   * This method will be used to read from file based on number of bytes to be read and positon
-   *
-   * @param channel file channel
-   * @param size    number of bytes
-   * @return byte buffer
-   */
-  private byte[] read(FSDataInputStream channel, int size) {
-    byte[] byteBffer = new byte[size];
-    try {
-      channel.readFully(byteBffer);
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return byteBffer;
-  }
-
-  @Override public int readInt(String filePath, long offset) {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    int i = -1;
-    try {
-      fileChannel.seek(offset);
-      i = fileChannel.readInt();
-    } catch (IOException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-
-    return i;
-  }
-
-  @Override public long readDouble(String filePath, long offset) {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    long i = -1;
-    try {
-      fileChannel.seek(offset);
-      i = fileChannel.readLong();
-    } catch (IOException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-
-    return i;
-  }
-
-  @Override public void finish() {
-    for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) {
-      try {
-        FSDataInputStream channel = entry.getValue();
-        if (null != channel) {
-          channel.close();
-        }
-      } catch (IOException exception) {
-        LOGGER.error(exception, exception.getMessage());
-      }
-    }
-
-  }
-
-  @Override public byte[] readByteArray(String filePath, int length) {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    byte[] byteBffer = read(fileChannel, length);
-    return byteBffer;
-  }
-
-  @Override public long readLong(String filePath, long offset) {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    long i = -1;
-    try {
-      fileChannel.seek(offset);
-      i = fileChannel.readLong();
-    } catch (IOException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return i;
-  }
-
-  @Override public int readInt(String filePath) {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    int i = -1;
-    try {
-      i = fileChannel.readInt();
-    } catch (IOException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return i;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
index 2772362..2910105 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
@@ -81,8 +81,8 @@ import org.pentaho.di.core.exception.KettleException;
 
 public final class CarbonUtil {
 
-  private static final String HDFS_PREFIX = "hdfs://";
-
+  public static final String HDFS_PREFIX = "hdfs://";
+  public static final String VIEWFS_PREFIX = "viewfs://";
   private static final String FS_DEFAULT_FS = "fs.defaultFS";
 
   /**
@@ -1213,21 +1213,22 @@ public final class CarbonUtil {
   public static String checkAndAppendHDFSUrl(String filePath) {
     String currentPath = filePath;
     if (null != filePath && filePath.length() != 0 &&
-        FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS) {
-      String baseHDFSUrl = CarbonProperties.getInstance()
+        FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS &&
+        FileFactory.getFileType(filePath) != FileFactory.FileType.VIEWFS) {
+      String baseDFSUrl = CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL);
-      if (null != baseHDFSUrl) {
-        String hdfsUrl = conf.get(FS_DEFAULT_FS);
-        if (hdfsUrl.startsWith(HDFS_PREFIX)) {
-          baseHDFSUrl = hdfsUrl + baseHDFSUrl;
+      if (null != baseDFSUrl) {
+        String dfsUrl = conf.get(FS_DEFAULT_FS);
+        if (dfsUrl.startsWith(HDFS_PREFIX) || dfsUrl.startsWith(VIEWFS_PREFIX)) {
+          baseDFSUrl = dfsUrl + baseDFSUrl;
         }
-        if (baseHDFSUrl.endsWith("/")) {
-          baseHDFSUrl = baseHDFSUrl.substring(0, baseHDFSUrl.length() - 1);
+        if (baseDFSUrl.endsWith("/")) {
+          baseDFSUrl = baseDFSUrl.substring(0, baseDFSUrl.length() - 1);
         }
         if (!filePath.startsWith("/")) {
           filePath = "/" + filePath;
         }
-        currentPath = baseHDFSUrl + filePath;
+        currentPath = baseDFSUrl + filePath;
       }
     }
     return currentPath;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
index 7f9e56d..31b0cfc 100644
--- a/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
@@ -22,7 +22,8 @@ public class SchemaReader {
   public CarbonTable readCarbonTableFromStore(CarbonTablePath carbonTablePath,
       CarbonTableIdentifier tableIdentifier, String storePath) throws IOException {
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
-    if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS)) {
+    if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS)
+          || FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
       String tableName = tableIdentifier.getTableName();
 
       ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
index 575fd6a..d832504 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
@@ -177,7 +177,8 @@ public class BlockDataHandler {
       }
 
       // Open the next one...
-      if (FileFactory.getFileType(blockDetails.getFilePath()) == FileFactory.FileType.HDFS) {
+      if (FileFactory.getFileType(blockDetails.getFilePath()) == FileFactory.FileType.HDFS
+            || FileFactory.getFileType(blockDetails.getFilePath()) == FileFactory.FileType.VIEWFS) {
         //when case HDFS file type, we use the file path directly
         //give 0 offset as the file start offset when open a new file
         initializeFileReader(blockDetails.getFilePath(), 0);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
index 4c361fc..a64a256 100644
--- a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -31,10 +31,7 @@ import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.carbondata.core.carbon.path.CarbonStorePath;
 import org.carbondata.core.carbon.path.CarbonTablePath;
 import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
+import org.carbondata.core.datastorage.store.filesystem.*;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.carbondata.core.load.LoadMetadataDetails;
@@ -125,12 +122,7 @@ public final class CarbonDataProcessorUtil {
     } catch (IOException e1) {
       LOGGER.info("bad record folder does not exist");
     }
-    CarbonFile carbonFile = null;
-    if (fileType.equals(FileFactory.FileType.HDFS)) {
-      carbonFile = new HDFSCarbonFile(badLogStoreLocation);
-    } else {
-      carbonFile = new LocalCarbonFile(badLogStoreLocation);
-    }
+    CarbonFile carbonFile = FileFactory.getCarbonFile(badLogStoreLocation, fileType);
 
     CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile pathname) {


Mime
View raw message