http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
new file mode 100644
index 0000000..6d02046
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.QueryOptimizerRule;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
+
+import com.google.common.collect.Lists;
+
+public class ParquetFormatPlugin implements FormatPlugin{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetFormatPlugin.class);
+
+ private final DrillbitContext context;
+ static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+ private CodecFactoryExposer codecFactoryExposer;
+ private final DrillFileSystem fs;
+ private final ParquetFormatMatcher formatMatcher;
+ private final ParquetFormatConfig config;
+ private final StoragePluginConfig storageConfig;
+ private final String name;
+
+ public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
+ this(name, context, fs, storageConfig, new ParquetFormatConfig());
+ }
+
+ public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
+ this.context = context;
+ this.codecFactoryExposer = new CodecFactoryExposer(fs.getUnderlying().getConf());
+ this.config = formatConfig;
+ this.formatMatcher = new ParquetFormatMatcher(this, fs);
+ this.storageConfig = storageConfig;
+ this.fs = fs;
+ this.name = name == null ? "parquet" : name;
+ }
+
+ Configuration getHadoopConfig() {
+ return fs.getUnderlying().getConf();
+ }
+
+ public DrillFileSystem getFileSystem() {
+ return fs;
+ }
+
+ @Override
+ public ParquetFormatConfig getConfig() {
+ return config;
+ }
+
+ public DrillbitContext getContext() {
+ return this.context;
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public List<QueryOptimizerRule> getOptimizerRules() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ParquetGroupScan getGroupScan(FieldReference outputRef, FileSelection selection) throws IOException {
+ return new ParquetGroupScan( selection.getFileStatusList(fs), this, outputRef);
+ }
+
+ @Override
+ public StoragePluginConfig getStorageConfig() {
+ return storageConfig;
+ }
+
+ public CodecFactoryExposer getCodecFactoryExposer() {
+ return codecFactoryExposer;
+ }
+
+ public String getName(){
+ return name;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return formatMatcher;
+ }
+
+ private static class ParquetFormatMatcher extends BasicFormatMatcher{
+
+ private final DrillFileSystem fs;
+
+ public ParquetFormatMatcher(ParquetFormatPlugin plugin, DrillFileSystem fs) {
+ super(plugin, fs, //
+ Lists.newArrayList( //
+ Pattern.compile(".*\\.parquet$"), //
+ Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE) //
+ //
+ ),
+ Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC))
+
+ );
+ this.fs = fs;
+
+ }
+
+ @Override
+ public boolean supportDirectoryReads() {
+ return true;
+ }
+
+ @Override
+ public FormatSelection isReadable(FileSelection file) throws IOException {
+ // TODO: we only check the first file for directory reading. This is because
+ if(file.containsDirectories(fs)){
+ if(isDirReadable(file.getFirstPath(fs))){
+ return new FormatSelection(plugin.getConfig(), file);
+ }
+ }
+ return super.isReadable(file);
+ }
+
+ boolean isDirReadable(FileStatus dir) {
+ Path p = new Path(dir.getPath(), "/" + ParquetFileWriter.PARQUET_METADATA_FILE);
+ try {
+ return fs.getUnderlying().exists(p);
+ } catch (IOException e) {
+ logger.info("Failure while attempting to check for Parquet metadata file.", e);
+ return false;
+ }
+ }
+
+
+
+ }
+
+}
\ No newline at end of file
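
The ParquetFormatMatcher above recognizes Parquet data two ways: by file name (anything ending in ".parquet", or a directory containing the summary file written by ParquetFileWriter) and by a magic string at offset 0 of the file. The following standalone sketch, not part of this commit, illustrates those same two checks with plain JDK classes; the 4-byte "PAR1" header is an assumption about the value of ParquetFileWriter.MAGIC.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.regex.Pattern;

class ParquetMatchSketch {
  // Same file-name pattern the matcher registers above.
  private static final Pattern NAME_PATTERN = Pattern.compile(".*\\.parquet$");
  // Assumed value of ParquetFileWriter.MAGIC: Parquet files begin with the ASCII bytes "PAR1".
  private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

  static boolean looksLikeParquet(String path) throws IOException {
    if (NAME_PATTERN.matcher(path).matches()) {
      return true; // matched by file name alone
    }
    // Otherwise compare the leading bytes against the magic, as MagicString(0, MAGIC) does.
    try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
      byte[] header = new byte[PARQUET_MAGIC.length];
      return raf.read(header) == header.length && Arrays.equals(header, PARQUET_MAGIC);
    }
  }
}
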
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index aa01115..f76e59a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,37 +18,35 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntryFromHDFS;
-import org.apache.drill.exec.physical.ReadEntryWithPath;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AffinityCalculator;
-import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.BlockMapBuilder;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -59,16 +57,17 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-
@JsonTypeName("parquet-scan")
public class ParquetGroupScan extends AbstractGroupScan {
@@ -78,11 +77,18 @@ public class ParquetGroupScan extends AbstractGroupScan {
static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
+
final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
- private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
+ private ListMultimap<Integer, RowGroupInfo> mappings;
private List<RowGroupInfo> rowGroupInfos;
- private Stopwatch watch = new Stopwatch();
+ private final List<ReadEntryWithPath> entries;
+ private final Stopwatch watch = new Stopwatch();
+ private final ParquetFormatPlugin formatPlugin;
+ private final ParquetFormatConfig formatConfig;
+ private final FileSystem fs;
+ private final FieldReference ref;
+ private List<EndpointAffinity> endpointAffinities;
private List<SchemaPath> columns;
@@ -91,79 +97,87 @@ public class ParquetGroupScan extends AbstractGroupScan {
}
@JsonProperty("storageengine")
- public ParquetStorageEngineConfig getEngineConfig() {
- return this.engineConfig;
+ public ParquetFormatConfig getEngineConfig() {
+ return this.formatConfig;
}
- private List<ReadEntryWithPath> entries;
- private long totalBytes;
- private Collection<DrillbitEndpoint> availableEndpoints;
- private ParquetStorageEngine storageEngine;
- private ParquetStorageEngineConfig engineConfig;
- private FileSystem fs;
- private final FieldReference ref;
- private List<EndpointAffinity> endpointAffinities;
-
@JsonCreator
- public ParquetGroupScan(@JsonProperty("entries") List<ReadEntryWithPath> entries,
- @JsonProperty("storageengine") ParquetStorageEngineConfig storageEngineConfig,
- @JacksonInject StorageEngineRegistry engineRegistry,
- @JsonProperty("ref") FieldReference ref,
- @JsonProperty("columns") List<SchemaPath> columns
- )throws IOException, ExecutionSetupException {
+ public ParquetGroupScan( //
+ @JsonProperty("entries") List<ReadEntryWithPath> entries, //
+ @JsonProperty("storage") StoragePluginConfig storageConfig, //
+ @JsonProperty("format") FormatPluginConfig formatConfig, //
+ @JacksonInject StoragePluginRegistry engineRegistry, //
+ @JsonProperty("ref") FieldReference ref, //
+ @JsonProperty("columns") List<SchemaPath> columns //
+ ) throws IOException, ExecutionSetupException {
engineRegistry.init(DrillConfig.create());
this.columns = columns;
- this.storageEngine = (ParquetStorageEngine) engineRegistry.getEngine(storageEngineConfig);
- this.availableEndpoints = storageEngine.getContext().getBits();
- this.fs = storageEngine.getFileSystem();
- this.engineConfig = storageEngineConfig;
+ if(formatConfig == null) formatConfig = new ParquetFormatConfig();
+ Preconditions.checkNotNull(storageConfig);
+ Preconditions.checkNotNull(formatConfig);
+ this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+ Preconditions.checkNotNull(formatPlugin);
+ this.fs = formatPlugin.getFileSystem().getUnderlying();
+ this.formatConfig = formatPlugin.getConfig();
this.entries = entries;
this.ref = ref;
- readFooter();
- calculateEndpointBytes();
+ this.readFooterFromEntries();
+
}
- public ParquetGroupScan(List<ReadEntryWithPath> entries, //
- ParquetStorageEngine storageEngine, //
- FieldReference ref, //
- List<SchemaPath> columns) throws IOException {
- this.storageEngine = storageEngine;
- this.columns = columns;
- this.engineConfig = storageEngine.getEngineConfig();
- this.availableEndpoints = storageEngine.getContext().getBits();
- this.fs = storageEngine.getFileSystem();
- this.entries = entries;
+ public ParquetGroupScan(List<FileStatus> files, //
+ ParquetFormatPlugin formatPlugin, //
+ FieldReference ref) //
+ throws IOException {
+ this.formatPlugin = formatPlugin;
+ this.columns = null;
+ this.formatConfig = formatPlugin.getConfig();
+ this.fs = formatPlugin.getFileSystem().getUnderlying();
+
+ this.entries = Lists.newArrayList();
+ for(FileStatus file : files){
+ entries.add(new ReadEntryWithPath(file.getPath().toString()));
+ }
+
this.ref = ref;
- readFooter();
- calculateEndpointBytes();
+ readFooter(files);
}
- private void readFooter() throws IOException {
+ private void readFooterFromEntries() throws IOException {
+ List<FileStatus> files = Lists.newArrayList();
+ for(ReadEntryWithPath e : entries){
+ files.add(fs.getFileStatus(new Path(e.getPath())));
+ }
+ readFooter(files);
+ }
+
+ private void readFooter(List<FileStatus> statuses) throws IOException {
watch.reset();
watch.start();
Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
- rowGroupInfos = new ArrayList();
+
+
+ rowGroupInfos = Lists.newArrayList();
long start = 0, length = 0;
ColumnChunkMetaData columnChunkMetaData;
- for (ReadEntryWithPath readEntryWithPath : entries){
- Path path = new Path(readEntryWithPath.getPath());
- List<Footer> footers = ParquetFileReader.readFooters(this.storageEngine.getHadoopConfig(), path);
+ for (FileStatus status : statuses) {
+ List<Footer> footers = ParquetFileReader.readFooters(formatPlugin.getHadoopConfig(), status);
if (footers.size() == 0) {
- logger.warn("No footers found");
+ throw new IOException(String.format("Unable to find footer for file %s", status.getPath().getName()));
}
-// readEntryWithPath.getPath();
for (Footer footer : footers) {
int index = 0;
ParquetMetadata metadata = footer.getParquetMetadata();
- for (BlockMetaData rowGroup : metadata.getBlocks()){
+ for (BlockMetaData rowGroup : metadata.getBlocks()) {
// need to grab block information from HDFS
columnChunkMetaData = rowGroup.getColumns().iterator().next();
start = columnChunkMetaData.getFirstDataPageOffset();
- // this field is not being populated correctly, but the column chunks know their sizes, just summing them for now
- //end = start + rowGroup.getTotalByteSize();
+ // this field is not being populated correctly, but the column chunks know their sizes, just summing them for
+ // now
+ // end = start + rowGroup.getTotalByteSize();
length = 0;
- for (ColumnChunkMetaData col : rowGroup.getColumns()){
+ for (ColumnChunkMetaData col : rowGroup.getColumns()) {
length += col.getTotalSize();
}
String filePath = footer.getFile().toUri().getPath();
@@ -179,203 +193,109 @@ public class ParquetGroupScan extends AbstractGroupScan {
logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
}
- private void calculateEndpointBytes() {
- Timer.Context tContext = metrics.timer(ENDPOINT_BYTES_TIMER).time();
- watch.reset();
- watch.start();
- AffinityCalculator ac = new AffinityCalculator(fs, availableEndpoints);
- for (RowGroupInfo e : rowGroupInfos) {
- ac.setEndpointBytes(e);
- totalBytes += e.getLength();
- }
- watch.stop();
- tContext.stop();
- logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS));
- }
-
@JsonIgnore
public FileSystem getFileSystem() {
return this.fs;
}
- public static class RowGroupInfo extends ReadEntryFromHDFS {
+ public static class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
- private HashMap<DrillbitEndpoint,Long> endpointBytes;
- private long maxBytes;
+ private EndpointByteMap byteMap;
private int rowGroupIndex;
@JsonCreator
public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
- @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+ @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
super(path, start, length);
this.rowGroupIndex = rowGroupIndex;
}
- @Override
- public OperatorCost getCost() {
- return new OperatorCost(1, 2, 1, 1);
+ public RowGroupReadEntry getRowGroupReadEntry() {
+ return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
}
- @Override
- public Size getSize() {
- // TODO - these values are wrong, I cannot know these until after I read a file
- return new Size(10, 10);
- }
-
- public HashMap<DrillbitEndpoint,Long> getEndpointBytes() {
- return endpointBytes;
- }
-
- public void setEndpointBytes(HashMap<DrillbitEndpoint,Long> endpointBytes) {
- this.endpointBytes = endpointBytes;
- }
-
- public void setMaxBytes(long bytes) {
- this.maxBytes = bytes;
+ public int getRowGroupIndex() {
+ return this.rowGroupIndex;
}
- public long getMaxBytes() {
- return maxBytes;
+ @Override
+ public int compareTo(CompleteWork o) {
+ return Long.compare(getTotalBytes(), o.getTotalBytes());
}
- public ParquetRowGroupScan.RowGroupReadEntry getRowGroupReadEntry() {
- return new ParquetRowGroupScan.RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+ @Override
+ public long getTotalBytes() {
+ return this.getLength();
}
- public int getRowGroupIndex() {
- return this.rowGroupIndex;
+ @Override
+ public EndpointByteMap getByteMap() {
+ return byteMap;
}
- }
- private class ParquetReadEntryComparator implements Comparator<RowGroupInfo> {
- public int compare(RowGroupInfo e1, RowGroupInfo e2) {
- if (e1.getMaxBytes() == e2.getMaxBytes()) return 0;
- return (e1.getMaxBytes() > e2.getMaxBytes()) ? 1 : -1;
+ public void setEndpointByteMap(EndpointByteMap byteMap) {
+ this.byteMap = byteMap;
}
}
/**
- *Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
+ * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
* rowGroup
+ *
* @return a list of EndpointAffinity objects
*/
@Override
public List<EndpointAffinity> getOperatorAffinity() {
- watch.reset();
- watch.start();
+
if (this.endpointAffinities == null) {
- HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
- for (RowGroupInfo entry : rowGroupInfos) {
- for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
- long bytes = entry.getEndpointBytes().get(d);
- float affinity = (float)bytes / (float)totalBytes;
- logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
- if (affinities.keySet().contains(d)) {
- affinities.put(d, affinities.get(d) + affinity);
- } else {
- affinities.put(d, affinity);
- }
+ BlockMapBuilder bmb = new BlockMapBuilder(fs, formatPlugin.getContext().getBits());
+ try{
+ for (RowGroupInfo rgi : rowGroupInfos) {
+ EndpointByteMap ebm = bmb.getEndpointByteMap(rgi);
+ rgi.setEndpointByteMap(ebm);
}
+ } catch (IOException e) {
+ logger.warn("Failure while determining operator affinity.", e);
+ return Collections.emptyList();
}
- List<EndpointAffinity> affinityList = new LinkedList<>();
- for (DrillbitEndpoint d : affinities.keySet()) {
- logger.debug("Endpoint {} has affinity {}", d.getAddress(), affinities.get(d).floatValue());
- affinityList.add(new EndpointAffinity(d,affinities.get(d).floatValue()));
- }
- this.endpointAffinities = affinityList;
+
+ this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
}
- watch.stop();
- logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
return this.endpointAffinities;
}
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
- static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.00};
+ this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
- /**
- *
- * @param incomingEndpoints
- */
- @Override
- public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- watch.reset();
- watch.start();
- final Timer.Context tcontext = metrics.timer(ASSIGNMENT_TIMER).time();
- Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size(), String.format("Incoming endpoints %d " +
- "is greater than number of row groups %d", incomingEndpoints.size(), rowGroupInfos.size()));
- mappings = ArrayListMultimap.create();
- ArrayList rowGroupList = new ArrayList(rowGroupInfos);
- List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
- for(double cutoff : ASSIGNMENT_CUTOFFS ){
- scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
- }
- scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
- tcontext.stop();
- watch.stop();
- logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
- Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
- Preconditions.checkState(!rowGroupInfos.isEmpty());
}
- public int fragmentPointer = 0;
+ @Override
+ public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
+ assert minorFragmentId < mappings.size() : String.format(
+ "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(),
+ minorFragmentId);
- /**
- *
- * @param endpointAssignments the mapping between fragment/endpoint and rowGroup
- * @param endpoints the list of drillbits, ordered by the corresponding fragment
- * @param rowGroups the list of rowGroups to assign
- * @param requiredPercentage the percentage of max bytes required to make an assignment
- * @param assignAll if true, will assign even if no affinity
- */
- private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints,
- List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) {
- Collections.sort(rowGroups, new ParquetReadEntryComparator());
- final boolean requireAffinity = requiredPercentage > 0;
- int maxAssignments = (int) (rowGroups.size() / endpoints.size());
-
- if (maxAssignments < 1) maxAssignments = 1;
-
- for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
- RowGroupInfo rowGroupInfo = iter.next();
- for (int i = 0; i < endpoints.size(); i++) {
- int minorFragmentId = (fragmentPointer + i) % endpoints.size();
- DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
- Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes();
- boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ;
-
- if (assignAll ||
- (!bytesPerEndpoint.isEmpty() &&
- (!requireAffinity || haveAffinity) &&
- (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
- (!requireAffinity || bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage))) {
-
- endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
- logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
- if (bytesPerEndpoint.get(currentEndpoint) != null) {
- assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
- } else {
- assignmentAffinityStats.update(0);
- }
- iter.remove();
- fragmentPointer = (minorFragmentId + 1) % endpoints.size();
- break;
- }
- }
+ List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
- }
+ Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
+ String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
+
+ return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), ref, columns);
}
- @Override
- public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
- assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
- for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
- logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
+
+
+ private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups){
+ List<RowGroupReadEntry> entries = Lists.newArrayList();
+ for (RowGroupInfo rgi : rowGroups) {
+ RowGroupReadEntry rgre = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(),
+ rgi.getRowGroupIndex());
+ entries.add(rgre);
}
- Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId), ref,
- columns);
+ return entries;
}
-
public FieldReference getRef() {
return ref;
@@ -392,21 +312,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
@Override
public OperatorCost getCost() {
- //TODO Figure out how to properly calculate cost
- return new OperatorCost(1,rowGroupInfos.size(),1,1);
+ // TODO Figure out how to properly calculate cost
+ return new OperatorCost(1, rowGroupInfos.size(), 1, 1);
}
@Override
public Size getSize() {
// TODO - this is wrong, need to populate correctly
- return new Size(10,10);
+ return new Size(10, 10);
}
@Override
@JsonIgnore
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- //TODO return copy of self
+ // TODO return copy of self
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 1e6c31a..9e1cc66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -49,7 +49,7 @@ import parquet.schema.PrimitiveType;
import com.google.common.base.Joiner;
-public class ParquetRecordReader implements RecordReader {
+class ParquetRecordReader implements RecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
// this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index b3ce9b4..0e672d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -25,15 +25,16 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntryFromHDFS;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -48,43 +49,53 @@ import com.google.common.collect.Iterators;
public class ParquetRowGroupScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class);
- public final StorageEngineConfig engineConfig;
- private final ParquetStorageEngine parquetStorageEngine;
+ public final ParquetFormatConfig formatConfig;
+ private final ParquetFormatPlugin formatPlugin;
private final List<RowGroupReadEntry> rowGroupReadEntries;
private final FieldReference ref;
private final List<SchemaPath> columns;
@JsonCreator
- public ParquetRowGroupScan(@JacksonInject StorageEngineRegistry registry,
- @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
- @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
- @JsonProperty("ref") FieldReference ref,
- @JsonProperty("columns") List<SchemaPath> columns
- ) throws ExecutionSetupException {
- parquetStorageEngine = (ParquetStorageEngine) registry.getEngine(engineConfig);
+ public ParquetRowGroupScan( //
+ @JacksonInject StoragePluginRegistry registry, //
+ @JsonProperty("storage") StoragePluginConfig storageConfig, //
+ @JsonProperty("format") FormatPluginConfig formatConfig, //
+ @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
+ @JsonProperty("ref") FieldReference ref, //
+ @JsonProperty("columns") List<SchemaPath> columns //
+ ) throws ExecutionSetupException {
+
+ if(formatConfig == null) formatConfig = new ParquetFormatConfig();
+ Preconditions.checkNotNull(storageConfig);
+ Preconditions.checkNotNull(formatConfig);
+ this.formatPlugin = (ParquetFormatPlugin) registry.getFormatPlugin(storageConfig, formatConfig);
+ Preconditions.checkNotNull(formatPlugin);
this.rowGroupReadEntries = rowGroupReadEntries;
- this.engineConfig = engineConfig;
+ this.formatConfig = formatPlugin.getConfig();
this.ref = ref;
this.columns = columns;
}
- public ParquetRowGroupScan(ParquetStorageEngine engine, ParquetStorageEngineConfig config,
- List<RowGroupReadEntry> rowGroupReadEntries, FieldReference ref,
- List<SchemaPath> columns
- ) {
- parquetStorageEngine = engine;
- engineConfig = config;
+ public ParquetRowGroupScan( //
+ ParquetFormatPlugin formatPlugin, //
+ List<RowGroupReadEntry> rowGroupReadEntries, //
+ FieldReference ref, //
+ List<SchemaPath> columns) {
+ this.formatPlugin = formatPlugin;
+ this.formatConfig = formatPlugin.getConfig();
this.rowGroupReadEntries = rowGroupReadEntries;
this.ref = ref;
this.columns = columns;
}
+ @JsonProperty("entries")
public List<RowGroupReadEntry> getRowGroupReadEntries() {
return rowGroupReadEntries;
}
- public StorageEngineConfig getEngineConfig() {
- return engineConfig;
+ @JsonProperty("storage")
+ public StoragePluginConfig getEngineConfig() {
+ return formatPlugin.getStorageConfig();
}
@Override
@@ -92,7 +103,6 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
return null;
}
-
public FieldReference getRef() {
return ref;
}
@@ -108,8 +118,8 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
}
@JsonIgnore
- public ParquetStorageEngine getStorageEngine(){
- return parquetStorageEngine;
+ public ParquetFormatPlugin getStorageEngine() {
+ return formatPlugin;
}
@Override
@@ -120,8 +130,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
- return new ParquetRowGroupScan(parquetStorageEngine, (ParquetStorageEngineConfig) engineConfig, rowGroupReadEntries,
- ref, columns);
+ return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, ref, columns);
}
@Override
@@ -133,36 +142,4 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
return columns;
}
- public static class RowGroupReadEntry extends ReadEntryFromHDFS {
-
- private int rowGroupIndex;
-
- @parquet.org.codehaus.jackson.annotate.JsonCreator
- public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
- @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
- super(path, start, length);
- this.rowGroupIndex = rowGroupIndex;
- }
-
- @Override
- public OperatorCost getCost() {
- return new OperatorCost(1, 2, 1, 1);
- }
-
- @Override
- public Size getSize() {
- // TODO - these values are wrong, I cannot know these until after I read a file
- return new Size(10, 10);
- }
-
- @JsonIgnore
- public RowGroupReadEntry getRowGroupReadEntry() {
- return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
- }
-
- public int getRowGroupIndex(){
- return rowGroupIndex;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 966a16b..17e7da2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -26,7 +26,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -38,8 +41,10 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
@@ -50,9 +55,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
+
+ FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
+
// keep footers in a map to avoid re-reading them
Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
- for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
+ for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
/*
Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
@@ -63,11 +71,11 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
try {
if ( ! footers.containsKey(e.getPath())){
footers.put(e.getPath(),
- ParquetFileReader.readFooter( rowGroupScan.getStorageEngine().getFileSystem().getConf(), new Path(e.getPath())));
+ ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath())));
}
readers.add(
new ParquetRecordReader(
- context, e.getPath(), e.getRowGroupIndex(), rowGroupScan.getStorageEngine().getFileSystem(),
+ context, e.getPath(), e.getRowGroupIndex(), fs,
rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
footers.get(e.getPath()),
rowGroupScan.getRef(),
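
The loop above keeps footers in a map keyed by file path so that multiple row groups from the same file share a single footer read. A minimal sketch of just that caching step, relying only on the ParquetFileReader.readFooter call already used in this diff:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;

class FooterCacheSketch {
  private final Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();

  // Returns the footer for the given path, reading it from the file system at most once.
  ParquetMetadata footerFor(Configuration conf, String path) throws IOException {
    ParquetMetadata footer = footers.get(path);
    if (footer == null) {
      footer = ParquetFileReader.readFooter(conf, new Path(path));
      footers.put(path, footer);
    }
    return footer;
  }
}
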
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
deleted file mode 100644
index c17a9e3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.IOException;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.physical.ReadEntryWithPath;
-import org.apache.drill.exec.store.ClassPathFileSystem;
-import org.apache.drill.exec.store.SchemaProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-import com.beust.jcommander.internal.Lists;
-
-public class ParquetSchemaProvider implements SchemaProvider{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetSchemaProvider.class);
-
- public static final String HADOOP_DEFAULT_NAME = "fs.default.name";
- final ParquetStorageEngineConfig configuration;
- final FileSystem fs;
- final Configuration conf;
-
- public ParquetSchemaProvider(ParquetStorageEngineConfig configuration, DrillConfig config){
- this.configuration = configuration;
- try {
- this.conf = new Configuration();
- this.conf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
- this.conf.set(HADOOP_DEFAULT_NAME, configuration.getDfsName());
- logger.debug("{}: {}",HADOOP_DEFAULT_NAME, configuration.getDfsName());
- this.fs = FileSystem.get(conf);
- } catch (IOException ie) {
- throw new RuntimeException("Error setting up filesystem", ie);
- }
- }
-
- @Override
- public Object getSelectionBaseOnName(String tableName) {
- try{
- if(!fs.exists(new Path(tableName))) return null;
- ReadEntryWithPath re = new ReadEntryWithPath(tableName);
- return Lists.newArrayList(re);
- }catch(Exception e){
- logger.warn(String.format("Failure while checking table name %s.", tableName), e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
deleted file mode 100644
index ad9756e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import com.beust.jcommander.internal.Lists;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.ReadEntryWithPath;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStorageEngine;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.SchemaProvider;
-import org.apache.drill.exec.store.mock.MockStorageEngine;
-
-import com.google.common.collect.ListMultimap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-
-public class ParquetStorageEngine extends AbstractStorageEngine{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
-
- private final DrillbitContext context;
- static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private CodecFactoryExposer codecFactoryExposer;
- private final ParquetSchemaProvider schemaProvider;
- private final ParquetStorageEngineConfig engineConfig;
-
- public ParquetStorageEngine(ParquetStorageEngineConfig configuration, DrillbitContext context){
- this.context = context;
- this.schemaProvider = new ParquetSchemaProvider(configuration, context.getConfig());
- codecFactoryExposer = new CodecFactoryExposer(schemaProvider.conf);
- this.engineConfig = configuration;
- }
-
- public Configuration getHadoopConfig() {
- double y = 5;
- int x = (int) y;
- return schemaProvider.conf;
- }
-
- public FileSystem getFileSystem() {
- return schemaProvider.fs;
- }
-
- public ParquetStorageEngineConfig getEngineConfig() {
- return engineConfig;
- }
-
- public DrillbitContext getContext() {
- return this.context;
- }
-
- @Override
- public boolean supportsRead() {
- return true;
- }
-
- @Override
- public ParquetGroupScan getPhysicalScan(Scan scan) throws IOException {
-
- ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
- new TypeReference<ArrayList<ReadEntryWithPath>>() {});
-
- return new ParquetGroupScan( readEntries, this, scan.getOutputReference(), null);
- }
-
- @Override
- public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
- return null;
- }
-
- @Override
- public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
- return null;
- }
-
-
- public CodecFactoryExposer getCodecFactoryExposer() {
- return codecFactoryExposer;
- }
-
- @Override
- public ParquetSchemaProvider getSchemaProvider() {
- return schemaProvider;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
deleted file mode 100644
index f2d6124..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.util.HashMap;
-
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-import org.apache.drill.exec.store.DistributedStorageEngine;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("parquet")
-public class ParquetStorageEngineConfig extends StorageEngineConfigBase implements DistributedStorageEngine{
-
- public String getDfsName() {
- return dfsName;
- }
-
- // information needed to identify an HDFS instance
- private String dfsName;
- private HashMap<String,String> map;
-
- @JsonCreator
- public ParquetStorageEngineConfig(@JsonProperty("dfsName") String dfsName) {
- this.dfsName = dfsName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ParquetStorageEngineConfig that = (ParquetStorageEngineConfig) o;
-
- if (dfsName != null ? !dfsName.equals(that.dfsName) : that.dfsName != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return dfsName != null ? dfsName.hashCode() : 0;
- }
- public void set(String key, String value) {
- map.put(key, value);
- }
-
- public String get(String key) {
- return map.get(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
new file mode 100644
index 0000000..986328e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RowGroupReadEntry extends ReadEntryFromHDFS {
+
+ private int rowGroupIndex;
+
+ @parquet.org.codehaus.jackson.annotate.JsonCreator
+ public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
+ @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+ super(path, start, length);
+ this.rowGroupIndex = rowGroupIndex;
+ }
+
+ @JsonIgnore
+ public RowGroupReadEntry getRowGroupReadEntry() {
+ return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+ }
+
+ public int getRowGroupIndex(){
+ return rowGroupIndex;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index 0321838..d9e498e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -103,6 +103,10 @@ public class VarLenBinaryReader {
do {
lengthVarFieldsInCurrentRecord = 0;
for (ColumnReader columnReader : columns) {
+ if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){
+ rowGroupFinished = true;
+ break;
+ }
if (columnReader.pageReadStatus.currentPage == null
|| columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
new file mode 100644
index 0000000..d25abad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.beust.jcommander.internal.Lists;
+import com.carrotsearch.hppc.ObjectFloatOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectFloatCursor;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import com.google.common.base.Stopwatch;
+
+public class AffinityCreator {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCreator.class);
+
+ public static <T extends CompleteWork> List<EndpointAffinity> getAffinityMap(List<T> work){
+ Stopwatch watch = new Stopwatch().start();
+
+ long totalBytes = 0;
+ for (CompleteWork entry : work) {
+ totalBytes += entry.getTotalBytes();
+ }
+
+ ObjectFloatOpenHashMap<DrillbitEndpoint> affinities = new ObjectFloatOpenHashMap<DrillbitEndpoint>();
+ for (CompleteWork entry : work) {
+ for (ObjectLongCursor<DrillbitEndpoint> cursor : entry.getByteMap()) {
+ long bytes = cursor.value;
+ float affinity = (float)bytes / (float)totalBytes;
+ logger.debug("Work: {} Endpoint: {} Bytes: {}", work, cursor.key.getAddress(), bytes);
+ affinities.putOrAdd(cursor.key, affinity, affinity);
+ }
+ }
+
+ List<EndpointAffinity> affinityList = Lists.newLinkedList();
+ for (ObjectFloatCursor<DrillbitEndpoint> d : affinities) {
+ logger.debug("Endpoint {} has affinity {}", d.key.getAddress(), d.value);
+ affinityList.add(new EndpointAffinity(d.key, d.value));
+ }
+
+ logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
+ return affinityList;
+ }
+}
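
The affinity AffinityCreator computes above is, for each endpoint, the sum over all work units of the bytes of that unit hosted on the endpoint divided by the total bytes of all work units. A simplified illustration of that arithmetic, not Drill code: plain Strings stand in for DrillbitEndpoint, and a java.util Map stands in for the hppc ObjectFloatOpenHashMap and the EndpointByteMap.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AffinitySketch {
  /** Simplified stand-in for CompleteWork: its total size plus the bytes hosted per endpoint. */
  static class Work {
    final long totalBytes;
    final Map<String, Long> endpointBytes;

    Work(long totalBytes, Map<String, Long> endpointBytes) {
      this.totalBytes = totalBytes;
      this.endpointBytes = endpointBytes;
    }
  }

  static Map<String, Float> affinities(List<Work> work) {
    // Denominator: total bytes of all work units, as in AffinityCreator above.
    long totalBytes = 0;
    for (Work w : work) {
      totalBytes += w.totalBytes;
    }
    Map<String, Float> affinities = new HashMap<String, Float>();
    for (Work w : work) {
      for (Map.Entry<String, Long> e : w.endpointBytes.entrySet()) {
        // Each endpoint accumulates the share of total bytes it hosts for this work unit.
        float affinity = (float) e.getValue().longValue() / (float) totalBytes;
        Float prior = affinities.get(e.getKey());
        affinities.put(e.getKey(), prior == null ? affinity : prior + affinity);
      }
    }
    return affinities;
  }
}

With two equally sized row groups, one replicated on hosts A and B and one present only on A, host A ends up with affinity 1.0 and host B with 0.5, so the scan above leans toward scheduling on A.
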
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
new file mode 100644
index 0000000..eaa4f17
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+
+/**
+ * The AssignmentCreator is responsible for assigning a set of work units to the available slices.
+ */
+public class AssignmentCreator<T extends CompleteWork> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
+
+ static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
+ private final ArrayListMultimap<Integer, T> mappings;
+ private final List<DrillbitEndpoint> endpoints;
+
+
+
+ /**
+ * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
+ * Drillbits.
+ *
+ * @param incomingEndpoints
+ * The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
+ * multiple slices on a node working on the task simultaneously.
+ * @param units
+ * The work units to assign.
+ * @return a ListMultimap from minor fragment id (the position in the incoming endpoint list) to the work units assigned to that fragment
+ */
+ public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
+ List<T> units) {
+ AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units);
+ return creator.mappings;
+ }
+
+ private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+ Stopwatch watch = new Stopwatch().start();
+
+ Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("The number of incoming endpoints (%d) "
+ + "is greater than the number of row groups (%d)", incomingEndpoints.size(), units.size()));
+ this.mappings = ArrayListMultimap.create();
+ this.endpoints = Lists.newLinkedList(incomingEndpoints);
+
+ ArrayList<T> rowGroupList = new ArrayList<>(units);
+ for (double cutoff : ASSIGNMENT_CUTOFFS) {
+ scanAndAssign(rowGroupList, cutoff, false);
+ }
+ scanAndAssign(rowGroupList, 0.0, true);
+
+ logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
+ Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
+ Preconditions.checkState(!units.isEmpty());
+
+ }
+
+ /**
+ * Scan the remaining work units and assign them to minor fragments, recording the results in the mappings field.
+ * Endpoints are visited round-robin, ordered by minor fragment id.
+ *
+ * @param workunits
+ * the list of row groups still waiting to be assigned
+ * @param requiredPercentage
+ * the fraction of the unit's maximum local bytes an endpoint must hold for an assignment to be made
+ * @param assignAll
+ * if true, assign every remaining unit even when there is no local affinity
+ */
+ private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAll) {
+ Collections.sort(workunits);
+ int fragmentPointer = 0;
+ final boolean requireAffinity = requiredPercentage > 0;
+ int maxAssignments = (int) (workunits.size() / endpoints.size());
+
+ if (maxAssignments < 1)
+ maxAssignments = 1;
+
+ for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
+ T unit = iter.next();
+ for (int i = 0; i < endpoints.size(); i++) {
+ int minorFragmentId = (fragmentPointer + i) % endpoints.size();
+ DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
+ EndpointByteMap endpointByteMap = unit.getByteMap();
+ boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
+
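+ // Assign this unit to the current minor fragment if we are in the final assign-all pass, or if the unit
+ // has known block locations, the affinity threshold (if any) is satisfied for this endpoint, and the
+ // fragment has not yet received its share (maxAssignments) of units.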
+ if (assignAll
+ || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
+ && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
+ .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
+
+ mappings.put(minorFragmentId, unit);
+ // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
+ // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
+ // if (bytesPerEndpoint.get(currentEndpoint) != null) {
+ // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
+ // } else {
+ // // assignmentAffinityStats.update(0);
+ // }
+ iter.remove();
+ fragmentPointer = (minorFragmentId + 1) % endpoints.size();
+ break;
+ }
+ }
+
+ }
+ }
+
+}
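
A hedged sketch (illustrative names, not part of the patch) of how a parallelizer might use this class; the endpoint list ordering is assumed to correspond to minor fragment ids, as the Javadoc above describes.

    import java.util.List;

    import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    import org.apache.drill.exec.store.schedule.AssignmentCreator;
    import org.apache.drill.exec.store.schedule.CompleteFileWork;

    import com.google.common.collect.ListMultimap;

    public class AssignmentExample {
      public static void assign(List<DrillbitEndpoint> incomingEndpoints, List<CompleteFileWork> units) {
        // incomingEndpoints.get(i) is the endpoint that will run minor fragment i; an endpoint may
        // appear several times if it should host several slices of this scan.
        ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, units);
        for (Integer minorFragmentId : mappings.keySet()) {
          List<CompleteFileWork> assigned = mappings.get(minorFragmentId);
          // 'assigned' would be handed to the scan operator of that minor fragment.
        }
      }
    }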
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
new file mode 100644
index 0000000..432c1d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.beust.jcommander.internal.Lists;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableRangeMap;
+import com.google.common.collect.Range;
+
+public class BlockMapBuilder {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class);
+ static final MetricRegistry metrics = DrillMetrics.getInstance();
+ static final String BLOCK_MAP_BUILDER_TIMER = MetricRegistry.name(BlockMapBuilder.class, "blockMapBuilderTimer");
+
+ private HashMap<Path,ImmutableRangeMap<Long,BlockLocation>> blockMapMap = new HashMap<>();
+ private Collection<DrillbitEndpoint> endpoints;
+ private FileSystem fs;
+ private HashMap<String,DrillbitEndpoint> endPointMap;
+
+ public BlockMapBuilder(FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
+ this.fs = fs;
+ this.endpoints = endpoints;
+ buildEndpointMap();
+ }
+
+
+ public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException{
+ List<CompleteFileWork> work = Lists.newArrayList();
+ for(FileStatus f : files){
+ ImmutableRangeMap<Long,BlockLocation> rangeMap = getBlockMap(f);
+ if(!blockify){
+ work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString()));
+ continue;
+ }
+
+ for(Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()){
+ work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString()));
+ }
+ }
+ return work;
+ }
+
+ private class FileStatusWork implements FileWork{
+ private FileStatus status;
+
+ public FileStatusWork(FileStatus status) {
+ if(status.isDir()) throw new IllegalStateException("FileStatus work only works with files, not directories.");
+ this.status = status;
+ }
+
+ @Override
+ public String getPath() {
+ return status.getPath().toString();
+ }
+
+ @Override
+ public long getStart() {
+ return 0;
+ }
+
+ @Override
+ public long getLength() {
+ return status.getLen();
+ }
+
+
+
+ }
+
+ private ImmutableRangeMap<Long,BlockLocation> buildBlockMap(Path path) throws IOException {
+ FileStatus status = fs.getFileStatus(path);
+ return buildBlockMap(status);
+ }
+
+ /**
+ * Builds a mapping from file byte ranges to their block locations for the given file.
+ */
+ private ImmutableRangeMap<Long,BlockLocation> buildBlockMap(FileStatus status) throws IOException {
+ final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time();
+ BlockLocation[] blocks;
+ ImmutableRangeMap<Long,BlockLocation> blockMap;
+ blocks = fs.getFileBlockLocations(status, 0 , status.getLen());
+ ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
+ for (BlockLocation block : blocks) {
+ long start = block.getOffset();
+ long end = start + block.getLength();
+ Range<Long> range = Range.closedOpen(start, end);
+ blockMapBuilder = blockMapBuilder.put(range, block);
+ }
+ blockMap = blockMapBuilder.build();
+ blockMapMap.put(status.getPath(), blockMap);
+ context.stop();
+ return blockMap;
+ }
+
+ private ImmutableRangeMap<Long,BlockLocation> getBlockMap(Path path) throws IOException{
+ ImmutableRangeMap<Long,BlockLocation> blockMap = blockMapMap.get(path);
+ if(blockMap == null){
+ blockMap = buildBlockMap(path);
+ }
+ return blockMap;
+ }
+
+ private ImmutableRangeMap<Long,BlockLocation> getBlockMap(FileStatus status) throws IOException{
+ ImmutableRangeMap<Long,BlockLocation> blockMap = blockMapMap.get(status.getPath());
+ if(blockMap == null){
+ blockMap = buildBlockMap(status);
+ }
+ return blockMap;
+ }
+
+
+ /**
+ * For a given FileWork, calculate how many bytes of that work unit are stored on each Drillbit endpoint.
+ *
+ * @param work the FileWork to calculate endpoint bytes for
+ * @throws IOException if the file's block locations cannot be read
+ */
+ public EndpointByteMap getEndpointByteMap(FileWork work) throws IOException {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ Path fileName = new Path(work.getPath());
+
+
+ ImmutableRangeMap<Long,BlockLocation> blockMap = getBlockMap(fileName);
+ EndpointByteMapImpl endpointByteMap = new EndpointByteMapImpl();
+ long start = work.getStart();
+ long end = start + work.getLength();
+ Range<Long> rowGroupRange = Range.closedOpen(start, end);
+
+ // Find submap of ranges that intersect with the rowGroup
+ ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
+
+ // Iterate through each block in this submap and get the host for the block location
+ for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
+ String[] hosts;
+ Range<Long> blockRange = block.getKey();
+ try {
+ hosts = block.getValue().getHosts();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to get hosts for block location", ioe);
+ }
+ Range<Long> intersection = rowGroupRange.intersection(blockRange);
+ long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
+
+ // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
+ for (String host : hosts) {
+ DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
+ if(endpoint != null){
+ endpointByteMap.add(endpoint, bytes);
+ }else{
+ logger.debug("Failure finding Drillbit running on host {}. Skipping affinity to that host.", host);
+ }
+ }
+ }
+
+ logger.debug("FileWork group ({},{}) max bytes {}", work.getPath(), work.getStart(), endpointByteMap.getMaxBytes());
+
+ logger.debug("Took {} ms to set endpoint bytes", watch.stop().elapsed(TimeUnit.MILLISECONDS));
+ return endpointByteMap;
+ }
+
+ private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
+ return endPointMap.get(hostName);
+ }
+
+ /**
+ * Builds a mapping from hostnames to the Drillbit endpoints running on them.
+ */
+ private void buildEndpointMap() {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ endPointMap = new HashMap<String, DrillbitEndpoint>();
+ for (DrillbitEndpoint d : endpoints) {
+ String hostName = d.getAddress();
+ endPointMap.put(hostName, d);
+ }
+ watch.stop();
+ logger.debug("Took {} ms to build endpoint map", watch.elapsed(TimeUnit.MILLISECONDS));
+ }
+}
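
For context, a hedged sketch (illustrative names) of the intended call pattern; the FileSystem, file listing, and endpoint collection are assumed to come from the storage plugin and the coordinator's set of active Drillbits.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;

    import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    import org.apache.drill.exec.store.schedule.BlockMapBuilder;
    import org.apache.drill.exec.store.schedule.CompleteFileWork;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;

    public class BlockMapExample {
      public static List<CompleteFileWork> split(FileSystem fs, List<FileStatus> files,
          Collection<DrillbitEndpoint> activeEndpoints) throws IOException {
        BlockMapBuilder builder = new BlockMapBuilder(fs, activeEndpoints);
        // blockify = true emits one work unit per file system block so reads can stay local;
        // blockify = false emits a single work unit spanning each whole file.
        return builder.generateFileWork(files, true);
      }
    }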
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
new file mode 100644
index 0000000..30b08f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CompleteFileWork implements FileWork, CompleteWork{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class);
+
+ private long start;
+ private long length;
+ private String path;
+ private EndpointByteMap byteMap;
+
+ public CompleteFileWork(EndpointByteMap byteMap, long start, long length, String path) {
+ super();
+ this.start = start;
+ this.length = length;
+ this.path = path;
+ this.byteMap = byteMap;
+ }
+
+ @Override
+ public int compareTo(CompleteWork o) {
+ return Long.compare(getTotalBytes(), o.getTotalBytes());
+ }
+
+ @Override
+ public long getTotalBytes() {
+ return length;
+ }
+
+ @Override
+ public EndpointByteMap getByteMap() {
+ return byteMap;
+ }
+
+ @Override
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public long getStart() {
+ return start;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ public FileWorkImpl getAsFileWork(){
+ return new FileWorkImpl(start, length, path);
+ }
+
+ public static class FileWorkImpl implements FileWork{
+
+ @JsonCreator
+ public FileWorkImpl(@JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("path") String path) {
+ super();
+ this.start = start;
+ this.length = length;
+ this.path = path;
+ }
+
+ public long start;
+ public long length;
+ public String path;
+
+ @Override
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public long getStart() {
+ return start;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ }
+}
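
Since FileWorkImpl carries Jackson annotations, a plausible round trip through JSON (assumed usage, as would happen when a physical plan is serialized) might look like the sketch below; the file path is an illustrative value.

    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;

    public class FileWorkJsonExample {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        FileWorkImpl work = new FileWorkImpl(0, 1024, "/tmp/example.parquet");
        // Serialize via the public fields, then rebuild through the @JsonCreator constructor.
        String json = mapper.writeValueAsString(work);
        FileWorkImpl roundTripped = mapper.readValue(json, FileWorkImpl.class);
      }
    }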
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java
new file mode 100644
index 0000000..44e27d4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+
+/**
+ * Container that holds a complete work unit. Can contain one or more partial units.
+ */
+public interface CompleteWork extends Comparable<CompleteWork>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteWork.class);
+
+ public long getTotalBytes();
+ public EndpointByteMap getByteMap();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java
new file mode 100644
index 0000000..f543d75
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+
+/**
+ * Describes, for a particular work unit, how many bytes of that unit are stored on each DrillbitEndpoint.
+ */
+public interface EndpointByteMap extends Iterable<ObjectLongCursor<DrillbitEndpoint>>{
+
+ public boolean isSet(DrillbitEndpoint endpoint);
+ public long get(DrillbitEndpoint endpoint);
+ public boolean isEmpty();
+ public long getMaxBytes();
+ public void add(DrillbitEndpoint endpoint, long bytes);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java
new file mode 100644
index 0000000..64e52eb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.carrotsearch.hppc.ObjectLongOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+
+public class EndpointByteMapImpl implements EndpointByteMap{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointByteMapImpl.class);
+
+ private final ObjectLongOpenHashMap<DrillbitEndpoint> map = new ObjectLongOpenHashMap<>();
+
+ private long maxBytes;
+
+ public boolean isSet(DrillbitEndpoint endpoint){
+ return map.containsKey(endpoint);
+ }
+
+ public long get(DrillbitEndpoint endpoint){
+ return map.get(endpoint);
+ }
+
+ public boolean isEmpty(){
+ return map.isEmpty();
+ }
+
+ public void add(DrillbitEndpoint endpoint, long bytes){
+ assert endpoint != null;
+ maxBytes = Math.max(maxBytes, map.putOrAdd(endpoint, bytes, bytes)+1);
+ }
+
+ public long getMaxBytes() {
+ return maxBytes;
+ }
+
+ @Override
+ public Iterator<ObjectLongCursor<DrillbitEndpoint>> iterator() {
+ return map.iterator();
+ }
+
+
+}
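
A small sketch (assumed usage, illustrative host names) of how a work unit accumulates local byte counts per endpoint and is then inspected:

    import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;

    import com.carrotsearch.hppc.cursors.ObjectLongCursor;

    public class ByteMapExample {
      public static void main(String[] args) {
        DrillbitEndpoint node1 = DrillbitEndpoint.newBuilder().setAddress("node1").build();
        DrillbitEndpoint node2 = DrillbitEndpoint.newBuilder().setAddress("node2").build();

        EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
        byteMap.add(node1, 128L * 1024 * 1024); // 128 MB of this unit is local to node1
        byteMap.add(node2, 64L * 1024 * 1024);  // 64 MB is local to node2

        for (ObjectLongCursor<DrillbitEndpoint> cursor : byteMap) {
          System.out.println(cursor.key.getAddress() + " -> " + cursor.value + " bytes");
        }
      }
    }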
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java
new file mode 100644
index 0000000..bb8d950
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class PartialWork {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartialWork.class);
+
+ private final long length;
+ private final DrillbitEndpoint[] locations;
+
+ public PartialWork(long length, DrillbitEndpoint[] locations) {
+ super();
+ this.length = length;
+ this.locations = locations;
+ }
+
+ public long getLength() {
+ return length;
+ }
+ public DrillbitEndpoint[] getLocations() {
+ return locations;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 329815d..baecc3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -346,7 +346,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
private void runSQL(String sql) {
try{
- DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getSchemaFactory(), context.getFunctionRegistry());
+ DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getFactory(), context.getFunctionRegistry());
LogicalPlan plan = sqlWorker.getPlan(sql);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/storage-engines.json b/exec/java-exec/src/main/resources/storage-engines.json
deleted file mode 100644
index d1d0413..0000000
--- a/exec/java-exec/src/main/resources/storage-engines.json
+++ /dev/null
@@ -1,29 +0,0 @@
-{
- "storage":{
- "parquet-local" :
- {
- "type":"parquet",
- "dfsName" : "file:///"
- },
- "parquet-cp" :
- {
- "type":"parquet",
- "dfsName" : "classpath:///"
- },
- "jsonl" :
- {
- "type":"json",
- "dfsName" : "file:///"
- },
- "json-cp" :
- {
- "type":"json",
- "dfsName" : "classpath:///"
- },
- "parquet" :
- {
- "type":"parquet",
- "dfsName" : "file:///"
- }
- }
-}
\ No newline at end of file