Repository: drill
Updated Branches:
refs/heads/master 9f4fff800 -> e9b6e8f3d
DRILL-4593: Remove OldAssignmentCreator in FileSystemPlugin
+ Remove dead code in ParquetGroupScan
this closes #473
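
For context, the core API change drops the DrillbitContext parameter from
AssignmentCreator.getMappings(), since the option-driven fallback to
OldAssignmentCreator is gone. A minimal before/after sketch (signatures taken
from the diffs below; the "plugin" receiver is illustrative):

    // Before: callers passed a context so getMappings() could consult the
    // exec.schedule.assignment.old option and fall back to OldAssignmentCreator.
    mappings = AssignmentCreator.getMappings(incomingEndpoints, units, plugin.getContext());

    // After: the new assignment algorithm is always used.
    mappings = AssignmentCreator.getMappings(incomingEndpoints, units);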
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e9b6e8f3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e9b6e8f3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e9b6e8f3
Branch: refs/heads/master
Commit: e9b6e8f3ddadbd308b85ed6d88bcf878147ee77e
Parents: 10afc70
Author: vkorukanti <venki@dremio.com>
Authored: Thu Apr 7 14:23:07 2016 -0700
Committer: vkorukanti <venki@dremio.com>
Committed: Wed Apr 13 10:36:21 2016 -0700
----------------------------------------------------------------------
.../drill/exec/store/kudu/KuduGroupScan.java | 2 +-
.../org/apache/drill/exec/ExecConstants.java | 3 -
.../server/options/SystemOptionManager.java | 1 -
.../exec/store/dfs/easy/EasyGroupScan.java | 2 +-
.../exec/store/parquet/ParquetGroupScan.java | 39 +----
.../exec/store/schedule/AssignmentCreator.java | 13 +-
.../store/schedule/OldAssignmentCreator.java | 141 -------------------
.../drill/exec/store/store/TestAssignment.java | 2 +-
8 files changed, 7 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index ff4295d..873f216 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -190,7 +190,7 @@ public class KuduGroupScan extends AbstractGroupScan {
*/
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList, storagePlugin.getContext());
+ assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 963934d..a490116 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -267,9 +267,6 @@ public interface ExecConstants {
OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
- String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old";
- OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false);
-
String CTAS_PARTITIONING_HASH_DISTRIBUTE = "store.partition.hash_distribute";
BooleanValidator CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR = new BooleanValidator(CTAS_PARTITIONING_HASH_DISTRIBUTE, false);
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a596d3a..0abdb76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,6 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.HASH_AGG_TABLE_FACTOR,
ExecConstants.AVERAGE_FIELD_WIDTH,
ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
- ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR,
ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR,
ExecConstants.ADMIN_USERS_VALIDATOR,
ExecConstants.ADMIN_USER_GROUPS_VALIDATOR,
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index ebea2f4..7a80db3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -195,7 +195,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, formatPlugin.getContext());
+ mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
}
private void createMappings(List<EndpointAffinity> affinities) {
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 47172cc..5950b74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -34,7 +34,6 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
@@ -46,7 +45,6 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.ParquetOutputRecordWriter;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -59,7 +57,6 @@ import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
-import org.apache.drill.exec.store.schedule.BlockMapBuilder;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
@@ -87,14 +84,12 @@ import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.joda.time.DateTimeUtils;
-import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
@@ -104,12 +99,8 @@ import com.google.common.collect.Sets;
@JsonTypeName("parquet-scan")
public class ParquetGroupScan extends AbstractFileGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
- static final MetricRegistry metrics = DrillMetrics.getInstance();
- static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
-
private final List<ReadEntryWithPath> entries;
- private final Stopwatch watch = Stopwatch.createUnstarted();
private final ParquetFormatPlugin formatPlugin;
private final ParquetFormatConfig formatConfig;
private final DrillFileSystem fs;
@@ -716,8 +707,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
if (column.getNulls() != null) {
Long newCount = rowCount - column.getNulls();
columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
- } else {
-
}
}
} else {
@@ -790,36 +779,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
- private class BlockMapper extends TimedRunnable<Void> {
- private final BlockMapBuilder bmb;
- private final RowGroupInfo rgi;
-
- public BlockMapper(BlockMapBuilder bmb, RowGroupInfo rgi) {
- super();
- this.bmb = bmb;
- this.rgi = rgi;
- }
-
- @Override
- protected Void runInner() throws Exception {
- EndpointByteMap ebm = bmb.getEndpointByteMap(rgi);
- rgi.setEndpointByteMap(ebm);
- return null;
- }
-
- @Override
- protected IOException convertToIOException(Exception e) {
- return new IOException(String.format(
- "Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(),
- rgi.getStart()));
- }
-
- }
-
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
- this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext());
+ this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
}
@Override
public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 632cf66..eed200e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -27,9 +27,7 @@ import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import com.google.common.base.Stopwatch;
@@ -91,14 +89,9 @@ public class AssignmentCreator<T extends CompleteWork> {
* @param units the list of work units to be assigned
* @return A multimap that maps each minor fragment id to a list of work units
*/
- public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units, DrillbitContext context) {
- boolean useOldAssignmentCode = context == null ? false : context.getOptionManager().getOption(ExecConstants.USE_OLD_ASSIGNMENT_CREATOR).bool_val;
- if (useOldAssignmentCode) {
- return OldAssignmentCreator.getMappings(incomingEndpoints, units);
- } else {
- AssignmentCreator<T> creator = new AssignmentCreator<>(incomingEndpoints, units);
- return creator.getMappings();
- }
+ public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+ AssignmentCreator<T> creator = new AssignmentCreator<>(incomingEndpoints, units);
+ return creator.getMappings();
}
/**
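
As a usage note, getMappings() returns a ListMultimap keyed by minor fragment id.
A short sketch of consuming the result, modeled on the updated test at the end of
this patch (types are from the diff; the loop body is illustrative):

    // Assign each work unit to one of the incoming minor fragments.
    ListMultimap<Integer, CompleteFileWork> mappings =
        AssignmentCreator.getMappings(incomingEndpoints, chunks);
    for (Integer minorFragmentId : mappings.keySet()) {
      // Each minor fragment receives the list of work units assigned to it.
      List<CompleteFileWork> assigned = mappings.get(minorFragmentId);
    }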
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
deleted file mode 100644
index 48bb5f3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.schedule;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-
-/**
- * The OldAssignmentCreator is responsible for assigning a set of work units to the available slices.
- */
-public class OldAssignmentCreator<T extends CompleteWork> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
-
- static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
- private final ArrayListMultimap<Integer, T> mappings;
- private final List<DrillbitEndpoint> endpoints;
-
-
-
-
- /**
- * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
- * Drillbits.
- *
- * @param incomingEndpoints
- * The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
- * multiple slices on a node working on the task simultaneously.
- * @param units
- * The work units to assign.
- * @return ListMultimap of Integer > List<CompleteWork> (based on their incoming order) to with
- */
- public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
- List<T> units) {
- OldAssignmentCreator<T> creator = new OldAssignmentCreator<T>(incomingEndpoints, units);
- return creator.mappings;
- }
-
- OldAssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
- logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
- Stopwatch watch = Stopwatch.createUnstarted();
-
- Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
- + "is greater than number of row groups %d", incomingEndpoints.size(), units.size()));
- this.mappings = ArrayListMultimap.create();
- this.endpoints = Lists.newLinkedList(incomingEndpoints);
-
- ArrayList<T> rowGroupList = new ArrayList<>(units);
- for (double cutoff : ASSIGNMENT_CUTOFFS) {
- scanAndAssign(rowGroupList, cutoff, false, false);
- }
- scanAndAssign(rowGroupList, 0.0, true, false);
- scanAndAssign(rowGroupList, 0.0, true, true);
-
- logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
- Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
- Preconditions.checkState(!units.isEmpty());
-
- }
-
- /**
- *
- * @param mappings
- * the mapping between fragment/endpoint and rowGroup
- * @param endpoints
- * the list of drillbits, ordered by the corresponding fragment
- * @param workunits
- * the list of rowGroups to assign
- * @param requiredPercentage
- * the percentage of max bytes required to make an assignment
- * @param assignAll
- * if true, will assign even if no affinity
- */
- private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
- Collections.sort(workunits);
- int fragmentPointer = 0;
- final boolean requireAffinity = requiredPercentage > 0;
- int maxAssignments = (int) (workunits.size() / endpoints.size());
-
- if (maxAssignments < 1) {
- maxAssignments = 1;
- }
-
- for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
- T unit = iter.next();
- for (int i = 0; i < endpoints.size(); i++) {
- int minorFragmentId = (fragmentPointer + i) % endpoints.size();
- DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
- EndpointByteMap endpointByteMap = unit.getByteMap();
- boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
-
- if (assignAll
- || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
- || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
- && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
- .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
-
- mappings.put(minorFragmentId, unit);
- logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
- // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
- // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
- // if (bytesPerEndpoint.get(currentEndpoint) != null) {
- // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
- // } else {
- // // assignmentAffinityStats.update(0);
- // }
- iter.remove();
- fragmentPointer = (minorFragmentId + 1) % endpoints.size();
- break;
- }
- }
-
- }
- }
-
-}
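
For the record, the deleted class implemented a multi-pass greedy assignment:
repeated scans over the sorted work list at decreasing affinity cutoffs, followed
by two forced passes. Roughly, as reconstructed from the constructor above:

    // Passes requiring 99%, 50%, 25%, then 0% of the unit's max locality bytes.
    for (double cutoff : new double[] { 0.99, 0.50, 0.25, 0.00 }) {
      scanAndAssign(rowGroupList, cutoff, false, false);
    }
    scanAndAssign(rowGroupList, 0.0, true, false); // fill fragments with no work yet
    scanAndAssign(rowGroupList, 0.0, true, true);  // force-assign anything left over

The surviving AssignmentCreator keeps the same static entry point, minus the
context parameter, without this cutoff loop.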
http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 1efc793..65d8cf7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -63,7 +63,7 @@ public class TestAssignment {
incomingEndpoints.add(incomingEndpointsIterator.next());
}
- ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, null);
+ ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
System.out.println(mappings.keySet().size());
for (int i = 0; i < width; i++) {
Assert.assertTrue("no mapping for entry " + i, mappings.get(i) != null && mappings.get(i).size() > 0);