carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3404] Support CarbonFile API through FileTypeInterface to use custom FileSystem
Date Thu, 06 Jun 2019 02:39:44 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 85f1b9f  [CARBONDATA-3404] Support CarbonFile API through FileTypeInterface to use
custom FileSystem
85f1b9f is described below

commit 85f1b9ff4d459248af56002e1523bcb46bf366e4
Author: KanakaKumar <kanaka.avvau@huawei.com>
AuthorDate: Wed May 29 12:39:06 2019 +0530

    [CARBONDATA-3404] Support CarbonFile API through FileTypeInterface to use custom FileSystem
    
    Currently CarbonData supports only a few file systems, such as the HDFS, S3 and VIEWFS schemes.
    If a user configures a table path on a file system other than the supported ones, FileFactory
falls back to CarbonLocalFile by default, which causes errors.
    
    This PR proposes an API that lets users extend CarbonFile and override the required
methods from AbstractCarbonFile when specific handling is required for operations like renameForce.
    
    This closes #3246
---
 .../core/constants/CarbonCommonConstants.java      |  5 ++
 .../filesystem/AbstractDFSCarbonFile.java          |  6 +-
 .../core/datastore/filesystem/CarbonFile.java      |  4 +-
 .../core/datastore/filesystem/LocalCarbonFile.java | 10 +--
 .../datastore/impl/DefaultFileTypeProvider.java    | 84 +++++++++++++++-------
 .../core/datastore/impl/FileFactory.java           | 82 +++++++++++++++++----
 .../core/datastore/impl/FileTypeInterface.java     | 23 ++++--
 .../carbondata/core/locks/CarbonLockFactory.java   | 11 ++-
 .../core/metadata/schema/SchemaReader.java         |  5 +-
 .../apache/carbondata/core/util/CarbonUtil.java    | 19 ++---
 .../store/impl/FileFactoryImplUnitTest.java        | 55 ++++++++++++--
 .../filesystem/store/impl/TestFileProvider.java    | 59 +++++++++++++++
 .../dblocation/DBLocationCarbonTableTestCase.scala |  4 +-
 13 files changed, 282 insertions(+), 85 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8b39343..1201e1a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1601,6 +1601,11 @@ public final class CarbonCommonConstants {
   public static final String S3_SECRET_KEY = "fs.s3.awsSecretAccessKey";
 
   /**
+   * Configuration Key for custom file provider
+   */
+  public static final String CUSTOM_FILE_PROVIDER = "carbon.fs.custom.file.provider";
+
+  /**
    * FS_DEFAULT_FS
    */
   @CarbonProperty
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index a90648e..1470c05 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -404,8 +404,8 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     return new DataOutputStream(new BufferedOutputStream(outputStream));
   }
 
-  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
-      boolean performFileCheck) throws IOException {
+  @Override public boolean isFileExist(String filePath, boolean performFileCheck)
+      throws IOException {
     filePath = filePath.replace("\\", "/");
     Path path = new Path(filePath);
     FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
@@ -416,7 +416,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     }
   }
 
-  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
+  @Override public boolean isFileExist(String filePath)
       throws IOException {
     filePath = filePath.replace("\\", "/");
     Path path = new Path(filePath);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index be08338..c3c5be5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -139,10 +139,10 @@ public interface CarbonFile {
   DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
       String compressor) throws IOException;
 
-  boolean isFileExist(String filePath, FileFactory.FileType fileType, boolean performFileCheck)
+  boolean isFileExist(String filePath, boolean performFileCheck)
       throws IOException;
 
-  boolean isFileExist(String filePath, FileFactory.FileType fileType) throws IOException;
+  boolean isFileExist(String filePath) throws IOException;
 
   boolean createNewFile(String filePath, FileFactory.FileType fileType) throws IOException;
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 6f55586..3667069 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -410,10 +410,10 @@ public class LocalCarbonFile implements CarbonFile {
     }
   }
 
-  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
-      boolean performFileCheck) throws IOException {
+  @Override public boolean isFileExist(String filePath, boolean performFileCheck)
+      throws IOException {
     filePath = filePath.replace("\\", "/");
-    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    filePath = FileFactory.getUpdatedFilePath(filePath);
     File defaultFile = new File(filePath);
 
     if (performFileCheck) {
@@ -423,10 +423,10 @@ public class LocalCarbonFile implements CarbonFile {
     }
   }
 
-  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
+  @Override public boolean isFileExist(String filePath)
       throws IOException {
     filePath = filePath.replace("\\", "/");
-    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    filePath = FileFactory.getUpdatedFilePath(filePath);
     File defaultFile = new File(filePath);
     return defaultFile.exists();
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
index 937b5b6..cdb1a20 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -17,50 +17,80 @@
 
 package org.apache.carbondata.core.datastore.impl;
 
-import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
 
+/**
+ * FileType provider to create CarbonFile specific to the file system where the path belongs
to.
+ */
 public class DefaultFileTypeProvider implements FileTypeInterface {
 
-  public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration)
{
-    switch (fileType) {
-      case LOCAL:
-        return new FileReaderImpl();
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        return new DFSFileReaderImpl(configuration);
-      default:
-        return new FileReaderImpl();
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(DefaultFileTypeProvider.class.getName());
+
+  /**
+   * Custom file type provider for supporting non default file systems.
+   */
+  protected FileTypeInterface customFileTypeProvider = null;
+
+  protected boolean customFileTypeProviderInitialized = false;
+
+  public DefaultFileTypeProvider() {
+  }
+
+  /**
+   * This method is required apart from Constructor to handle the below circular dependency.
+   * CarbonProperties-->FileFactory-->DefaultTypeProvider-->CarbonProperties
+   */
+  private void initializeCustomFileprovider() {
+    if (!customFileTypeProviderInitialized) {
+      customFileTypeProviderInitialized = true;
+      String customFileProvider =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CUSTOM_FILE_PROVIDER);
+      if (customFileProvider != null && !customFileProvider.trim().isEmpty()) {
+        try {
+          customFileTypeProvider =
+              (FileTypeInterface) Class.forName(customFileProvider).newInstance();
+        } catch (Exception e) {
+          LOGGER.error("Unable load configured FileTypeInterface class. Ignored.", e);
+        }
+      }
     }
   }
 
-  public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType) {
-    switch (fileType) {
-      case LOCAL:
-        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
-      case HDFS:
-        return new HDFSCarbonFile(path);
-      case S3:
-        return new S3CarbonFile(path);
-      case ALLUXIO:
-        return new AlluxioCarbonFile(path);
-      case VIEWFS:
-        return new ViewFSCarbonFile(path);
-      default:
-        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
+  /**
+   * Delegate to the custom file provider to check if the path is supported or not.
+   * Note this function do not check the default supported file systems as  #getCarbonFile
expects
+   * this method output is from customFileTypeProvider.
+   *
+   * @param path path of the file
+   * @return true if supported by the custom
+   */
+  @Override public boolean isPathSupported(String path) {
+    initializeCustomFileprovider();
+    if (customFileTypeProvider != null) {
+      return customFileTypeProvider.isPathSupported(path);
     }
+    return false;
   }
 
-  public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration
conf) {
+  public CarbonFile getCarbonFile(String path, Configuration conf) {
+    // Handle the custom file type first
+    if (isPathSupported(path)) {
+      return customFileTypeProvider.getCarbonFile(path, conf);
+    }
+
+    FileFactory.FileType fileType = FileFactory.getFileType(path);
     switch (fileType) {
       case LOCAL:
         return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index a27023f..372c7c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.channels.FileChannel;
+import java.util.Locale;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -52,10 +53,12 @@ public final class FileFactory {
     configuration.addResource(new Path("../core-default.xml"));
   }
 
-  private static FileTypeInterface fileFileTypeInterface = new DefaultFileTypeProvider();
-  public static void setFileTypeInterface(FileTypeInterface fileTypeInterface) {
+  private static DefaultFileTypeProvider fileFileTypeInterface = new DefaultFileTypeProvider();
+
+  public static void setFileTypeInterface(DefaultFileTypeProvider fileTypeInterface) {
     fileFileTypeInterface = fileTypeInterface;
   }
+
   private FileFactory() {
 
   }
@@ -73,11 +76,22 @@ public final class FileFactory {
   }
 
   public static FileReader getFileHolder(FileType fileType) {
-    return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
+    return getFileHolder(fileType, getConfiguration());
   }
 
-  public static FileReader getFileHolder(FileType fileType, Configuration configuration)
{
-    return fileFileTypeInterface.getFileHolder(fileType, configuration);
+  public static FileReader getFileHolder(FileFactory.FileType fileType,
+      Configuration configuration) {
+    switch (fileType) {
+      case LOCAL:
+        return new FileReaderImpl();
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+      case S3:
+        return new DFSFileReaderImpl(configuration);
+      default:
+        return new FileReaderImpl();
+    }
   }
 
   public static FileType getFileType(String path) {
@@ -89,6 +103,17 @@ public final class FileFactory {
     if (fileType != null) {
       return fileType;
     }
+
+    // If custom file type is configured,
+    if (fileFileTypeInterface.isPathSupported(path)) {
+      return FileType.CUSTOM;
+    }
+
+    // If its unsupported file system, throw error instead of heading to wrong behavior,
+    if (path.contains("://") && !path.startsWith("file://")) {
+      throw new IllegalArgumentException("Path belongs to unsupported file system " + path);
+    }
+
     return FileType.LOCAL;
   }
 
@@ -124,14 +149,15 @@ public final class FileFactory {
   }
 
   public static CarbonFile getCarbonFile(String path) {
-    return fileFileTypeInterface.getCarbonFile(path, getFileType(path));
+    return fileFileTypeInterface.getCarbonFile(path, getConfiguration());
   }
   public static CarbonFile getCarbonFile(String path, FileType fileType) {
-    return fileFileTypeInterface.getCarbonFile(path, fileType);
+    //TODO ignoring this fileType now to avoid refactoring. Remove the unused argument later.
+    return fileFileTypeInterface.getCarbonFile(path, getConfiguration());
   }
   public static CarbonFile getCarbonFile(String path,
       Configuration hadoopConf) {
-    return fileFileTypeInterface.getCarbonFile(path, getFileType(path), hadoopConf);
+    return fileFileTypeInterface.getCarbonFile(path, hadoopConf);
   }
 
   public static DataInputStream getDataInputStream(String path, FileType fileType)
@@ -229,12 +255,11 @@ public final class FileFactory {
    * not if the performFileCheck is true
    *
    * @param filePath         - Path
-   * @param fileType         - FileType Local/HDFS
    * @param performFileCheck - Provide false for folders, true for files and
    */
-  public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
+  public static boolean isFileExist(String filePath, boolean performFileCheck)
       throws IOException {
-    return getCarbonFile(filePath).isFileExist(filePath, fileType, performFileCheck);
+    return getCarbonFile(filePath).isFileExist(filePath, performFileCheck);
   }
 
   /**
@@ -244,7 +269,7 @@ public final class FileFactory {
    * @param fileType - FileType Local/HDFS
    */
   public static boolean isFileExist(String filePath, FileType fileType) throws IOException
{
-    return getCarbonFile(filePath).isFileExist(filePath, fileType);
+    return getCarbonFile(filePath).isFileExist(filePath);
   }
 
   /**
@@ -347,6 +372,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case CUSTOM:
       case S3:
         // if hadoop version >= 2.7, it can call method 'FileSystem.truncate' to truncate
file,
         // this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
@@ -392,7 +418,7 @@ public final class FileFactory {
   }
 
   public enum FileType {
-    LOCAL, HDFS, ALLUXIO, VIEWFS, S3
+    LOCAL, HDFS, ALLUXIO, VIEWFS, S3, CUSTOM
   }
 
   /**
@@ -413,6 +439,7 @@ public final class FileFactory {
       case ALLUXIO:
       case VIEWFS:
       case S3:
+      case CUSTOM:
       default:
         return filePath;
     }
@@ -432,6 +459,7 @@ public final class FileFactory {
       case HDFS:
       case VIEWFS:
       case S3:
+      case CUSTOM:
         return filePath;
       case ALLUXIO:
         return StringUtils.startsWith(filePath, "alluxio") ? filePath : "alluxio:///" + filePath;
@@ -483,6 +511,7 @@ public final class FileFactory {
       case ALLUXIO:
       case VIEWFS:
       case S3:
+      case CUSTOM:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(getConfiguration());
         return fs.getContentSummary(path).getLength();
@@ -524,6 +553,7 @@ public final class FileFactory {
       case HDFS:
       case ALLUXIO:
       case VIEWFS:
+      case CUSTOM:
         try {
           Path path = new Path(directoryPath);
           FileSystem fs = path.getFileSystem(getConfiguration());
@@ -551,6 +581,10 @@ public final class FileFactory {
    * Check and append the hadoop's defaultFS to the path
    */
   public static String checkAndAppendDefaultFs(String path, Configuration conf) {
+    if (FileFactory.getFileType(path) == FileType.CUSTOM) {
+      // If its custom file type, already schema is present, no need to append schema.
+      return path;
+    }
     String defaultFs = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
     String lowerPath = path.toLowerCase();
     if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || lowerPath
@@ -568,6 +602,28 @@ public final class FileFactory {
   }
 
   /**
+   * Return true if schema is present or not in the file path
+   *
+   * @param path
+   * @return
+   */
+  public static boolean checkIfPrefixExists(String path) {
+    if (FileFactory.getFileType(path) == FileType.CUSTOM) {
+      // If its custom file type, already schema is present, no need to append schema.
+      return true;
+    }
+
+    final String lowerPath = path.toLowerCase(Locale.getDefault());
+    return lowerPath.contains("://") || lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
+        || lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.S3N_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.S3_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.S3A_PREFIX);
+  }
+
+  /**
    * set the file replication
    *
    * @param path file path
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
index 8b0fcc4..f9de81a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
@@ -17,15 +17,30 @@
 
 package org.apache.carbondata.core.datastore.impl;
 
-import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 
 import org.apache.hadoop.conf.Configuration;
 
+/**
+ * Interface to create CarbonFile Instance specific to the FileSystem where the path belongs.
+ */
 public interface FileTypeInterface {
 
-  FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
-  CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
-  CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
+  /**
+   * Return the correct CarbonFile instance.
+   *
+   * @param path          path of the file
+   * @param configuration configuration
+   * @return CarbonFile instance
+   */
+  public CarbonFile getCarbonFile(String path, Configuration configuration);
+
+  /**
+   * Check if the FileSystem mapped with the given path is supported or not.
+   *
+   * @param path path of the file
+   * @return true if supported, false if not supported
+   */
+  public boolean isPathSupported(String path);
 }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
index acdad60..187fc71 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.locks;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
 
@@ -64,19 +65,17 @@ public class CarbonLockFactory {
       absoluteLockPath =
           getLockpath(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
     }
+    FileFactory.FileType fileType = FileFactory.getFileType(absoluteLockPath);
     if (lockTypeConfigured.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) {
       return new ZooKeeperLocking(absoluteLockPath, lockFile);
-    } else if (absoluteLockPath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
-            absoluteLockPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
-            absoluteLockPath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+    } else if (fileType == FileFactory.FileType.S3) {
       lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_S3;
       return new S3FileLock(absoluteLockPath,
                 lockFile);
-    } else if (absoluteLockPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
-            || absoluteLockPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+    } else if (fileType == FileFactory.FileType.HDFS || fileType == FileFactory.FileType.VIEWFS)
{
       lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS;
       return new HdfsFileLock(absoluteLockPath, lockFile);
-    } else if (absoluteLockPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
+    } else if (fileType == FileFactory.FileType.ALLUXIO) {
       lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_ALLUXIO;
       return new AlluxioFileLock(absoluteLockPath, lockFile);
     } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index d0bc976..4cd4195 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -39,10 +39,7 @@ public class SchemaReader {
   public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier identifier)
       throws IOException {
     String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
-    if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
-        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
-        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) ||
-        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
+    if (FileFactory.isFileExist(schemaFilePath)) {
       String tableName = identifier.getCarbonTableIdentifier().getTableName();
 
       org.apache.carbondata.format.TableInfo tableInfo =
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index d8e4499..6fa24b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -631,7 +631,7 @@ public final class CarbonUtil {
     String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
     String baseDFSUrl = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
-    if (checkIfPrefixExists(filePath)) {
+    if (FileFactory.checkIfPrefixExists(filePath)) {
       return currentPath;
     }
     if (baseDFSUrl.endsWith("/")) {
@@ -641,7 +641,7 @@ public final class CarbonUtil {
       filePath = "/" + filePath;
     }
     currentPath = baseDFSUrl + filePath;
-    if (checkIfPrefixExists(currentPath)) {
+    if (FileFactory.checkIfPrefixExists(currentPath)) {
       return currentPath;
     }
     if (defaultFsUrl == null) {
@@ -658,7 +658,7 @@ public final class CarbonUtil {
   public static String checkAndAppendFileSystemURIScheme(String filePath) {
     String currentPath = filePath;
 
-    if (checkIfPrefixExists(filePath)) {
+    if (FileFactory.checkIfPrefixExists(filePath)) {
       return currentPath;
     }
     if (!filePath.startsWith("/")) {
@@ -686,18 +686,6 @@ public final class CarbonUtil {
       return "";
     }
   }
-
-  private static boolean checkIfPrefixExists(String path) {
-    final String lowerPath = path.toLowerCase(Locale.getDefault());
-    return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX);
-  }
-
   public static String removeAKSK(String filePath) {
     if (null == filePath) {
       return "";
@@ -2566,6 +2554,7 @@ public final class CarbonUtil {
       case ALLUXIO:
       case VIEWFS:
       case S3:
+      case CUSTOM:
         Path path = new Path(segmentPath);
         FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
         if (fs.exists(path)) {
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
index 0e7d1c9..804dc94 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
@@ -22,18 +22,24 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastore.impl.DefaultFileTypeProvider;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 import mockit.Mock;
 import mockit.MockUp;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static junit.framework.TestCase.assertNotNull;
+import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -72,19 +78,19 @@ public class FileFactoryImplUnitTest {
   }
 
   @Test public void testFileExistsForDefaultTypeWithPerformFileCheck() throws IOException
{
-    assertTrue(FileFactory.isFileExist(filePath, FileFactory.FileType.LOCAL, true));
+    assertTrue(FileFactory.isFileExist(filePath, true));
   }
 
   @Test public void testFileExistsForDefaultTypeWithOutPerformFileCheck() throws IOException
{
-    assertFalse(FileFactory.isFileExist("fakefilePath", FileFactory.FileType.LOCAL, false));
+    assertFalse(FileFactory.isFileExist("fakefilePath", false));
   }
 
   @Test public void testFileExistsForVIEWFSTypeWithPerformFileCheck() throws IOException
{
-    assertTrue(FileFactory.isFileExist(filePath, FileFactory.FileType.VIEWFS, true));
+    assertTrue(FileFactory.isFileExist(filePath, true));
   }
 
   @Test public void testFileExistsForVIEWFSTypeWithOutPerformFileCheck() throws IOException
{
-    assertFalse(FileFactory.isFileExist("fakefilePath", FileFactory.FileType.VIEWFS, false));
+    assertFalse(FileFactory.isFileExist("fakefilePath", false));
   }
 
   @Test public void testCreateNewFileWithDefaultFileType() throws IOException {
@@ -205,5 +211,46 @@ public class FileFactoryImplUnitTest {
       }
     }
   }
+
+  @Test public void testCustomFS() throws IOException {
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CUSTOM_FILE_PROVIDER,
+        "org.apache.carbondata.DummyFileProvider");
+
+    FileFactory.setFileTypeInterface(new DefaultFileTypeProvider());
+
+    String path = "testfs://dir1/file1";
+    try {
+      FileFactory.getFileType(path);
+      fail("Expected validation error");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Path belongs to unsupported file system"));
+    }
+
+    try {
+      FileFactory.getCarbonFile(path);
+      fail("Expected validation error");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Path belongs to unsupported file system"));
+    }
+
+    // Update the conf to   TestFileProvider
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CUSTOM_FILE_PROVIDER, TestFileProvider.class.getName());
+    FileFactory.setFileTypeInterface(new DefaultFileTypeProvider());
+
+    try {
+      Assert.assertSame(FileFactory.FileType.CUSTOM, FileFactory.getFileType(path));
+      assertTrue(FileFactory.getCarbonFile(path) instanceof LocalCarbonFile);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Unexpected error " + e);
+    }
+
+    //Reset the default configuration
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CUSTOM_FILE_PROVIDER,
"");
+    FileFactory.setFileTypeInterface(new DefaultFileTypeProvider());
+  }
+
 }
 
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/TestFileProvider.java
b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/TestFileProvider.java
new file mode 100644
index 0000000..12b9996
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/TestFileProvider.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.core.carbon.datastorage.filesystem.store.impl;
+
+import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileTypeInterface;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class TestFileProvider implements FileTypeInterface {
+
+  @Override public CarbonFile getCarbonFile(String path, Configuration configuration) {
+
+    if (path.startsWith("testfs://")) {
+      //Just sample for translation. Make it as local file path
+      path = path.replace("testfs://", "/");
+    }
+    FileFactory.FileType fileType = FileFactory.getFileType(path);
+    switch (fileType) {
+      case LOCAL:
+        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
+      case HDFS:
+        return new HDFSCarbonFile(path, configuration);
+      case S3:
+        return new S3CarbonFile(path, configuration);
+      case ALLUXIO:
+        return new AlluxioCarbonFile(path);
+      case VIEWFS:
+        return new ViewFSCarbonFile(path);
+      default:
+        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
+    }
+  }
+
+  @Override public boolean isPathSupported(String path) {
+    return path.startsWith("testfs://");
+  }
+}
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
index 50fb8c5..37ad08c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
@@ -271,7 +271,7 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach
{
     sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED
BY 'org.apache.carbondata.format'""")
     sql("drop table carbontable")
     // perform file check
-    assert(FileFactory.isFileExist(timestampFile, timestampFileType, true) ||
+    assert(FileFactory.isFileExist(timestampFile, true) ||
            CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
 
     CarbonProperties.getInstance()
@@ -282,7 +282,7 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach
{
     sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED
BY 'org.apache.carbondata.format'""")
     sql("drop table carbontable")
     // perform file check
-    assert(FileFactory.isFileExist(timestampFile, timestampFileType, true) ||
+    assert(FileFactory.isFileExist(timestampFile, true) ||
            CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
   }
 


Mime
View raw message