http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 047d3d9..15b2f53 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -1,21 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>drill-root</artifactId>
@@ -23,37 +18,39 @@
<version>1.0.0-m2-incubating-SNAPSHOT</version>
</parent>
- <groupId>org.apache.drill</groupId>
<artifactId>distribution</artifactId>
<packaging>pom</packaging>
<name>Packaging and Distribution Assembly</name>
-
+
<dependencies>
<dependency>
+ <groupId>sqlline</groupId>
+ <artifactId>sqlline</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.drill</groupId>
<artifactId>drill-protocol</artifactId>
<version>${project.version}</version>
+ <classifier>rebuffed</classifier>
</dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
- <artifactId>drill-java-exec</artifactId>
+ <artifactId>drill-buffers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
- <artifactId>drill-netty-bufferl</artifactId>
- <version>4.0.7.Final</version>
- </dependency>
- <dependency>
- <groupId>org.apache.drill.exec</groupId>
- <artifactId>drill-ref</artifactId>
+ <artifactId>drill-java-exec</artifactId>
<version>${project.version}</version>
+ <classifier>rebuffed</classifier>
</dependency>
<dependency>
<groupId>org.apache.drill</groupId>
<artifactId>drill-common</artifactId>
<version>${project.version}</version>
+ <classifier>rebuffed</classifier>
</dependency>
<dependency>
<groupId>org.apache.drill</groupId>
@@ -88,9 +85,10 @@
</excludes>
</configuration>
</plugin>
-
+
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
<executions>
<execution>
<id>distro-assembly</id>
@@ -148,7 +146,7 @@
</build>
</profile>
<!-- END SNIPPET: release-profile -->
-
+
<profile>
<id>rpm</id>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index c50df09..60aea67 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -22,28 +22,28 @@
<id>binary-release</id>
<formats>
<format>tar.gz</format>
- <format>dir</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<moduleSets>
- <moduleSet>
- <!-- Enable access to all projects in the current multimodule build! -->
- <useAllReactorProjects>true</useAllReactorProjects>
+ </moduleSets>
+ <dependencySets>
+ <dependencySet>
- <!-- Now, select which projects to include in this module-set. -->
+<!-- Now, select which projects to include in this module-set. -->
<includes>
<include>org.apache.drill:drill-sqlparser:jar</include>
- <include>org.apache.drill.exec:drill-netty-bufferl</include>
- <include>org.apache.drill.exec:drill-ref</include>
+ <include>org.apache.drill.exec:drill-buffers</include>
+ <include>org.apache.drill:drill-protocol:jar:rebuffed</include>
+ <include>org.apache.drill:drill-common:jar:rebuffed</include>
+ <include>org.apache.drill.exec:drill-java-exec:jar:rebuffed</include>
</includes>
- <binaries>
- <outputDirectory>jars</outputDirectory>
- <unpack>false</unpack>
- </binaries>
- </moduleSet>
- </moduleSets>
- <dependencySets>
+
+
+
+ <outputDirectory>jars</outputDirectory>
+ <useProjectArtifact>false</useProjectArtifact>
+ </dependencySet>
<dependencySet>
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
@@ -51,7 +51,8 @@
<excludes>
<exclude>org.apache.drill</exclude>
<exclude>org.apache.drill.exec</exclude>
- <exclude>com.google.protobuf:protobuf-java:jar:2.5.0</exclude>
+ <exclude>org.hsqldb:hsqldb</exclude> <!-- exclude or sqlline has problems -->
+ <exclude>hsqldb:hsqldb</exclude> <!-- exclude or sqlline has problems -->
</excludes>
<scope>test</scope>
</dependencySet>
@@ -75,22 +76,6 @@
<files>
<file>
- <source>../protocol/target/drill-protocol-${project.version}-rebuffed.jar</source>
- <outputDirectory>jars</outputDirectory>
- </file>
- <file>
- <source>../common/target/drill-common-${project.version}-rebuffed.jar</source>
- <outputDirectory>jars</outputDirectory>
- </file>
- <file>
- <source>../exec/java-exec/target/drill-java-exec-${project.version}-rebuffed.jar</source>
- <outputDirectory>jars</outputDirectory>
- </file>
- <file>
- <source>../contrib/storage-hive/target/storage-hive-${project.version}.jar</source>
- <outputDirectory>jars</outputDirectory>
- </file>
- <file>
<source>src/resources/runbit</source>
<outputDirectory>bin</outputDirectory>
</file>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/distribution/src/resources/sqlline
----------------------------------------------------------------------
diff --git a/distribution/src/resources/sqlline b/distribution/src/resources/sqlline
index c815e8b..c526f03 100755
--- a/distribution/src/resources/sqlline
+++ b/distribution/src/resources/sqlline
@@ -72,7 +72,7 @@ DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/sqlline.
if [ -n "$QUERY" ]
then
- echo $QUERY | exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP sqlline.SqlLine "${ARGS[@]}"
+ echo $QUERY | exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP sqlline.SqlLine -d org.apache.drill.jdbc.Driver "${ARGS[@]}"
elif [ -n "$FILE" ]
then
exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP sqlline.SqlLine "${ARGS[@]}" --run=$FILE
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/bufferl/pom.xml
----------------------------------------------------------------------
diff --git a/exec/bufferl/pom.xml b/exec/bufferl/pom.xml
index 834a98f..a8e48ad 100644
--- a/exec/bufferl/pom.xml
+++ b/exec/bufferl/pom.xml
@@ -18,9 +18,7 @@
<version>1.0.0-m2-incubating-SNAPSHOT</version>
</parent>
- <groupId>org.apache.drill.exec</groupId>
- <version>4.0.7.Final</version>
- <artifactId>drill-netty-bufferl</artifactId>
+ <artifactId>drill-buffers</artifactId>
<name>exec/Netty Little Endian Buffers</name>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 9ea8304..a016afc 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -44,12 +44,10 @@
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
- <version>2.9.1</version>
</dependency>
<dependency>
<groupId>xalan</groupId>
<artifactId>xalan</artifactId>
- <version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.sun.codemodel</groupId>
@@ -58,12 +56,12 @@
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
- <artifactId>commons-compiler-jdk</artifactId>
- <version>2.6.1</version>
+ <artifactId>janino</artifactId>
+ <version>2.7.3</version>
</dependency>
<dependency>
<groupId>net.hydromatic</groupId>
- <artifactId>optiq</artifactId>
+ <artifactId>optiq-core</artifactId>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
@@ -139,8 +137,8 @@
</dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
- <version>4.0.7.Final</version>
- <artifactId>drill-netty-bufferl</artifactId>
+ <version>${project.version}</version>
+ <artifactId>drill-buffers</artifactId>
</dependency>
<dependency>
<groupId>org.apache.drill</groupId>
@@ -196,7 +194,7 @@
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
- <version>3.1</version>
+ <version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index 3e5a156..5083f74 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -18,6 +18,7 @@ import com.google.common.base.Charsets;
import com.google.common.collect.ObjectArrays;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
import org.apache.commons.lang3.ArrayUtils;
@@ -39,6 +40,8 @@ import java.util.Random;
import java.util.List;
import java.io.Closeable;
+import java.io.InputStream;
+import java.io.InputStreamReader;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index c357dd6..2d81299 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -155,6 +155,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
return valueCount;
}
+ public boolean isNull(int index){
+ return false;
+ }
+
<#if (type.width > 8)>
public ${minor.javaType!type.javaType} get(int index) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 127c6fd..4677374 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -243,7 +243,12 @@ package org.apache.drill.exec.vector;
</#if> get(int index, int positionIndex) {
return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex);
}
+
+ public boolean isNull(int index){
+ return false;
+ }
+
public void get(int index, Repeated${minor.class}Holder holder){
holder.start = offsets.getAccessor().get(index);
holder.end = offsets.getAccessor().get(index+1);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/SqlAccessors.java b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
new file mode 100644
index 0000000..efaf9a6
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<@pp.dropOutputFile />
+<#list vv.types as type>
+<#list type.minor as minor>
+<#list ["", "Nullable"] as mode>
+<#assign name = mode + minor.class?cap_first />
+<#assign javaType = (minor.javaType!type.javaType) />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/accessor/${name}Accessor.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.accessor;
+
+<#include "/@includes/vv_imports.ftl" />
+
+@SuppressWarnings("unused")
+public class ${name}Accessor extends AbstractSqlAccessor{
+ <#if mode == "Nullable">
+ private static final MajorType TYPE = Types.optional(MinorType.${minor.class?upper_case});
+ <#else>
+ private static final MajorType TYPE = Types.required(MinorType.${minor.class?upper_case});
+ </#if>
+
+ private final ${name}Vector.Accessor ac;
+
+ public ${name}Accessor(${name}Vector vector){
+ this.ac = vector.getAccessor();
+ }
+
+ public Object getObject(int index){
+ return ac.getObject(index);
+ }
+
+ <#if type.major == "VarLen">
+
+ @Override
+ public InputStream getStream(int index){
+ ${name}Holder h = new ${name}Holder();
+ ac.get(index, h);
+ return new ByteBufInputStream(h.buffer.slice(h.start, h.end));
+ }
+
+ @Override
+ public byte[] getBytes(int index){
+ return ac.get(index);
+ }
+
+ <#switch minor.class>
+ <#case "VarBinary">
+ <#break>
+ <#case "VarChar">
+ @Override
+ public InputStreamReader getReader(int index){
+ return new InputStreamReader(getStream(index), Charsets.UTF_8);
+ }
+
+ @Override
+ public String getString(int index){
+ return new String(getBytes(index), Charsets.UTF_8);
+ }
+
+
+ <#break>
+ <#case "Var16Char">
+ @Override
+ public InputStreamReader getReader(int index){
+ return new InputStreamReader(getStream(index), Charsets.UTF_16);
+ }
+
+ @Override
+ public String getString(int index){
+ return new String(getBytes(index), Charsets.UTF_16);
+ }
+
+
+ <#break>
+ <#default>
+ This is uncompilable code
+ </#switch>
+
+ <#else>
+ @Override
+ public ${javaType} get${javaType?cap_first}(int index){
+ return ac.get(index);
+ }
+ </#if>
+
+ @Override
+ public boolean isNull(int index){
+ return false;
+ }
+
+ @Override
+ MajorType getType(){return TYPE;};
+
+}
+
+
+</#list>
+</#list>
+</#list>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index 783f943..8c72cf7 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -23,7 +23,7 @@
package org.apache.drill.exec.expr;
<#include "/@includes/vv_imports.ftl" />
-
+import org.apache.drill.exec.vector.accessor.*;
public class TypeHelper {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class);
@@ -47,6 +47,25 @@ public class TypeHelper {
throw new UnsupportedOperationException();
}
+ public static SqlAccessor getSqlAccessor(ValueVector vector){
+ switch(vector.getField().getType().getMinorType()){
+ <#list vv.types as type>
+ <#list type.minor as minor>
+ case ${minor.class?upper_case}:
+ switch (vector.getField().getType().getMode()) {
+ case REQUIRED:
+ return new ${minor.class}Accessor((${minor.class}Vector) vector);
+ case OPTIONAL:
+ return new Nullable${minor.class}Accessor((Nullable${minor.class}Vector) vector);
+ case REPEATED:
+ throw new UnsupportedOperationException();
+ }
+ </#list>
+ </#list>
+ }
+ throw new UnsupportedOperationException();
+ }
+
public static Class<?> getValueVectorClass(MinorType type, DataMode mode){
switch (type) {
<#list vv.types as type>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index b059d89..5cd83af 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -225,14 +225,35 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
holder.buffer = data;
}
+
+ <#switch minor.class>
+ <#case "VarChar">
+ public Object getObject(int index) {
+ return new String(get(index), Charsets.UTF_8);
+ }
+ <#break>
+ <#case "Var16Char">
+ public Object getObject(int index) {
+ return new String(get(index), Charsets.UTF_16);
+ }
+ <#break>
+ <#default>
public Object getObject(int index) {
return get(index);
}
+
+ </#switch>
+
+
public int getValueCount() {
return valueCount;
}
+ public boolean isNull(int index){
+ return false;
+ }
+
public UInt${type.width}Vector getOffsetVector(){
return offsetVector;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 2f0ab2e..5f370ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -35,6 +35,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
import com.hazelcast.config.SerializerConfig;
import com.hazelcast.core.DuplicateInstanceNameException;
import com.hazelcast.core.Hazelcast;
@@ -132,6 +133,10 @@ public class HazelCache implements DistributedCache {
@Override
public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
IMap<String, V> imap = this.instance.getMap(clazz.toString());
+ MapConfig myMapConfig = new MapConfig();
+ myMapConfig.setBackupCount(0);
+ myMapConfig.setReadBackupData(true);
+ instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
return new HCDistributedMapImpl<V>(imap, clazz);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 74eed94..1297cb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkState;
import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.Closeable;
@@ -37,11 +35,18 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.QueryType;
-import org.apache.drill.exec.rpc.*;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.ChannelClosedException;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
@@ -52,7 +57,7 @@ import com.google.common.util.concurrent.SettableFuture;
/**
* Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
*/
-public class DrillClient implements Closeable{
+public class DrillClient implements Closeable, ConnectionThrottle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
DrillConfig config;
@@ -86,17 +91,28 @@ public class DrillClient implements Closeable{
return config;
}
+ @Override
+ public void setAutoRead(boolean enableAutoRead) {
+ client.setAutoRead(enableAutoRead);
+ }
+
+
+
/**
* Connects the client to a Drillbit server
*
* @throws IOException
*/
- public synchronized void connect() throws RpcException {
+ public void connect() throws RpcException {
+ connect((String) null);
+ }
+
+ public synchronized void connect(String connect) throws RpcException {
if (connected) return;
if (clusterCoordinator == null) {
try {
- this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+ this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
this.clusterCoordinator.start(10000);
} catch (Exception e) {
throw new RpcException("Failure setting up ZK for client.", e);
@@ -172,6 +188,11 @@ public class DrillClient implements Closeable{
return listener.getResults();
}
+ public void cancelQuery(QueryId id){
+ client.send(RpcType.CANCEL_QUERY, id, Ack.class);
+ }
+
+
/**
* Submits a Logical plan for direct execution (bypasses parsing)
*
@@ -217,7 +238,7 @@ public class DrillClient implements Closeable{
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
// logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
results.add(result);
if(result.getHeader().getIsLastChunk()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index 7adefdb..b07b3ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -17,43 +17,33 @@
*/
package org.apache.drill.exec.client;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.Parameters;
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Charsets;
-import com.google.common.base.Stopwatch;
-import com.google.common.io.Resources;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang.StringUtils;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.RemoteRpcException;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.util.VectorUtil;
-import org.apache.drill.exec.vector.ValueVector;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharsetDecoder;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
public class QuerySubmitter {
@@ -99,42 +89,50 @@ public class QuerySubmitter {
public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception {
DrillConfig config = DrillConfig.create();
- DrillClient client;
- if (local) {
- RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
- Drillbit[] drillbits = new Drillbit[bits];
- for (int i = 0; i < bits; i++) {
- drillbits[i] = new Drillbit(config, serviceSet);
- drillbits[i].run();
+ DrillClient client = null;
+
+ try{
+ if (local) {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit[] drillbits = new Drillbit[bits];
+ for (int i = 0; i < bits; i++) {
+ drillbits[i] = new Drillbit(config, serviceSet);
+ drillbits[i].run();
+ }
+ client = new DrillClient(config, serviceSet.getCoordinator());
+ } else {
+ ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum);
+ clusterCoordinator.start(10000);
+ client = new DrillClient(config, clusterCoordinator);
}
- client = new DrillClient(config, serviceSet.getCoordinator());
- } else {
- ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum);
- clusterCoordinator.start(10000);
- client = new DrillClient(config, clusterCoordinator);
- }
- client.connect();
- QueryResultsListener listener = new QueryResultsListener();
- String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
- UserProtos.QueryType queryType;
- type = type.toLowerCase();
- switch(type) {
- case "logical":
- queryType = UserProtos.QueryType.LOGICAL;
- break;
- case "physical":
- queryType = UserProtos.QueryType.PHYSICAL;
- break;
- default:
- System.out.println("Invalid query type: " + type);
- return -1;
+ client.connect();
+ QueryResultsListener listener = new QueryResultsListener();
+ String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
+ UserProtos.QueryType queryType;
+ type = type.toLowerCase();
+ switch(type) {
+ case "sql":
+ queryType = UserProtos.QueryType.SQL;
+ break;
+ case "logical":
+ queryType = UserProtos.QueryType.LOGICAL;
+ break;
+ case "physical":
+ queryType = UserProtos.QueryType.PHYSICAL;
+ break;
+ default:
+ System.out.println("Invalid query type: " + type);
+ return -1;
+ }
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ client.runQuery(queryType, plan, listener);
+ int rows = listener.await();
+ System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000));
+ return 0;
+ }finally{
+ if(client != null) client.close();
}
- Stopwatch watch = new Stopwatch();
- watch.start();
- client.runQuery(queryType, plan, listener);
- int rows = listener.await();
- System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000));
- return 0;
}
private class QueryResultsListener implements UserResultsListener {
@@ -150,7 +148,7 @@ public class QuerySubmitter {
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
int rows = result.getHeader().getRowCount();
if (result.getData() != null) {
count.addAndGet(rows);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
index 904afb5..080679b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
@@ -112,10 +112,7 @@ class MergeAdapter extends ClassVisitor {
return null;
}
if(arg3 != null){
- System.out.println("a: " + arg3);
arg3 = arg3.replace(set.precompiled.slash, set.generated.slash);
- System.out.println("b: " + arg3);
-
}
// if( (access & Modifier.PUBLIC) == 0){
// access = access ^ Modifier.PUBLIC ^ Modifier.PROTECTED | Modifier.PRIVATE;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 36433ad..cbd722c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -24,16 +24,18 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.visitors.SimpleExprVisitor;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.record.NullExpression;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
import com.google.common.collect.Lists;
-import org.apache.drill.exec.record.VectorAccessible;
-
public class ExpressionTreeMaterializer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
@@ -45,7 +47,12 @@ public class ExpressionTreeMaterializer {
public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry) {
LogicalExpression materializedExpr = expr.accept(new MaterializeVisitor(batch, errorCollector), null);
- return ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry);
+ LogicalExpression out = ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry);
+ if(out instanceof NullExpression){
+ return new TypedNullConstant(Types.optional(MinorType.INT));
+ }else{
+ return out;
+ }
}
private static class MaterializeVisitor extends SimpleExprVisitor<LogicalExpression> {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index fd965a7..80a7819 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import net.hydromatic.optiq.SchemaPlus;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
@@ -75,6 +77,7 @@ public class FragmentContext implements Closeable {
this.connection = connection;
this.fragment = fragment;
this.funcRegistry = funcRegistry;
+ logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax());
}
@@ -91,6 +94,10 @@ public class FragmentContext implements Closeable {
public DrillbitContext getDrillbitContext() {
return context;
}
+
+ public SchemaPlus getRootSchema(){
+ return null;
+ }
/**
* Get this node's identity.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 256e772..11658e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -21,9 +21,11 @@ import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FunctionRegistry;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.sql.DrillSchemaFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -81,4 +83,13 @@ public class QueryContext {
return workBus;
}
+ public DrillSchemaFactory getSchemaFactory(){
+ return drillbitContext.getSchemaFactory();
+ }
+
+ public FunctionRegistry getFunctionRegistry(){
+ return drillbitContext.getFunctionRegistry();
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 6e58ec7..cd59428 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
@@ -33,7 +32,6 @@ import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.common.logical.data.*;
-import org.apache.drill.common.logical.data.Order.Direction;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.common.types.TypeProtos;
@@ -50,6 +48,8 @@ import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.store.StorageEngine;
+import org.eigenbase.rel.RelFieldCollation.Direction;
+import org.eigenbase.rel.RelFieldCollation.NullDirection;
import com.beust.jcommander.internal.Lists;
@@ -123,9 +123,9 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException {
PhysicalOperator input = order.getInput().accept(this, value);
- List<OrderDef> ods = Lists.newArrayList();
+ List<Ordering> ods = Lists.newArrayList();
for(Ordering o : order.getOrderings()){
- ods.add(OrderDef.create(o));
+ ods.add(o);
}
return new SelectionVectorRemover(new Sort(input, ods, false));
}
@@ -150,13 +150,13 @@ public class BasicOptimizer extends Optimizer{
}
// a collapsing aggregate is a currently implemented as a sort followed by a streaming aggregate.
- List<OrderDef> orderDefs = Lists.newArrayList();
+ List<Ordering> orderDefs = Lists.newArrayList();
List<NamedExpression> keys = Lists.newArrayList();
for(LogicalExpression e : segment.getExprs()){
if( !(e instanceof SchemaPath)) throw new OptimizerException("The basic optimizer doesn't currently support collapsing aggregate where the segment value is something other than a SchemaPath.");
keys.add(new NamedExpression(e, new FieldReference((SchemaPath) e)));
- orderDefs.add(new OrderDef(Direction.ASC, e));
+ orderDefs.add(new Ordering(Direction.Ascending, e, NullDirection.FIRST));
}
Sort sort = new Sort(segment.getInput().accept(this, value), orderDefs, false);
@@ -169,22 +169,22 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerException {
PhysicalOperator leftOp = join.getLeft().accept(this, value);
- List<OrderDef> leftOrderDefs = Lists.newArrayList();
+ List<Ordering> leftOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
- leftOrderDefs.add(new OrderDef(Direction.ASC, jc.getLeft()));
+ leftOrderDefs.add(new Ordering(Direction.Ascending, jc.getLeft()));
}
leftOp = new Sort(leftOp, leftOrderDefs, false);
leftOp = new SelectionVectorRemover(leftOp);
PhysicalOperator rightOp = join.getRight().accept(this, value);
- List<OrderDef> rightOrderDefs = Lists.newArrayList();
+ List<Ordering> rightOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
- rightOrderDefs.add(new OrderDef(Direction.ASC, jc.getRight()));
+ rightOrderDefs.add(new Ordering(Direction.Ascending, jc.getRight()));
}
rightOp = new Sort(rightOp, rightOrderDefs, false);
rightOp = new SelectionVectorRemover(rightOp);
- MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJointType());
+ MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType());
return new SelectionVectorRemover(mjp);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 76916ea..fde88a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -21,13 +21,14 @@ import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.Join.JoinType;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.sql.SqlJoinOperator.JoinType;
import com.beust.jcommander.internal.Lists;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -44,7 +45,7 @@ public class MergeJoinPOP extends AbstractBase{
private final PhysicalOperator left;
private final PhysicalOperator right;
private final List<JoinCondition> conditions;
- private final Join.JoinType joinType;
+ private final JoinRelType joinType;
@Override
public OperatorCost getCost() {
@@ -56,7 +57,7 @@ public class MergeJoinPOP extends AbstractBase{
@JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("join-conditions") List<JoinCondition> conditions,
- @JsonProperty("join-type") Join.JoinType joinType
+ @JsonProperty("join-type") JoinRelType joinType
) {
this.left = left;
this.right = right;
@@ -93,7 +94,7 @@ public class MergeJoinPOP extends AbstractBase{
return right;
}
- public Join.JoinType getJoinType() {
+ public JoinRelType getJoinType() {
return joinType;
}
@@ -102,12 +103,12 @@ public class MergeJoinPOP extends AbstractBase{
}
public MergeJoinPOP flipIfRight(){
- if(joinType == JoinType.RIGHT){
+ if(joinType == JoinRelType.RIGHT){
List<JoinCondition> flippedConditions = Lists.newArrayList(conditions.size());
for(JoinCondition c : conditions){
flippedConditions.add(c.flip());
}
- return new MergeJoinPOP(right, left, flippedConditions, JoinType.LEFT);
+ return new MergeJoinPOP(right, left, flippedConditions, JoinRelType.LEFT);
}else{
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index 667cc33..549c65c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -17,18 +17,18 @@
*/
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
-import org.apache.drill.common.expression.LogicalExpression;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractReceiver;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
// The goal of this operator is to produce outgoing batches with records
// ordered according to the supplied expression. Each incoming batch
@@ -39,12 +39,12 @@ public class MergingReceiverPOP extends AbstractReceiver{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverPOP.class);
private final List<DrillbitEndpoint> senders;
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
@JsonCreator
public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
@JsonProperty("senders") List<DrillbitEndpoint> senders,
- @JsonProperty("orderings") List<OrderDef> orderings) {
+ @JsonProperty("orderings") List<Ordering> orderings) {
super(oppositeMajorFragmentId);
this.senders = senders;
this.orderings = orderings;
@@ -78,7 +78,7 @@ public class MergingReceiverPOP extends AbstractReceiver{
return new Size(1,1);
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index f0aa85b..c49509f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -17,27 +17,27 @@
*/
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.base.AbstractExchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.physical.base.Sender;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
@JsonTypeName("ordered-partition-exchange")
public class OrderedPartitionExchange extends AbstractExchange {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionExchange.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private final FieldReference ref;
private int recordsToSample = 10000; // How many records must be received before analyzing
private int samplingFactor = 10; // Will collect SAMPLING_FACTOR * number of partitions to send to distributed cache
@@ -48,7 +48,7 @@ public class OrderedPartitionExchange extends AbstractExchange {
private List<DrillbitEndpoint> receiverLocations;
@JsonCreator
- public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref,
+ public OrderedPartitionExchange(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref,
@JsonProperty("child") PhysicalOperator child, @JsonProperty("recordsToSample") Integer recordsToSample,
@JsonProperty("samplingFactor") Integer samplingFactor, @JsonProperty("completionFactor") Float completionFactor) {
super(child);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index de5cf04..55632a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -17,24 +17,27 @@
*/
package org.apache.drill.exec.physical.config;
-import com.beust.jcommander.internal.Lists;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("OrderedPartitionSender")
public class OrderedPartitionSender extends AbstractSender {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionSender.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private final FieldReference ref;
private final List<DrillbitEndpoint> endpoints;
private final int sendingWidth;
@@ -44,7 +47,7 @@ public class OrderedPartitionSender extends AbstractSender {
private float completionFactor;
@JsonCreator
- public OrderedPartitionSender(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child,
+ public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child,
@JsonProperty("destinations") List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId,
@JsonProperty("sending-fragment-width") int sendingWidth, @JsonProperty("recordsToSample") int recordsToSample,
@JsonProperty("samplingFactor") int samplingFactor, @JsonProperty("completionFactor") float completionFactor) {
@@ -70,7 +73,7 @@ public class OrderedPartitionSender extends AbstractSender {
return endpoints;
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index fbeb6df..b8073c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -18,25 +18,25 @@
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.base.AbstractExchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.physical.base.Sender;
import org.apache.drill.exec.proto.CoordinationProtos;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("single-merge-exchange")
public class SingleMergeExchange extends AbstractExchange {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleMergeExchange.class);
- private final List<OrderDef> orderExpr;
+ private final List<Ordering> orderExpr;
// ephemeral for setup tasks
private List<CoordinationProtos.DrillbitEndpoint> senderLocations;
@@ -44,7 +44,7 @@ public class SingleMergeExchange extends AbstractExchange {
@JsonCreator
public SingleMergeExchange(@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("orderings") List<OrderDef> orderExpr) {
+ @JsonProperty("orderings") List<Ordering> orderExpr) {
super(child);
this.orderExpr = orderExpr;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index 206e7cd..82aa830 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.config;
import java.util.List;
-import org.apache.drill.common.defs.OrderDef;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -27,7 +27,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -35,17 +34,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class Sort extends AbstractSingle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private boolean reverse = false;
@JsonCreator
- public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) {
+ public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
super(child);
this.orderings = orderings;
this.reverse = reverse;
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index aae1a3c..7c8a51c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.record.VectorContainer;
+import org.eigenbase.rel.JoinRelType;
/**
* This join template uses a merge join to combine two ordered streams into a single larger batch. When joining
@@ -91,7 +92,7 @@ public abstract class JoinTemplate implements JoinWorker {
// validate input iterators (advancing to the next record batch if necessary)
if (!status.isRightPositionAllowed()) {
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) {
+ if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
@@ -109,7 +110,7 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT)
+ if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT)
if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 298b031..bd668e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.JoinRelType;
import com.google.common.collect.ImmutableList;
import com.sun.codemodel.JClass;
@@ -112,7 +113,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private final RecordBatch right;
private final JoinStatus status;
private final JoinCondition condition;
- private final Join.JoinType joinType;
+ private final JoinRelType joinType;
private JoinWorker worker;
public MergeJoinBatchBuilder batchBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
index 9428b46..3549a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -17,16 +17,16 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import com.google.common.base.Preconditions;
+import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.data.Join.JoinType;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
+import org.eigenbase.rel.JoinRelType;
-import java.util.List;
+import com.google.common.base.Preconditions;
public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
@@ -34,7 +34,7 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
@Override
public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 2);
- if(config.getJoinType() == JoinType.RIGHT){
+ if(config.getJoinType() == JoinRelType.RIGHT){
return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0));
}else{
return new MergeJoinBatch(config, context, children.get(0), children.get(1));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 43d3b45..72f1ad9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -25,14 +25,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Direction;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -60,6 +59,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -367,7 +367,7 @@ public class MergingRecordBatch implements RecordBatch {
private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
// set up the expression evaluator and code generation
- final List<OrderDef> orderings = config.getOrderings();
+ final List<Ordering> orderings = config.getOrderings();
final ErrorCollector collector = new ErrorCollectorImpl();
final ClassGenerator<MergingReceiverGeneratorBase> cg =
CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -550,11 +550,11 @@ public class MergingRecordBatch implements RecordBatch {
// generate less than/greater than checks (fixing results for ASCending vs. DESCending)
cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(1)))
._then()
- ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? 1 : -1));
+ ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? 1 : -1));
cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(-1)))
._then()
- ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? -1 : 1));
+ ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? -1 : 1));
++comparisonVectorIndex;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index da8978f..381fbe2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -23,7 +23,6 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -32,6 +31,7 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.cache.Counter;
@@ -69,6 +69,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -189,7 +190,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
// Uses the
- // the expressions from the OrderDefs to populate each column. There is one column for each OrderDef in
+ // the expressions from the Orderings to populate each column. There is one column for each Ordering in
// popConfig.orderings.
VectorContainer containerToCache = new VectorContainer();
@@ -293,11 +294,11 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
VectorContainer allSamplesContainer = new VectorContainer();
containerBuilder.build(context, allSamplesContainer);
- List<OrderDef> orderDefs = Lists.newArrayList();
+ List<Ordering> orderDefs = Lists.newArrayList();
int i = 0;
- for (OrderDef od : popConfig.getOrderings()) {
+ for (Ordering od : popConfig.getOrderings()) {
SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
- orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp)));
+ orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
}
// sort the data incoming samples.
@@ -330,8 +331,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
/**
* Creates a copier that does a project for every Nth record from a VectorContainer incoming into VectorContainer
- * outgoing. Each OrderDef in orderings generates a column, and evaluation of the expression associated with each
- * OrderDef determines the value of each column. These records will later be sorted based on the values in each
+ * outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each
+ * Ordering determines the value of each column. These records will later be sorted based on the values in each
* column, in the same order as the orderings.
*
* @param sv4
@@ -342,14 +343,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* @throws SchemaChangeException
*/
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
- List<OrderDef> orderings) throws SchemaChangeException {
+ List<Ordering> orderings) throws SchemaChangeException {
List<ValueVector> localAllocationVectors = Lists.newArrayList();
final ErrorCollector collector = new ErrorCollectorImpl();
final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION,
context.getFunctionRegistry());
int i = 0;
- for (OrderDef od : orderings) {
+ for (Ordering od : orderings) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
SchemaPath schemaPath = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
@@ -521,7 +522,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
cg.setMappingSet(mainMapping);
int count = 0;
- for (OrderDef od : popConfig.getOrderings()) {
+ for (Ordering od : popConfig.getOrderings()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
if (collector.hasErrors())
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
@@ -538,7 +539,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
ClassGenerator.HoldingContainer out = cg.addExpr(f, false);
JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if (od.getDirection() == Order.Direction.ASC) {
+ if (od.getDirection() == Direction.Ascending) {
jc._then()._return(out.getValue());
} else {
jc._then()._return(out.getValue().minus());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 3b8154e..509d13b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -88,6 +88,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
return ref;
}
+ private boolean isWildcard(NamedExpression ex){
+ LogicalExpression expr = ex.getExpr();
+ LogicalExpression ref = ex.getRef();
+ if(expr instanceof SchemaPath && ref instanceof SchemaPath){
+ PathSegment e = ((SchemaPath) expr).getRootSegment();
+ PathSegment n = ((SchemaPath) ref).getRootSegment();
+ if(e.isNamed() && e.getNameSegment().getPath().equals("*") && n.isNamed() && n.getChild() != null && n.getChild().isNamed() && n.getChild().getNameSegment().getPath().equals("*")){
+ return true;
+ }
+ }
+ return false;
+ }
@Override
protected void setupNewSchema() throws SchemaChangeException{
@@ -99,33 +111,43 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- for(int i = 0; i < exprs.size(); i++){
- final NamedExpression namedExpression = exprs.get(i);
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry());
- final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
- if(collector.hasErrors()){
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
- }
-
- // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
- ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
- Preconditions.checkNotNull(incoming);
-
- TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+ if(exprs.size() == 1 && isWildcard(exprs.get(0))){
+ for(VectorWrapper<?> wrapper : incoming){
+ ValueVector vvIn = wrapper.getValueVector();
+ TransferPair tp = wrapper.getValueVector().getTransferPair(new FieldReference(vvIn.getField().getName()));
transfers.add(tp);
container.add(tp.getTo());
- logger.debug("Added transfer.");
- }else{
- // need to do evaluation.
- ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
- allocationVectors.add(vector);
- TypedFieldId fid = container.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
- cg.addExpr(write);
- logger.debug("Added eval.");
}
+ }else{
+ for(int i = 0; i < exprs.size(); i++){
+ final NamedExpression namedExpression = exprs.get(i);
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry());
+ final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ }
+
+ // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
+ if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
+ ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+ ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
+ Preconditions.checkNotNull(incoming);
+
+ TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+ transfers.add(tp);
+ container.add(tp.getTo());
+ logger.debug("Added transfer.");
+ }else{
+ // need to do evaluation.
+ ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+ allocationVectors.add(vector);
+ TypedFieldId fid = container.add(vector);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
+ cg.addExpr(write);
+ logger.debug("Added eval.");
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 4d04735..5fd12c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -20,13 +20,11 @@ package org.apache.drill.exec.physical.impl.sort;
import java.io.IOException;
import java.util.List;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.Order.Direction;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -46,6 +44,7 @@ import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.collect.ImmutableList;
import com.sun.codemodel.JConditional;
@@ -165,20 +164,20 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return createNewSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
}
- public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
+ public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping);
}
- public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
+ public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
throws ClassTransformationException, IOException, SchemaChangeException{
CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
ClassGenerator<Sorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(OrderDef od : orderings){
+ for(Ordering od : orderings){
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
@@ -194,7 +193,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
HoldingContainer out = g.addExpr(f, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASC){
+ if(od.getDirection() == Direction.Ascending){
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
|