Repository: drill
Updated Branches:
refs/heads/master 8a4d7a994 -> 2af709f43
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
new file mode 100644
index 0000000..a125bae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Holds the folder permission, file permission and delete-on-exit flag applied when paths / files are created on a file system. */
+public class StorageStrategy {
+
+ /**
+ * Primarily used for persistent tables.
+ * For directories: drwxrwxr-x (owner and group have full access, others can read and execute).
+ * For files: -rw-r--r-- (owner can read and write, group and others can read).
+ * Folders and files are not deleted on file system close.
+ */
+ public static final StorageStrategy PERSISTENT = new StorageStrategy("775", "644", false);
+
+ /**
+ * Primarily used for temporary tables.
+ * For directories: drwx------ (owner has full access, group and others have no access).
+ * For files: -rw------- (owner can read and write, group and others have no access).
+ * Folders and files are deleted on file system close.
+ */
+ public static final StorageStrategy TEMPORARY = new StorageStrategy("700", "600", true);
+
+ private final String folderPermission; // permission string applied to newly created folders
+ private final String filePermission; // permission string applied to files
+ private final boolean deleteOnExit; // when true, created paths are registered via fs.deleteOnExit
+
+ @JsonCreator
+ public StorageStrategy(@JsonProperty("folderPermission") String folderPermission,
+ @JsonProperty("filePermission") String filePermission,
+ @JsonProperty("deleteOnExit") boolean deleteOnExit) {
+ this.folderPermission = folderPermission;
+ this.filePermission = filePermission;
+ this.deleteOnExit = deleteOnExit;
+ }
+
+ public String getFolderPermission() {
+ return folderPermission;
+ }
+
+ public String getFilePermission() { return filePermission; }
+
+ public boolean isDeleteOnExit() {
+ return deleteOnExit;
+ }
+
+ /**
+ * Creates passed path on appropriate file system.
+ * Before creation checks which parent directories do not exist.
+ * Applies storage strategy rules to all newly created directories.
+ * Will return the topmost newly created path, or null if the path already existed.
+ *
+ * Case 1: /a/b -> already exists, attempt to create /a/b/c/d
+ * Will create path and return /a/b/c.
+ * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/d
+ * Will create path and return /a/b/c/d.
+ * Case 3: /a/b/c/d -> already exists, will return null.
+ *
+ * @param fs file system where path should be located
+ * @param path location path
+ * @return topmost newly created path, or null if the passed path already existed
+ * @throws IOException is thrown in case of problems while creating path, setting permission
+ * or adding path to delete on exit list
+ */
+ public Path createPathAndApply(FileSystem fs, Path path) throws IOException {
+ List<Path> locations = getNonExistentLocations(fs, path);
+ if (locations.isEmpty()) {
+ return null;
+ }
+ fs.mkdirs(path); // NOTE(review): the boolean result of mkdirs is not checked
+ for (Path location : locations) {
+ applyStrategy(fs, location, folderPermission, deleteOnExit);
+ }
+ return locations.get(locations.size() - 1); // last element is the topmost location that did not exist
+ }
+
+ /**
+ * Creates passed file on appropriate file system.
+ * Before creation checks which parent directories do not exist.
+ * Applies storage strategy rules to all newly created directories and file.
+ * Will return the topmost newly created parent path, or the file if no new parent paths were created.
+ *
+ * Case 1: /a/b -> already exists, attempt to create /a/b/c/some_file.txt
+ * Will create file and return /a/b/c.
+ * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/some_file.txt
+ * Will create file and return /a/b/c/some_file.txt.
+ * Case 3: /a/b/c/some_file.txt -> already exists, will fail.
+ *
+ * @param fs file system where file should be located
+ * @param file file path
+ * @return topmost newly created parent path, or the file itself if all parents already existed
+ * @throws IOException is thrown if the file already exists or in case of problems while creating path,
+ * setting permission or adding path to delete on exit list
+ */
+ public Path createFileAndApply(FileSystem fs, Path file) throws IOException {
+ List<Path> locations = getNonExistentLocations(fs, file.getParent()); // collected before createNewFile is called
+ if (!fs.createNewFile(file)) {
+ throw new IOException(String.format("File [%s] already exists on file system [%s].",
+ file.toUri().getPath(), fs.getUri()));
+ }
+ applyToFile(fs, file);
+
+ if (locations.isEmpty()) {
+ return file;
+ }
+
+ for (Path location : locations) {
+ applyStrategy(fs, location, folderPermission, deleteOnExit);
+ }
+ return locations.get(locations.size() - 1); // last element is the topmost parent that did not exist
+ }
+
+ /**
+ * Applies storage strategy to file:
+ * sets file permission and adds file to file system delete on exit list if needed.
+ *
+ * @param fs file system
+ * @param file path to file
+ * @throws IOException is thrown in case of problems while setting permission
+ * or adding file to delete on exit list
+ */
+ public void applyToFile(FileSystem fs, Path file) throws IOException {
+ applyStrategy(fs, file, filePermission, deleteOnExit);
+ }
+
+ /**
+ * Returns list of parent locations that do not exist, including initial location.
+ * First in the list will be initial location,
+ * last in the list will be the topmost parent location that does not exist.
+ * If all locations exist, empty list will be returned.
+ *
+ * Case 1: if /a/b exists and passed location is /a/b/c/d,
+ * will return list with two elements: 0 -> /a/b/c/d, 1 -> /a/b/c
+ * Case 2: if /a/b exists and passed location is /a/b, will return empty list.
+ *
+ * @param fs file system where locations should be located
+ * @param path location path, null results in an empty list
+ * @return list of locations that do not exist
+ * @throws IOException in case of troubles accessing file system
+ */
+ private List<Path> getNonExistentLocations(FileSystem fs, Path path) throws IOException {
+ List<Path> locations = Lists.newArrayList();
+ Path starting = path;
+ while (starting != null && !fs.exists(starting)) { // walk upwards until an existing location or the root's parent (null)
+ locations.add(starting);
+ starting = starting.getParent();
+ }
+ return locations;
+ }
+
+ /**
+ * Applies storage strategy to passed path on passed file system.
+ * Sets appropriate permission
+ * and adds to file system delete on exit list if needed.
+ *
+ * @param fs file system where path is located
+ * @param path path location
+ * @param permission permission to be applied
+ * @param deleteOnExit if to delete path on exit
+ * @throws IOException is thrown in case of problems while setting permission
+ * or adding path to delete on exit list
+ */
+ private void applyStrategy(FileSystem fs, Path path, String permission, boolean deleteOnExit) throws IOException {
+ fs.setPermission(path, new FsPermission(permission));
+ if (deleteOnExit) {
+ fs.deleteOnExit(path);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
index e502e99..2110f38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -63,8 +63,8 @@ public class SubSchemaWrapper extends AbstractSchema {
}
@Override
- public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
- return innerSchema.createNewTable(tableName, partitionColumns);
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+ return innerSchema.createNewTable(tableName, partitionColumns, storageStrategy);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index e0f5438..3a89591 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -288,6 +288,9 @@ public class FileSelection {
}
final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().toString());
logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+ if (fileSel == null) {
+ return null;
+ }
fileSel.setHadWildcard(hasWildcard);
return fileSel;
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 526dfb1..e3e01c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,12 +24,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -150,8 +148,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
}
@Override
- public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
- return defaultSchema.createNewTable(tableName, partitionColumns);
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+ return defaultSchema.createNewTable(tableName, partitionColumns, storageStrategy);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index dac313b..8416ed8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -53,13 +53,13 @@ import org.apache.drill.exec.dotdrill.DotDrillFile;
import org.apache.drill.exec.dotdrill.DotDrillType;
import org.apache.drill.exec.dotdrill.DotDrillUtil;
import org.apache.drill.exec.dotdrill.View;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.logical.DrillViewTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
-import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -522,7 +522,6 @@ public class WorkspaceSchemaFactory {
} catch (UnsupportedOperationException e) {
logger.debug("The filesystem for this workspace does not support this operation.", e);
}
-
return tables.get(tableKey);
}
@@ -540,7 +539,7 @@ public class WorkspaceSchemaFactory {
}
@Override
- public CreateTableEntry createNewTable(String tableName, List<String> partitonColumns) {
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
if (formatPlugin == null) {
@@ -553,7 +552,8 @@ public class WorkspaceSchemaFactory {
(FileSystemConfig) plugin.getConfig(),
formatPlugin,
config.getLocation() + Path.SEPARATOR + tableName,
- partitonColumns);
+ partitionColumns,
+ storageStrategy);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index db22568..52ce8b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.StoragePluginRegistry;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -48,6 +48,7 @@ public class EasyWriter extends AbstractWriter {
@JsonProperty("child") PhysicalOperator child,
@JsonProperty("location") String location,
@JsonProperty("partitionColumns") List<String> partitionColumns,
+ @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JsonProperty("format") FormatPluginConfig formatConfig,
@JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
@@ -57,6 +58,7 @@ public class EasyWriter extends AbstractWriter {
Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
this.location = location;
this.partitionColumns = partitionColumns;
+ setStorageStrategy(storageStrategy);
}
public EasyWriter(PhysicalOperator child,
@@ -92,7 +94,9 @@ public class EasyWriter extends AbstractWriter {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new EasyWriter(child, location, partitionColumns, formatPlugin);
+ EasyWriter writer = new EasyWriter(child, location, partitionColumns, formatPlugin);
+ writer.setStorageStrategy(getStorageStrategy());
+ return writer;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 30c248e..58ca95f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -83,7 +83,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
- RecordWriter recordWriter = new JsonRecordWriter();
+ RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
recordWriter.init(options);
return recordWriter;
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index f27e04c..c37da8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.EventBasedRecordWriter;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
@@ -46,6 +46,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
private static final String LINE_FEED = String.format("%n");
+ private Path cleanUpLocation;
private String location;
private String prefix;
@@ -58,11 +59,13 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
private FSDataOutputStream stream = null;
private final JsonFactory factory = new JsonFactory();
+ private final StorageStrategy storageStrategy;
// Record write status
private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
- public JsonRecordWriter(){
+ public JsonRecordWriter(StorageStrategy storageStrategy){
+ this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
}
@Override
@@ -81,7 +84,17 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
Path fileName = new Path(location, prefix + "_" + index + "." + extension);
try {
+ // json writer does not support partitions, so only one file can be created
+ // and thus only one location should be deleted in case of abort
+ // to ensure that our writer was the first to create output file,
+ // we create empty output file first and fail if file exists
+ cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+ // since empty output file will be overwritten (some file systems may restrict append option)
+ // we need to re-apply file permission
stream = fs.create(fileName);
+ storageStrategy.applyToFile(fs, fileName);
+
JsonGenerator generator = factory.createGenerator(stream).useDefaultPrettyPrinter();
if (uglify) {
generator = generator.setPrettyPrinter(new MinimalPrettyPrinter(LINE_FEED));
@@ -238,6 +251,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
@Override
public void abort() throws IOException {
+ if (cleanUpLocation != null) { // stays null until createFileAndApply has returned for the output file
+ fs.delete(cleanUpLocation, true); // recursive flag set: the location may be a newly created parent folder
+ logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+ cleanUpLocation.toUri().getPath(), fs.getUri());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 6542ad4..a9a30e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -125,7 +125,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
- RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator());
+ RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
recordWriter.init(options);
return recordWriter;
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 4ee863a..a25699d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
@@ -80,6 +81,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
public static final String DRILL_VERSION_PROPERTY = "drill.version";
public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
+ private final StorageStrategy storageStrategy;
private ParquetFileWriter parquetFileWriter;
private MessageType schema;
private Map<String, String> extraMetaData = new HashMap<>();
@@ -101,7 +103,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private BatchSchema batchSchema;
private Configuration conf;
+ private FileSystem fs;
private String location;
+ private List<Path> cleanUpLocations;
private String prefix;
private int index = 0;
private OperatorContext oContext;
@@ -117,6 +121,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
+ this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.PERSISTENT : writer.getStorageStrategy();
+ this.cleanUpLocations = Lists.newArrayList();
}
@Override
@@ -126,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ fs = FileSystem.get(conf);
blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
dictionaryPageSize= Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE));
@@ -363,7 +370,19 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
// we wait until there is at least one record before creating the parquet file
if (parquetFileWriter == null) {
Path path = new Path(location, prefix + "_" + index + ".parquet");
- parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+ // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
+ Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
+
+ // since parquet reader supports partitions, it means that several output files may be created
+ // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
+ // if table location was created before, we store only files created by this writer and delete them in case of abort
+ addCleanUpLocation(fs, firstCreatedPath);
+
+ // since ParquetFileWriter will overwrite empty output file (append is not supported)
+ // we need to re-apply file permission
+ parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+ storageStrategy.applyToFile(fs, path);
+
parquetFileWriter.start();
}
@@ -374,6 +393,24 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
@Override
public void abort() throws IOException {
+ List<String> errors = Lists.newArrayList(); // paths that could not be removed; every location is attempted before failing
+ for (Path location : cleanUpLocations) {
+ try {
+ if (fs.exists(location)) {
+ fs.delete(location, true); // recursive: a location may be the table folder created by this writer, files included
+ logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+ location.toUri().getPath(), fs.getUri());
+ }
+ } catch (IOException e) {
+ errors.add(location.toUri().getPath());
+ logger.error("Failed to delete location [{}] on file system [{}].",
+ location, fs.getUri(), e);
+ }
+ }
+ if (!errors.isEmpty()) {
+ throw new IOException(String.format("Failed to delete the following locations %s on file system [%s]" +
+ " during aborting writer", errors, fs.getUri()));
+ }
}
@Override
@@ -382,4 +419,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
codecFactory.release();
}
+
+ /**
+ * Adds passed location to the list of locations to be cleaned up in case of abort.
+ * The location is added if:
+ * <li>no locations were added before, or</li>
+ * <li>the first added location is a file</li>
+ *
+ * If first added location is a folder, we don't add other locations (which can be only files),
+ * since this writer was the one to create main folder where files are located,
+ * the intent is that abort removes this folder together with its content.
+ *
+ * If first location is a file, then we add other files, since this writer didn't create main folder
+ * and on abort we need to delete only created files but not the whole folder.
+ *
+ * @param fs file system where location is created
+ * @param location passed location
+ * @throws IOException in case of errors during check if first added location is a file
+ */
+ private void addCleanUpLocation(FileSystem fs, Path location) throws IOException {
+ if (cleanUpLocations.isEmpty() || fs.isFile(cleanUpLocations.get(0))) { // only the first entry decides; later entries are never inspected
+ cleanUpLocations.add(location);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 716c56d..522c678 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -61,6 +61,7 @@ public class ParquetWriter extends AbstractWriter {
@JsonProperty("child") PhysicalOperator child,
@JsonProperty("location") String location,
@JsonProperty("partitionColumns") List<String> partitionColumns,
+ @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
@@ -69,6 +70,7 @@ public class ParquetWriter extends AbstractWriter {
Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
this.location = location;
this.partitionColumns = partitionColumns;
+ setStorageStrategy(storageStrategy);
}
public ParquetWriter(PhysicalOperator child,
@@ -109,7 +111,9 @@ public class ParquetWriter extends AbstractWriter {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new ParquetWriter(child, location, partitionColumns, formatPlugin);
+ ParquetWriter writer = new ParquetWriter(child, location, partitionColumns, formatPlugin);
+ writer.setStorageStrategy(getStorageStrategy());
+ return writer;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 8a74b49..d65a3eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.StringOutputRecordWriter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -37,6 +37,10 @@ import com.google.common.base.Joiner;
public class DrillTextRecordWriter extends StringOutputRecordWriter {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
+ private final StorageStrategy storageStrategy;
+
+ private Path cleanUpLocation;
+
private String location;
private String prefix;
@@ -52,8 +56,9 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
private StringBuilder currentRecord; // contains the current record separated by field delimiter
- public DrillTextRecordWriter(BufferAllocator allocator) {
+ public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
super(allocator);
+ this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
}
@Override
@@ -79,7 +84,17 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
// open a new file for writing data with new schema
Path fileName = new Path(location, prefix + "_" + index + "." + extension);
try {
+ // drill text writer does not support partitions, so only one file can be created
+ // and thus only one location should be deleted in case of abort
+ // to ensure that our writer was the first to create output file,
+ // we create empty output file first and fail if file exists
+ cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+ // since empty output file will be overwritten (some file systems may restrict append option)
+ // we need to re-apply file permission
DataOutputStream fos = fs.create(fileName);
+ storageStrategy.applyToFile(fs, fileName);
+
stream = new PrintStream(fos);
logger.debug("Created file: {}", fileName);
} catch (IOException ex) {
@@ -160,12 +175,10 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
@Override
public void abort() throws IOException {
- cleanup();
- try {
- fs.delete(new Path(location), true);
- } catch (IOException ex) {
- logger.error("Abort failed. There could be leftover output files");
- throw ex;
+ if (cleanUpLocation != null) {
+ fs.delete(cleanUpLocation, true);
+ logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+ cleanUpLocation.toUri().getPath(), fs.getUri());
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 01e4be0..735ba2f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -203,7 +203,7 @@ drill.exec: {
scan: {
threadpool_size: 8,
decode_threadpool_size: 1
- }
+ },
udf: {
retry-attempts: 5,
directory: {
@@ -227,7 +227,11 @@ drill.exec: {
registry: ${drill.exec.udf.directory.base}"/registry",
tmp: ${drill.exec.udf.directory.base}"/tmp"
}
- }
+ },
+ # Temporary tables can be created ONLY in the default temporary workspace.
+ # The full workspace name should be specified (schema and workspace separated by a dot).
+ # The workspace MUST be file-based and writable. The workspace name is case-sensitive.
+ default_temporary_workspace: "dfs.tmp"
}
drill.jdbc: {
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 93916e9..fb84088 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -84,6 +84,7 @@ public class BaseTestQuery extends ExecTest {
{
put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
put(ExecConstants.HTTP_ENABLE, "false");
+ put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, TEMP_SCHEMA);
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
index e9a38b0..acbf2e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -177,7 +177,7 @@ public class TestDropTable extends PlanTestBase {
@Test // DRILL-4673
public void testDropTableIfExistsWhileTableExists() throws Exception {
- final String existentTableName = "test_table";
+ final String existentTableName = "test_table_exists";
test("use dfs_test.tmp");
// successful dropping of existent table
@@ -192,7 +192,7 @@ public class TestDropTable extends PlanTestBase {
@Test // DRILL-4673
public void testDropTableIfExistsWhileTableDoesNotExist() throws Exception {
- final String nonExistentTableName = "test_table";
+ final String nonExistentTableName = "test_table_not_exists";
test("use dfs_test.tmp");
// dropping of non existent table without error
@@ -200,7 +200,7 @@ public class TestDropTable extends PlanTestBase {
.sqlQuery(String.format(DROP_TABLE_IF_EXISTS, nonExistentTableName))
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(true, String.format("Table [%s] not found", nonExistentTableName))
+ .baselineValues(false, String.format("Table [%s] not found", nonExistentTableName))
.go();
}
@@ -216,7 +216,7 @@ public class TestDropTable extends PlanTestBase {
.sqlQuery(String.format(DROP_TABLE_IF_EXISTS, viewName))
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(true, String.format("Table [%s] not found", viewName))
+ .baselineValues(false, String.format("Table [%s] not found", viewName))
.go();
} finally {
test(String.format(DROP_VIEW_IF_EXISTS, viewName));
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
new file mode 100644
index 0000000..f5d45b0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.util.TestUtilities;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(JMockit.class)
+public class TemporaryTablesAutomaticDropTest extends BaseTestQuery {
+
+ private static final String session_id = "sessionId";
+
+ @Before
+ public void init() throws Exception {
+ new MockUp<UUID>() {
+ @Mock
+ public UUID randomUUID() {
+ return UUID.nameUUIDFromBytes(session_id.getBytes());
+ }
+ };
+ updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties()));
+ }
+
+ @Test
+ public void testAutomaticDropWhenClientIsClosed() throws Exception {
+ File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("client_closed",
+ getDfsTestTmpSchemaLocation());
+ updateClient("new_client");
+ assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
+ }
+
+ @Test
+ public void testAutomaticDropWhenDrillbitIsClosed() throws Exception {
+ File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("drillbit_closed",
+ getDfsTestTmpSchemaLocation());
+ bits[0].close();
+ assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
+ }
+
+ @Test
+ public void testAutomaticDropOfSeveralSessionTemporaryLocations() throws Exception {
+ File firstSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("first_location",
+ getDfsTestTmpSchemaLocation());
+ StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ String tempDir = TestUtilities.createTempDir();
+ try {
+ TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tempDir);
+ File secondSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("second_location", tempDir);
+ updateClient("new_client");
+ assertFalse("First session temporary location should be absent", firstSessionTemporaryLocation.exists());
+ assertFalse("Second session temporary location should be absent", secondSessionTemporaryLocation.exists());
+ } finally {
+ TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, getDfsTestTmpSchemaLocation());
+ }
+ }
+
+ private File createAndCheckSessionTemporaryLocation(String suffix, String schemaLocation) throws Exception {
+ String temporaryTableName = "temporary_table_automatic_drop_" + suffix;
+ test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+ File sessionTemporaryLocation = new File(schemaLocation,
+ UUID.nameUUIDFromBytes(session_id.getBytes()).toString());
+ assertTrue("Session temporary location should exist", sessionTemporaryLocation.exists());
+ return sessionTemporaryLocation;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
index 43d8d57..5bf55af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -143,7 +143,7 @@ public class TestBaseViewSupport extends BaseTestQuery {
.sqlQuery(String.format("DROP VIEW IF EXISTS %s", viewFullName))
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(true, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
+ .baselineValues(false, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
.go();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
new file mode 100644
index 0000000..93c8cad
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.util.TestUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(JMockit.class)
+public class TestCTTAS extends BaseTestQuery {
+
+ private static final UUID session_id = UUID.nameUUIDFromBytes("sessionId".getBytes());
+ private static final String test_schema = "dfs_test";
+ private static final String temp2_wk = "tmp2";
+ private static final String temp2_schema = String.format("%s.%s", test_schema, temp2_wk);
+
+ private static FileSystem fs;
+ private static FsPermission expectedFolderPermission;
+ private static FsPermission expectedFilePermission;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ MockUp<UUID> uuidMockUp = mockRandomUUID(session_id);
+ updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties()));
+ uuidMockUp.tearDown();
+
+ StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(test_schema).getConfig();
+ pluginConfig.workspaces.put(temp2_wk, new WorkspaceConfig(TestUtilities.createTempDir(), true, null));
+ pluginRegistry.createOrUpdate(test_schema, pluginConfig, true);
+
+ fs = FileSystem.get(new Configuration());
+ expectedFolderPermission = new FsPermission(StorageStrategy.TEMPORARY.getFolderPermission());
+ expectedFilePermission = new FsPermission(StorageStrategy.TEMPORARY.getFilePermission());
+ }
+
+ private static MockUp<UUID> mockRandomUUID(final UUID uuid) {
+ return new MockUp<UUID>() {
+ @Mock
+ public UUID randomUUID() {
+ return uuid;
+ }
+ };
+ }
+
+ @Test // covers both the unqualified form and the form qualified with TEMP_SCHEMA via the %s placeholder
+ public void testSyntax() throws Exception {
+ test("create TEMPORARY table temporary_keyword as select 1 from (values(1))");
+ test("create TEMPORARY table %s.temporary_keyword_with_wk as select 1 from (values(1))", TEMP_SCHEMA);
+ }
+
+ @Test
+ public void testCreateTableWithDifferentStorageFormats() throws Exception {
+ List<String> storageFormats = Lists.newArrayList("parquet", "json", "csvh");
+
+ try {
+ for (String storageFormat : storageFormats) {
+ String temporaryTableName = "temp_" + storageFormat;
+ mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+ test("alter session set `store.format`='%s'", storageFormat);
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+ checkPermission(temporaryTableName);
+
+ testBuilder()
+ .sqlQuery("select * from %s", temporaryTableName)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("A")
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s", temporaryTableName)
+ .unOrdered()
+ .sqlBaselineQuery("select * from %s.%s", TEMP_SCHEMA, temporaryTableName)
+ .go();
+ }
+ } finally {
+ test("alter session reset `store.format`");
+ }
+ }
+
+ @Test
+ public void testTemporaryTablesCaseInsensitivity() throws Exception {
+ String temporaryTableName = "tEmP_InSeNSiTiVe";
+ List<String> temporaryTableNames = Lists.newArrayList(
+ temporaryTableName,
+ temporaryTableName.toLowerCase(),
+ temporaryTableName.toUpperCase());
+
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+ for (String tableName : temporaryTableNames) {
+ testBuilder()
+ .sqlQuery("select * from %s", tableName)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("A")
+ .go();
+ }
+ }
+
+ @Test
+ public void testPartitionByWithTemporaryTables() throws Exception {
+ String temporaryTableName = "temporary_table_with_partitions";
+ mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+ test("create TEMPORARY table %s partition by (c1) as select * from (" +
+ "select 'A' as c1 from (values(1)) union all select 'B' as c1 from (values(1))) t", temporaryTableName);
+ checkPermission(temporaryTableName);
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreationOutsideOfDefaultTemporaryWorkspace() throws Exception {
+ try {
+ String temporaryTableName = "temporary_table_outside_of_default_workspace";
+ test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: Temporary tables are not allowed to be created outside of default temporary workspace [%s].",
+ TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreateWhenTemporaryTableExistsWithoutSchema() throws Exception {
+ String temporaryTableName = "temporary_table_exists_without_schema";
+ try {
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: A table or view with given name [%s]" +
+ " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreateWhenTemporaryTableExistsCaseInsensitive() throws Exception {
+ String temporaryTableName = "temporary_table_exists_case_insensitive"; // must not collide with names created by other tests in this class
+ try {
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName.toUpperCase());
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: A table or view with given name [%s]" +
+ " already exists in schema [%s]", temporaryTableName.toUpperCase(), TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreateWhenTemporaryTableExistsWithSchema() throws Exception {
+ String temporaryTableName = "temporary_table_exists_with_schema";
+ try {
+ test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+ test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: A table or view with given name [%s]" +
+ " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreateWhenPersistentTableExists() throws Exception {
+ String persistentTableName = "persistent_table_exists";
+ try {
+ test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, persistentTableName);
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", persistentTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: A table or view with given name [%s]" +
+ " already exists in schema [%s]", persistentTableName, TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreateWhenViewExists() throws Exception {
+ String viewName = "view_exists";
+ try {
+ test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, viewName);
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", viewName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: A table or view with given name [%s]" +
+ " already exists in schema [%s]", viewName, TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreatePersistentTableWhenTemporaryTableExists() throws Exception {
+ String temporaryTableName = "temporary_table_exists_before_persistent";
+ try {
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+ test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: A table or view with given name [%s]" +
+ " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testCreateViewWhenTemporaryTableExists() throws Exception {
+ String temporaryTableName = "temporary_table_exists_before_view";
+ try {
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+ test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: A non-view table with given name [%s] already exists in schema [%s]",
+ temporaryTableName, TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ @Test
+ public void testTemporaryAndPersistentTablesPriority() throws Exception {
+ String name = "temporary_and_persistent_table";
+ test("use %s", temp2_schema);
+ test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+ test("create table %s as select 'persistent_table' as c1 from (values(1))", name);
+
+ testBuilder()
+ .sqlQuery("select * from %s", name)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("temporary_table")
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s.%s", temp2_schema, name)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("persistent_table")
+ .go();
+
+ test("drop table %s", name);
+
+ testBuilder()
+ .sqlQuery("select * from %s", name)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("persistent_table")
+ .go();
+ }
+
+ @Test
+ public void testTemporaryTableAndViewPriority() throws Exception {
+ String name = "temporary_table_and_view";
+ test("use %s", temp2_schema);
+ test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+ test("create view %s as select 'view' as c1 from (values(1))", name);
+
+ testBuilder()
+ .sqlQuery("select * from %s", name)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("temporary_table")
+ .go();
+
+ testBuilder()
+ .sqlQuery("select * from %s.%s", temp2_schema, name)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("view")
+ .go();
+
+ test("drop table %s", name);
+
+ testBuilder()
+ .sqlQuery("select * from %s", name)
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues("view")
+ .go();
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testTemporaryTablesInViewDefinitions() throws Exception {
+ String temporaryTableName = "temporary_table_for_view_definition";
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+ try {
+ test("create view %s.view_with_temp_table as select * from %s", TEMP_SCHEMA, temporaryTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: Temporary tables usage is disallowed. Used temporary table name: [%s]", temporaryTableName)));
+ throw e;
+ }
+ }
+
+ @Test
+ public void testManualDropWithoutSchema() throws Exception {
+ String temporaryTableName = "temporary_table_to_drop_without_schema";
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+ testBuilder()
+ .sqlQuery("drop table %s", temporaryTableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+ .go();
+ }
+
+ @Test
+ public void testManualDropWithSchema() throws Exception {
+ String temporaryTableName = "temporary_table_to_drop_with_schema";
+ test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+
+ testBuilder()
+ .sqlQuery("drop table %s.%s", TEMP_SCHEMA, temporaryTableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+ .go();
+ }
+
+ @Test
+ public void testDropTemporaryTableAsViewWithoutException() throws Exception {
+ String temporaryTableName = "temporary_table_to_drop_like_view_without_exception";
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+ testBuilder()
+ .sqlQuery("drop view if exists %s.%s", TEMP_SCHEMA, temporaryTableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(false, String.format("View [%s] not found in schema [%s].",
+ temporaryTableName, TEMP_SCHEMA))
+ .go();
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testDropTemporaryTableAsViewWithException() throws Exception {
+ String temporaryTableName = "temporary_table_to_drop_like_view_with_exception";
+ test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+ try {
+ test("drop view %s.%s", TEMP_SCHEMA, temporaryTableName);
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString(String.format(
+ "VALIDATION ERROR: Unknown view [%s] in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+ throw e;
+ }
+ }
+
+ private void checkPermission(String tmpTableName) throws IOException {
+ File[] files = findTemporaryTableLocation(tmpTableName);
+ assertEquals("Only one directory should match temporary table name " + tmpTableName, 1, files.length);
+ Path tmpTablePath = new Path(files[0].toURI().getPath());
+ assertEquals("Directory permission should match",
+ expectedFolderPermission, fs.getFileStatus(tmpTablePath).getPermission());
+ RemoteIterator<LocatedFileStatus> fileIterator = fs.listFiles(tmpTablePath, false);
+ while (fileIterator.hasNext()) {
+ assertEquals("File permission should match", expectedFilePermission, fileIterator.next().getPermission());
+ }
+ }
+
+ private File[] findTemporaryTableLocation(String tableName) throws IOException {
+ File sessionTempLocation = new File(getDfsTestTmpSchemaLocation(), session_id.toString());
+ Path sessionTempLocationPath = new Path(sessionTempLocation.toURI().getPath());
+ assertTrue("Session temporary location must exist", fs.exists(sessionTempLocationPath));
+ assertEquals("Session temporary location permission should match",
+ expectedFolderPermission, fs.getFileStatus(sessionTempLocationPath).getPermission());
+ final String tableUUID = UUID.nameUUIDFromBytes(tableName.getBytes()).toString();
+ return sessionTempLocation.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File path) {
+ return path.isDirectory() && path.getName().equals(tableUUID);
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
new file mode 100644
index 0000000..6a377ec
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StorageStrategy}: verifies the path returned by the create methods, the permissions
+ * of created files and directories, and whether created paths are still present after the file system
+ * has been closed and re-opened.
+ */
+public class StorageStrategyTest {
+
+  private static final Configuration configuration = new Configuration();
+  private static final FsPermission full_permission = new FsPermission("777");
+  private static final StorageStrategy persistent_strategy = new StorageStrategy("775", "644", false);
+  private static final StorageStrategy temporary_strategy = new StorageStrategy("700", "600", true);
+  private FileSystem fs;
+  // storage directory most recently created by prepareStorageDirectory(), removed recursively in cleanup()
+  private Path storageRoot;
+
+  @Before
+  public void setup() throws Exception {
+    initFileSystem();
+  }
+
+  /**
+   * Removes the storage directory created by the test and closes the file system,
+   * so neither temporary directories nor file system instances outlive the test.
+   */
+  @After
+  public void cleanup() throws Exception {
+    if (fs == null) {
+      // setup failed before the file system was opened, nothing to release
+      return;
+    }
+    try {
+      if (storageRoot != null) {
+        fs.delete(storageRoot, true);
+      }
+    } finally {
+      fs.close();
+    }
+  }
+
+  /**
+   * Persistent strategy, file two levels below the storage directory: the first created directory
+   * is returned and is still present after the file system is re-opened.
+   */
+  @Test
+  public void testPermissionAndDeleteOnExitFalseForFileWithParent() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 2, true);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = persistent_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, file, true, 2, persistent_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, true);
+  }
+
+  /**
+   * Temporary strategy, file two levels below the storage directory: the first created directory
+   * is returned and is gone after the file system is re-opened.
+   */
+  @Test
+  public void testPermissionAndDeleteOnExitTrueForFileWithParent() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 2, true);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = temporary_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, file, true, 2, temporary_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, false);
+  }
+
+  /**
+   * Persistent strategy, file directly in the storage directory: the file itself is returned
+   * and is still present after the file system is re-opened.
+   */
+  @Test
+  public void testPermissionAndDeleteOnExitFalseForFileOnly() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+
+    Path createdFile = persistent_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", file, createdFile);
+    checkPathAndPermission(initialPath, file, true, 0, persistent_strategy);
+    checkDeleteOnExit(file, true);
+  }
+
+  /**
+   * Temporary strategy, file directly in the storage directory: the file itself is returned
+   * and is gone after the file system is re-opened.
+   */
+  @Test
+  public void testPermissionAndDeleteOnExitTrueForFileOnly() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+
+    Path createdFile = temporary_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", file, createdFile);
+    checkPathAndPermission(initialPath, file, true, 0, temporary_strategy);
+    checkDeleteOnExit(file, false);
+  }
+
+  /**
+   * Creating a file that already exists must fail with an {@link IOException} carrying the expected message.
+   */
+  @Test(expected = IOException.class)
+  public void testFailureOnExistentFile() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+    fs.createNewFile(file);
+    assertTrue("File should exist", fs.exists(file));
+    try {
+      persistent_strategy.createFileAndApply(fs, file);
+    } catch (IOException e) {
+      assertEquals("Error message should match", String.format("File [%s] already exists on file system [%s].",
+          file.toUri().getPath(), fs.getUri()), e.getMessage());
+      // re-thrown so that the "expected" attribute of @Test is satisfied
+      throw e;
+    }
+  }
+
+  /**
+   * Persistent strategy, directory path two levels deep: the first created directory is returned
+   * and is still present after the file system is re-opened.
+   */
+  @Test
+  public void testCreatePathAndDeleteOnExitFalse() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path resultPath = addNLevelsAndFile(initialPath, 2, false);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = persistent_strategy.createPathAndApply(fs, resultPath);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, resultPath, false, 2, persistent_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, true);
+  }
+
+  /**
+   * Temporary strategy, directory path two levels deep: the first created directory is returned
+   * and is gone after the file system is re-opened.
+   */
+  @Test
+  public void testCreatePathAndDeleteOnExitTrue() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path resultPath = addNLevelsAndFile(initialPath, 2, false);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = temporary_strategy.createPathAndApply(fs, resultPath);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, resultPath, false, 2, temporary_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, false);
+  }
+
+  /**
+   * Applying the strategy to an already existing directory returns null and leaves its permission untouched.
+   */
+  @Test
+  public void testCreateNoPath() throws Exception {
+    Path path = prepareStorageDirectory();
+
+    Path createdParentPath = temporary_strategy.createPathAndApply(fs, path);
+
+    assertNull("Path should be null", createdParentPath);
+    assertEquals("Permission should match", full_permission, fs.getFileStatus(path).getPermission());
+  }
+
+  /**
+   * Applying the temporary strategy to an existing file changes its permission to the strategy file
+   * permission, and the file is gone after the file system is re-opened.
+   */
+  @Test
+  public void testStrategyForExistingFile() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+    fs.createNewFile(file);
+    fs.setPermission(file, full_permission);
+
+    assertTrue("File should exist", fs.exists(file));
+    assertEquals("Permission should match", full_permission, fs.getFileStatus(file).getPermission());
+
+    temporary_strategy.applyToFile(fs, file);
+
+    assertEquals("Permission should match", new FsPermission(temporary_strategy.getFilePermission()),
+        fs.getFileStatus(file).getPermission());
+    checkDeleteOnExit(file, false);
+  }
+
+  /**
+   * Creates a fresh temporary directory with full (777) permission to be used as the test storage root.
+   * The directory is remembered so that {@link #cleanup()} can remove it together with its content.
+   *
+   * @return path of the created directory
+   */
+  private Path prepareStorageDirectory() throws IOException {
+    File storageDirectory = Files.createTempDir();
+    // fallback only: File.deleteOnExit() does not remove non-empty directories, cleanup() does the real work
+    storageDirectory.deleteOnExit();
+    Path path = new Path(storageDirectory.toURI().getPath());
+    fs.setPermission(path, full_permission);
+    storageRoot = path;
+    return path;
+  }
+
+  /**
+   * Closes the current file system (if any) and opens a new one.
+   * A private instance is requested instead of the shared cached one: closing the cached instance
+   * would also process delete-on-exit paths registered by unrelated code using the same file system.
+   */
+  private void initFileSystem() throws IOException {
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch (Exception ignored) {
+        // best effort: a failure to close the previous instance must not prevent opening a new one
+      }
+    }
+    fs = FileSystem.newInstance(configuration);
+  }
+
+  /**
+   * Builds a path under the initial path with the given number of nested "levelN" directories
+   * and, optionally, a trailing "test_file.txt" file name. Nothing is created on the file system.
+   */
+  private Path addNLevelsAndFile(Path initialPath, int levels, boolean addFile) {
+    Path resultPath = initialPath;
+    for (int i = 1; i <= levels; i++) {
+      resultPath = new Path(resultPath, "level" + i);
+    }
+    if (addFile) {
+      resultPath = new Path(resultPath, "test_file.txt");
+    }
+    return resultPath;
+  }
+
+  /**
+   * Checks that the result path has the expected type, that the initial path kept its full permission,
+   * that the file (if any) has the strategy file permission and that each "levelN" directory
+   * has the strategy folder permission.
+   */
+  private void checkPathAndPermission(Path initialPath,
+                                      Path resultPath,
+                                      boolean isFile,
+                                      int levels,
+                                      StorageStrategy storageStrategy) throws IOException {
+
+    assertEquals("Path type should match", isFile, fs.isFile(resultPath));
+    assertEquals("Permission should match", full_permission, fs.getFileStatus(initialPath).getPermission());
+
+    if (isFile) {
+      assertEquals("Permission should match", new FsPermission(storageStrategy.getFilePermission()),
+          fs.getFileStatus(resultPath).getPermission());
+    }
+    Path startingPath = initialPath;
+    FsPermission folderPermission = new FsPermission(storageStrategy.getFolderPermission());
+    for (int i = 1; i <= levels; i++) {
+      startingPath = new Path(startingPath, "level" + i);
+      assertEquals("Permission should match", folderPermission, fs.getFileStatus(startingPath).getPermission());
+    }
+  }
+
+  /**
+   * Checks that the path exists, then re-opens the file system and checks
+   * whether the path is still present afterwards.
+   *
+   * @param path path to check
+   * @param isPresent expected existence of the path after the file system has been re-opened
+   */
+  private void checkDeleteOnExit(Path path, boolean isPresent) throws IOException {
+    assertTrue("Path should be present", fs.exists(path));
+    // close and open file system to check for path presence
+    initFileSystem();
+    assertEquals("Path existence flag should match", isPresent, fs.exists(path));
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index 7b977e2..35ca26b 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -9,7 +9,7 @@
writable: false
},
"tmp" : {
- location: "/tmp/drilltest",
+ location: "/tmp",
writable: true
}
},
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index fbacd23..cd97ab7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -176,6 +176,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
}
/**
+ * Closes all resources associated with the current session.
+ * The default implementation does nothing; subclasses may override it.
+ */
+ public void closeSession() {
+ }
+
+ /**
* Connection consumer wants to close connection. Initiate connection close
* and complete. This is a blocking call that ensures that the connection is
* closed before returning. As part of this call, the channel close handler
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index c360e51..cdb9c07 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -164,7 +164,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
final ChannelClosedException ex = future.cause() != null ? new ChannelClosedException(msg, future.cause()) : new ChannelClosedException(msg);
- clientConnection.channelClosed(ex);
+ try {
+ clientConnection.closeSession();
+ } finally {
+ clientConnection.channelClosed(ex);
+ }
}
}
|