storm-commits mailing list archives

From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2148 [Storm SQL] Trident mode: back to code generate and compile Trident topology
Date Fri, 03 Feb 2017 01:02:13 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 85ac6b82b -> 02ab70c92


STORM-2148 [Storm SQL] Trident mode: back to code generate and compile Trident topology

* Change RexNodeToBlockStatementCompiler (and rename it to RexNodeToJavaCodeCompiler)
  * For standalone mode, it still returns a BlockStatement containing the code block
  * For Trident mode, it returns a String containing a "class" implementation
* Replace evaluation of the code block with an instance method call
* Package the compiled classes so they are included in the topology jar (see the sketch below)
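
For reference, a minimal sketch of the new compile-and-instantiate flow, closely following
TridentPlanCreator.createScalarInstance in this patch. The wrapper class ScalarInstanceSketch and
its parameter list are illustrative only; the helpers it calls (RexNodeToJavaCodeCompiler,
CompilingClassLoader, ExecutableExpression, DebuggableExecutableExpression) are the ones introduced
or used by this change.

import java.util.List;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
import org.apache.storm.sql.runtime.calcite.ExecutableExpression;

class ScalarInstanceSketch {
  // Generate Java source for the expressions, compile it in-process, and return an
  // ExecutableExpression instance. The classloader is kept so that its compiled
  // .class bytes can later be written into the topology jar (see packageTopology below).
  static ExecutableExpression createScalarInstance(RexNodeToJavaCodeCompiler rexCompiler,
                                                   List<RexNode> nodes,
                                                   RelDataType inputRowType,
                                                   String className,
                                                   List<CompilingClassLoader> classLoaders) throws Exception {
    String source = rexCompiler.compile(nodes, inputRowType, className);

    // Chain onto the last classloader so earlier generated classes remain visible.
    ClassLoader parent = classLoaders.isEmpty()
        ? ScalarInstanceSketch.class.getClassLoader()
        : classLoaders.get(classLoaders.size() - 1);
    CompilingClassLoader classLoader = new CompilingClassLoader(parent, className, source, null);

    ExecutableExpression instance =
        (ExecutableExpression) classLoader.loadClass(className).newInstance();
    classLoaders.add(classLoader);

    // Wrapping keeps the generated source around so the Evaluation* functions can log it.
    return new DebuggableExecutableExpression(instance, source);
  }
}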


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6665cce2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6665cce2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6665cce2

Branch: refs/heads/1.x-branch
Commit: 6665cce2fc5cdc4dbe73d0df4b5555d254462c91
Parents: 85ac6b8
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Mon Oct 24 21:27:07 2016 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Feb 3 10:01:25 2017 +0900

----------------------------------------------------------------------
 .../storm/sql/AbstractTridentProcessor.java     |  58 +++++
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |  25 +-
 .../RexNodeToBlockStatementCompiler.java        | 107 ---------
 .../sql/compiler/RexNodeToJavaCodeCompiler.java | 231 +++++++++++++++++++
 .../backends/standalone/RelNodeCompiler.java    |  13 +-
 .../apache/storm/sql/planner/StormRelUtils.java |   6 +
 .../storm/sql/planner/trident/QueryPlanner.java |   9 +-
 .../sql/planner/trident/TridentPlanCreator.java |  49 +++-
 .../sql/planner/trident/rel/TridentCalcRel.java |  19 +-
 .../planner/trident/rel/TridentFilterRel.java   |   9 +-
 .../planner/trident/rel/TridentProjectRel.java  |   9 +-
 .../backends/trident/TestPlanCompiler.java      |   9 +-
 .../calcite/DebuggableExecutableExpression.java |  45 ++++
 .../runtime/calcite/ExecutableExpression.java   |  31 +++
 .../trident/AbstractTridentProcessor.java       |  48 ----
 .../trident/functions/EvaluationCalc.java       |  70 ++----
 .../trident/functions/EvaluationFilter.java     |  39 +---
 .../trident/functions/EvaluationFunction.java   |  40 +---
 18 files changed, 520 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
new file mode 100644
index 0000000..6af71d4
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.DataContext;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractTridentProcessor {
+  protected Stream outputStream;
+  protected DataContext dataContext;
+  protected List<CompilingClassLoader> classLoaders;
+  /**
+   * @return the output stream of the SQL
+   */
+  public Stream outputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Construct the trident topology based on the SQL.
+   */
+  public abstract TridentTopology build();
+
+  /**
+   * @return the DataContext instance used during query execution
+   */
+  public DataContext getDataContext() {
+    return dataContext;
+  }
+
+  /**
+   * @return the classloaders used to compile generated classes; they are chained, so the last one can access all compiled classes.
+   */
+  public List<CompilingClassLoader> getClassLoaders() {
+    return classLoaders;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index f7bd719..b780239 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -40,6 +40,7 @@ import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.SubmitOptions;
 import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
 import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.parser.ColumnConstraint;
 import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateFunction;
@@ -53,10 +54,10 @@ import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesRegistry;
 import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
 import org.apache.storm.trident.TridentTopology;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -70,6 +71,7 @@ import java.util.Map;
 import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
 
 import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
 
@@ -124,13 +126,13 @@ class StormSqlImpl extends StormSql {
 
         Path jarPath = null;
         try {
-          // QueryPlanner on Trident mode configures the topology without any new classes, so we don't need to add anything into topology jar
-          // packaging empty jar since topology jar is needed for topology submission
+          // QueryPlanner in Trident mode configures the topology with compiled classes,
+          // so we need to add those classes to the topology jar
           // Topology will be serialized and sent to Nimbus, and deserialized and executed in workers.
 
           jarPath = Files.createTempFile("storm-sql", ".jar");
           System.setProperty("storm.jar", jarPath.toString());
-          packageEmptyTopology(jarPath);
+          packageTopology(jarPath, processor);
           StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);
         } finally {
           if (jarPath != null) {
@@ -175,12 +177,23 @@ class StormSqlImpl extends StormSql {
     }
   }
 
-  private void packageEmptyTopology(Path jar) throws IOException {
+  private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException {
     Manifest manifest = new Manifest();
     Attributes attr = manifest.getMainAttributes();
     attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
+    attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getCanonicalName());
     try (JarOutputStream out = new JarOutputStream(
-        new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
+            new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
+      List<CompilingClassLoader> classLoaders = processor.getClassLoaders();
+      if (classLoaders != null && !classLoaders.isEmpty()) {
+        for (CompilingClassLoader classLoader : classLoaders) {
+          for (Map.Entry<String, ByteArrayOutputStream> e : classLoader.getClasses().entrySet()) {
+            out.putNextEntry(new ZipEntry(e.getKey().replace(".", "/") + ".class"));
+            out.write(e.getValue().toByteArray());
+            out.closeEntry();
+          }
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
deleted file mode 100644
index 06ed227..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.adapter.enumerable.JavaRowFormat;
-import org.apache.calcite.adapter.enumerable.PhysType;
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
-import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.linq4j.tree.BlockBuilder;
-import org.apache.calcite.linq4j.tree.BlockStatement;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.linq4j.tree.ParameterExpression;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.rex.RexProgramBuilder;
-import org.apache.calcite.util.BuiltInMethod;
-import org.apache.calcite.util.Pair;
-
-import java.util.List;
-
-/**
- * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to expression ({@link org.apache.calcite.linq4j.tree.Expression}).
- *
- * This code is inspired by JaninoRexCompiler in Calcite, but while it is returning {@link org.apache.calcite.interpreter.Scalar} which is executable,
- * we need to pass the source code to EvaluationFilter or EvaluationFunction so that they can be serialized, and
- * compiled and executed on worker.
- */
-public class RexNodeToBlockStatementCompiler {
-  private final RexBuilder rexBuilder;
-
-  public RexNodeToBlockStatementCompiler(RexBuilder rexBuilder) {
-    this.rexBuilder = rexBuilder;
-  }
-
-  public BlockStatement compile(List<RexNode> nodes, RelDataType inputRowType) {
-    final RexProgramBuilder programBuilder =
-        new RexProgramBuilder(inputRowType, rexBuilder);
-    for (RexNode node : nodes) {
-      programBuilder.addProject(node, null);
-    }
-
-    return compile(programBuilder.getProgram());
-  }
-
-  public BlockStatement compile(final RexProgram program) {
-    RelDataType inputRowType = program.getInputRowType();
-
-    final BlockBuilder builder = new BlockBuilder();
-    final ParameterExpression context_ =
-            Expressions.parameter(Context.class, "context");
-    final ParameterExpression outputValues_ =
-        Expressions.parameter(Object[].class, "outputValues");
-    final JavaTypeFactoryImpl javaTypeFactory =
-        new StormSqlTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
-
-    final RexToLixTranslator.InputGetter inputGetter =
-            new RexToLixTranslator.InputGetterImpl(
-                    ImmutableList.of(
-                            Pair.<Expression, PhysType>of(
-                                    Expressions.field(context_,
-                                            BuiltInMethod.CONTEXT_VALUES.field),
-                                    PhysTypeImpl.of(javaTypeFactory, inputRowType,
-                                            JavaRowFormat.ARRAY, false))));
-    final Function1<String, RexToLixTranslator.InputGetter> correlates =
-            new Function1<String, RexToLixTranslator.InputGetter>() {
-              public RexToLixTranslator.InputGetter apply(String a0) {
-                throw new UnsupportedOperationException();
-              }
-            };
-    final Expression root =
-            Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
-    final List<Expression> list =
-            RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
-                    null, root, inputGetter, correlates);
-    for (int i = 0; i < list.size(); i++) {
-      builder.add(
-              Expressions.statement(
-                      Expressions.assign(
-                              Expressions.arrayIndex(outputValues_,
-                                      Expressions.constant(i)),
-                              list.get(i))));
-    }
-
-    return builder.toBlock();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
new file mode 100644
index 0000000..5ac95e0
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to a Java source code String.
+ *
+ * This code is inspired by JaninoRexCompiler in Calcite, but while that returns an executable {@link org.apache.calcite.interpreter.Scalar},
+ * we need the generated source code so that the compiled instance can be serialized and executed efficiently on workers.
+ */
+public class RexNodeToJavaCodeCompiler {
+  private final RexBuilder rexBuilder;
+
+  public RexNodeToJavaCodeCompiler(RexBuilder rexBuilder) {
+    this.rexBuilder = rexBuilder;
+  }
+
+  public BlockStatement compileToBlock(List<RexNode> nodes, RelDataType inputRowType) {
+    final RexProgramBuilder programBuilder =
+        new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+
+    return compileToBlock(programBuilder.getProgram());
+  }
+
+  public BlockStatement compileToBlock(final RexProgram program) {
+    final ParameterExpression context_ =
+            Expressions.parameter(Context.class, "context");
+    final ParameterExpression outputValues_ =
+            Expressions.parameter(Object[].class, "outputValues");
+
+    return compileToBlock(program, context_, outputValues_).toBlock();
+  }
+
+  public String compile(List<RexNode> nodes, RelDataType inputRowType, String className) {
+    final RexProgramBuilder programBuilder =
+            new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+
+    return compile(programBuilder.getProgram(), className);
+  }
+
+  public String compile(final RexProgram program, String className) {
+    final ParameterExpression context_ =
+            Expressions.parameter(Context.class, "context");
+    final ParameterExpression outputValues_ =
+            Expressions.parameter(Object[].class, "outputValues");
+
+    BlockBuilder builder = compileToBlock(program, context_, outputValues_);
+    return baz(context_, outputValues_, builder.toBlock(), className);
+  }
+
+  private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context_,
+                                        ParameterExpression outputValues_) {
+    RelDataType inputRowType = program.getInputRowType();
+    final BlockBuilder builder = new BlockBuilder();
+    final JavaTypeFactoryImpl javaTypeFactory =
+            new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+    final RexToLixTranslator.InputGetter inputGetter =
+            new RexToLixTranslator.InputGetterImpl(
+                    ImmutableList.of(
+                            Pair.<Expression, PhysType>of(
+                                    Expressions.field(context_,
+                                            BuiltInMethod.CONTEXT_VALUES.field),
+                                    PhysTypeImpl.of(javaTypeFactory, inputRowType,
+                                            JavaRowFormat.ARRAY, false))));
+    final Function1<String, RexToLixTranslator.InputGetter> correlates =
+            new Function1<String, RexToLixTranslator.InputGetter>() {
+              public RexToLixTranslator.InputGetter apply(String a0) {
+                throw new UnsupportedOperationException();
+              }
+            };
+    final Expression root =
+            Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
+    final List<Expression> list =
+            RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
+                    null, root, inputGetter, correlates);
+    for (int i = 0; i < list.size(); i++) {
+      builder.add(
+              Expressions.statement(
+                      Expressions.assign(
+                              Expressions.arrayIndex(outputValues_,
+                                      Expressions.constant(i)),
+                              list.get(i))));
+    }
+
+    return builder;
+  }
+
+  /** Given a method body that implements {@link ExecutableExpression#execute(Context, Object[])},
+   * adds a bridge method that implements {@link ExecutableExpression#execute(Context)}, and
+   * returns the Java source of the resulting class. */
+  static String baz(ParameterExpression context_,
+                    ParameterExpression outputValues_, BlockStatement block, String className) {
+    final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+    // public void execute(Context, Object[] outputValues)
+    declarations.add(
+            Expressions.methodDecl(Modifier.PUBLIC, void.class,
+                    StormBuiltInMethod.EXPR_EXECUTE2.method.getName(),
+                    ImmutableList.of(context_, outputValues_), block));
+
+    // public Object execute(Context)
+    final BlockBuilder builder = new BlockBuilder();
+    final Expression values_ = builder.append("values",
+            Expressions.newArrayBounds(Object.class, 1,
+                    Expressions.constant(1)));
+    builder.add(
+            Expressions.statement(
+                    Expressions.call(
+                            Expressions.parameter(ExecutableExpression.class, "this"),
+                            StormBuiltInMethod.EXPR_EXECUTE2.method, context_, values_)));
+    builder.add(
+            Expressions.return_(null,
+                    Expressions.arrayIndex(values_, Expressions.constant(0))));
+    declarations.add(
+            Expressions.methodDecl(Modifier.PUBLIC, Object.class,
+                    StormBuiltInMethod.EXPR_EXECUTE1.method.getName(),
+                    ImmutableList.of(context_), builder.toBlock()));
+
+    final ClassDeclaration classDeclaration =
+            Expressions.classDecl(Modifier.PUBLIC, className, null,
+                    ImmutableList.<Type>of(ExecutableExpression.class), declarations);
+
+    return Expressions.toString(Lists.newArrayList(classDeclaration), "\n", false);
+  }
+
+  enum StormBuiltInMethod {
+    EXPR_EXECUTE1(ExecutableExpression.class, "execute", Context.class),
+    EXPR_EXECUTE2(ExecutableExpression.class, "execute", Context.class, Object[].class);
+
+    public final Method method;
+    public final Constructor constructor;
+    public final Field field;
+
+    public static final ImmutableMap<Method, BuiltInMethod> MAP;
+
+    static {
+      final ImmutableMap.Builder<Method, BuiltInMethod> builder =
+              ImmutableMap.builder();
+      for (BuiltInMethod value : BuiltInMethod.values()) {
+        if (value.method != null) {
+          builder.put(value.method, value);
+        }
+      }
+      MAP = builder.build();
+    }
+
+    private StormBuiltInMethod(Method method, Constructor constructor, Field field) {
+      this.method = method;
+      this.constructor = constructor;
+      this.field = field;
+    }
+
+    /**
+     * Defines a method.
+     */
+    StormBuiltInMethod(Class clazz, String methodName, Class... argumentTypes) {
+      this(Types.lookupMethod(clazz, methodName, argumentTypes), null, null);
+    }
+
+    /**
+     * Defines a constructor.
+     */
+    StormBuiltInMethod(Class clazz, Class... argumentTypes) {
+      this(null, Types.lookupConstructor(clazz, argumentTypes), null);
+    }
+
+    /**
+     * Defines a field.
+     */
+    StormBuiltInMethod(Class clazz, String fieldName, boolean dummy) {
+      this(null, null, Types.lookupField(clazz, fieldName));
+      assert dummy : "dummy value for method overloading must be true";
+    }
+  }
+
+}
\ No newline at end of file
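
For illustration, the String returned by compile()/baz() above is the source of a self-contained
class implementing ExecutableExpression (added later in this patch as
org.apache.storm.sql.runtime.calcite.ExecutableExpression). The class below is a hand-written
approximation for a hypothetical "ID > 2" filter, not actual generated output; the real body is
emitted by RexToLixTranslator/Expressions.toString, and the class name follows
StormRelUtils.getClassName (the id/sequence digits here are made up).

// Approximate shape only -- the actual code is generated from the translated RexProgram.
public class Generated_TRIDENTFILTERREL_5_0
    implements org.apache.storm.sql.runtime.calcite.ExecutableExpression {

  // Reads inputs from context.values and writes results into outputValues.
  public void execute(org.apache.calcite.interpreter.Context context, Object[] outputValues) {
    outputValues[0] = context.values[0] == null
        ? null
        : Boolean.valueOf(((Number) context.values[0]).intValue() > 2);
  }

  // Bridge method added by baz(): delegates to the two-argument form via a 1-element array.
  public Object execute(org.apache.calcite.interpreter.Context context) {
    final Object[] values = new Object[1];
    this.execute(context, values);
    return values[0];
  }
}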

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 31f9014..97995c7 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -19,7 +19,6 @@ package org.apache.storm.sql.compiler.backends.standalone;
 
 import com.google.common.base.Joiner;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.tree.BlockStatement;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
@@ -37,7 +36,7 @@ import org.apache.calcite.schema.AggregateFunction;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.storm.sql.compiler.RexNodeToBlockStatementCompiler;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -57,7 +56,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
 
   private final PrintWriter pw;
   private final JavaTypeFactory typeFactory;
-  private final RexNodeToBlockStatementCompiler rexCompiler;
+  private final RexNodeToJavaCodeCompiler rexCompiler;
 
   private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
     "  private static final ChannelHandler %1$s = ",
@@ -201,7 +200,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
     this.pw = pw;
     this.typeFactory = typeFactory;
-    this.rexCompiler = new RexNodeToBlockStatementCompiler(new RexBuilder(typeFactory));
+    this.rexCompiler = new RexNodeToJavaCodeCompiler(new RexBuilder(typeFactory));
   }
 
   @Override
@@ -221,8 +220,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
     pw.print("context.values = _data.toArray();\n");
     pw.print("Object[] outputValues = new Object[1];\n");
 
-    BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType);
-    pw.write(codeBlock.toString());
+    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
 
     String r = "((Boolean) outputValues[0])";
     if (filter.getCondition().getType().isNullable()) {
@@ -246,8 +244,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
     pw.print("context.values = _data.toArray();\n");
     pw.print(String.format("Object[] outputValues = new Object[%d];\n", outputCount));
 
-    BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType);
-    pw.write(codeBlock.toString());
+    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
 
     pw.print("    ctx.emit(new Values(outputValues));\n");
     endStage();

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
index 073dc5c..40bbacd 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
@@ -31,11 +31,17 @@ public class StormRelUtils {
     private static final Logger LOG = LoggerFactory.getLogger(StormRelUtils.class);
 
     private static final AtomicInteger sequence = new AtomicInteger(0);
+    private static final AtomicInteger classSequence = new AtomicInteger(0);
 
     public static String getStageName(TridentRel relNode) {
         return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + sequence.getAndIncrement();
     }
 
+    public static String getClassName(TridentRel relNode) {
+        return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" +
+                classSequence.getAndIncrement();
+    }
+
     public static TridentRel getStormRelInput(RelNode input) {
         if (input instanceof RelSubset) {
             // go with known best input

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
index 1045ca2..f98fb02 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
@@ -43,12 +43,13 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.planner.StormRelDataTypeSystem;
 import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
 import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
 import org.apache.storm.sql.planner.trident.rel.TridentRel;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+import org.apache.storm.sql.AbstractTridentProcessor;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.fluent.IAggregatableStream;
@@ -100,6 +101,7 @@ public class QueryPlanner {
         final TridentTopology topology = tridentPlanCreator.getTopology();
         final IAggregatableStream lastStream = tridentPlanCreator.pop();
         final DataContext dc = tridentPlanCreator.getDataContext();
+        final List<CompilingClassLoader> cls = tridentPlanCreator.getClassLoaders();
 
         return new AbstractTridentProcessor() {
             @Override
@@ -116,6 +118,11 @@ public class QueryPlanner {
             public DataContext getDataContext() {
                 return dc;
             }
+
+            @Override
+            public List<CompilingClassLoader> getClassLoaders() {
+                return cls;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
index ebdd27c..aa30552 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
@@ -19,33 +19,40 @@ package org.apache.storm.sql.planner.trident;
 
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.interpreter.Scalar;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.compiler.RexNodeToBlockStatementCompiler;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 import org.apache.storm.sql.runtime.calcite.StormDataContext;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.fluent.IAggregatableStream;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 public class TridentPlanCreator {
     private final Map<String, ISqlTridentDataSource> sources;
     private final JavaTypeFactory typeFactory;
-    private final RexNodeToBlockStatementCompiler rexCompiler;
+    private final RexNodeToJavaCodeCompiler rexCompiler;
     private final DataContext dataContext;
     private final TridentTopology topology;
 
     private final Deque<IAggregatableStream> streamStack = new ArrayDeque<>();
+    private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
 
     public TridentPlanCreator(Map<String, ISqlTridentDataSource> sources, RexBuilder rexBuilder) {
         this.sources = sources;
-        this.rexCompiler = new RexNodeToBlockStatementCompiler(rexBuilder);
+        this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
         this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
 
         this.topology = new TridentTopology();
@@ -76,15 +83,43 @@ public class TridentPlanCreator {
         return topology;
     }
 
-    public String createExpression(List<RexNode> nodes, RelDataType inputRowType) {
-        return rexCompiler.compile(nodes, inputRowType).toString();
+    public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
+            throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+        String expr = rexCompiler.compile(nodes, inputRowType, className);
+        CompilingClassLoader classLoader = new CompilingClassLoader(
+                getLastClassLoader(), className, expr, null);
+        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+        addClassLoader(classLoader);
+        return new DebuggableExecutableExpression(instance, expr);
     }
 
-    public String createExpression(RexProgram program) {
-        return rexCompiler.compile(program).toString();
+    public ExecutableExpression createScalarInstance(RexProgram program, String className)
+            throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+        String expr = rexCompiler.compile(program, className);
+        CompilingClassLoader classLoader = new CompilingClassLoader(
+                getLastClassLoader(), className, expr, null);
+        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+        addClassLoader(classLoader);
+        return new DebuggableExecutableExpression(instance, expr);
     }
 
     private void push(IAggregatableStream stream) {
         streamStack.push(stream);
     }
+
+    public void addClassLoader(CompilingClassLoader compilingClassLoader) {
+        this.classLoaders.add(compilingClassLoader);
+    }
+
+    public ClassLoader getLastClassLoader() {
+        if (this.classLoaders.size() > 0) {
+            return this.classLoaders.get(this.classLoaders.size() - 1);
+        } else {
+            return this.getClass().getClassLoader();
+        }
+    }
+
+    public List<CompilingClassLoader> getClassLoaders() {
+        return classLoaders;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
index da20c42..482e841 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
@@ -29,9 +29,8 @@ import org.apache.calcite.rex.RexProgram;
 import org.apache.storm.sql.planner.StormRelUtils;
 import org.apache.storm.sql.planner.rel.StormCalcRelBase;
 import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
-import org.apache.storm.sql.runtime.trident.functions.ForwardFunction;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.tuple.Fields;
 
@@ -54,41 +53,43 @@ public class TridentCalcRel extends StormCalcRelBase implements TridentRel {
         RelNode input = getInput();
         StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
         Stream inputStream = planCreator.pop().toStream();
-        Fields inputFields = inputStream.getOutputFields();
 
         String stageName = StormRelUtils.getStageName(this);
+
         RelDataType inputRowType = getInput(0).getRowType();
 
         List<String> outputFieldNames = getRowType().getFieldNames();
         int outputCount = outputFieldNames.size();
 
         // filter
+        ExecutableExpression filterInstance = null;
         RexLocalRef condition = program.getCondition();
-        String conditionExpr = null;
         if (condition != null) {
             RexNode conditionNode = program.expandLocalRef(condition);
-            conditionExpr = planCreator.createExpression(Lists.newArrayList(conditionNode), inputRowType);
+            filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
+                    StormRelUtils.getClassName(this));
         }
 
         // projection
+        ExecutableExpression projectionInstance = null;
         List<RexLocalRef> projectList = program.getProjectList();
-        String projectionExpr = null;
         if (projectList != null && !projectList.isEmpty()) {
             List<RexNode> expandedNodes = new ArrayList<>();
             for (RexLocalRef project : projectList) {
                 expandedNodes.add(program.expandLocalRef(project));
             }
 
-            projectionExpr = planCreator.createExpression(expandedNodes, inputRowType);
+            projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
+                    StormRelUtils.getClassName(this));
         }
 
-        if (projectionExpr == null && conditionExpr == null) {
+        if (projectionInstance == null && filterInstance == null) {
             // it shouldn't be happen
             throw new IllegalStateException("Either projection or condition, or both should be provided.");
         }
 
         final Stream finalStream = inputStream
-                .flatMap(new EvaluationCalc(conditionExpr, projectionExpr, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
+                .flatMap(new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
                 .name(stageName);
 
         planCreator.addStream(finalStream);

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
index 26aa65e..1fe0927 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.storm.sql.planner.StormRelUtils;
 import org.apache.storm.sql.planner.rel.StormFilterRelBase;
 import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.fluent.IAggregatableStream;
@@ -53,8 +54,12 @@ public class TridentFilterRel extends StormFilterRelBase implements TridentRel {
 
         List<RexNode> childExps = getChildExps();
         RelDataType inputRowType = getInput(0).getRowType();
-        String expression = planCreator.createExpression(childExps, inputRowType);
-        IAggregatableStream finalStream = inputStream.filter(new EvaluationFilter(expression, planCreator.getDataContext())).name(stageName);
+
+        String filterClassName = StormRelUtils.getClassName(this);
+        ExecutableExpression filterInstance = planCreator.createScalarInstance(childExps, inputRowType, filterClassName);
+
+        IAggregatableStream finalStream = inputStream.filter(new EvaluationFilter(filterInstance, planCreator.getDataContext()))
+                .name(stageName);
         planCreator.addStream(finalStream);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
index 7fe2449..06be5d7 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
@@ -26,12 +26,11 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.storm.sql.planner.StormRelUtils;
 import org.apache.storm.sql.planner.rel.StormProjectRelBase;
 import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
-import org.apache.storm.sql.runtime.trident.functions.ForwardFunction;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.tuple.Fields;
 
-import java.util.ArrayList;
 import java.util.List;
 
 public class TridentProjectRel extends StormProjectRelBase implements TridentRel {
@@ -50,19 +49,19 @@ public class TridentProjectRel extends StormProjectRelBase implements TridentRel
         RelNode input = getInput();
         StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
         Stream inputStream = planCreator.pop().toStream();
-        Fields inputFields = inputStream.getOutputFields();
 
         String stageName = StormRelUtils.getStageName(this);
+        String projectionClassName = StormRelUtils.getClassName(this);
 
         List<String> outputFieldNames = getRowType().getFieldNames();
         int outputCount = outputFieldNames.size();
 
         List<RexNode> childExps = getChildExps();
         RelDataType inputRowType = getInput(0).getRowType();
-        final String expression = planCreator.createExpression(childExps, inputRowType);
 
+        ExecutableExpression projectionInstance = planCreator.createScalarInstance(childExps, inputRowType, projectionClassName);
         Stream finalStream = inputStream
-                .map(new EvaluationFunction(expression, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
+                .map(new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
                 .name(stageName);
 
         planCreator.addStream(finalStream);

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 694f1ba..7b936df 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -27,9 +27,10 @@ import org.apache.storm.ILocalCluster;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.planner.trident.QueryPlanner;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+import org.apache.storm.sql.AbstractTridentProcessor;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
@@ -210,7 +211,11 @@ public class TestPlanCompiler {
     ILocalCluster cluster = new LocalCluster();
     StormTopology stormTopo = topo.build();
     try {
-      Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
+      if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
+        CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
+        Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
+      }
+
       cluster.submitTopology("storm-sql", conf, stormTopo);
       waitForCompletion(1000 * 1000, new Callable<Boolean>() {
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
new file mode 100644
index 0000000..e78f354
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.calcite;
+
+import org.apache.calcite.interpreter.Context;
+
+public class DebuggableExecutableExpression implements ExecutableExpression {
+    private ExecutableExpression delegate;
+    private String delegateCode;
+
+    public DebuggableExecutableExpression(ExecutableExpression delegate, String delegateCode) {
+        this.delegate = delegate;
+        this.delegateCode = delegateCode;
+    }
+
+    @Override
+    public Object execute(Context context) {
+        return delegate.execute(context);
+    }
+
+    @Override
+    public void execute(Context context, Object[] results) {
+        delegate.execute(context, results);
+    }
+
+    public String getDelegateCode() {
+        return delegateCode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
new file mode 100644
index 0000000..8416945
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.calcite;
+
+import org.apache.calcite.interpreter.Context;
+
+import java.io.Serializable;
+
+/**
+ * Compiled executable expression.
+ */
+public interface ExecutableExpression extends Serializable {
+    Object execute(Context context);
+    void execute(Context context, Object[] results);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
deleted file mode 100644
index 02e301c..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.trident;
-
-import org.apache.calcite.DataContext;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-
-import java.util.Map;
-
-public abstract class AbstractTridentProcessor {
-  protected Stream outputStream;
-  protected DataContext dataContext;
-  /**
-   * @return the output stream of the SQL
-   */
-  public Stream outputStream() {
-    return outputStream;
-  }
-
-  /**
-   * Construct the trident topology based on the SQL.
-   */
-  public abstract TridentTopology build();
-
-  /**
-   * @return DataContext instance which is used with execution of query
-   */
-  public DataContext getDataContext() {
-    return dataContext;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
index 3c7b4e4..6c76481 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
@@ -21,62 +21,40 @@ package org.apache.storm.sql.runtime.trident.functions;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.interpreter.Context;
 import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 import org.apache.storm.trident.operation.OperationAwareFlatMapFunction;
-import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.operation.TridentOperationContext;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Values;
-import org.codehaus.commons.compiler.CompileException;
-import org.codehaus.janino.ScriptEvaluator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.Map;
 
 public class EvaluationCalc implements OperationAwareFlatMapFunction {
     private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
 
-    private transient ScriptEvaluator evaluator;
-
-    private final String expression;
+    private final ExecutableExpression filterInstance;
+    private final ExecutableExpression projectionInstance;
     private final Object[] outputValues;
     private final DataContext dataContext;
 
-    public EvaluationCalc(String filterExpression, String projectionExpression, int outputCount, DataContext dataContext) {
-        expression = buildCompleteExpression(filterExpression, projectionExpression);
+    public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+        this.filterInstance = filterInstance;
+        this.projectionInstance = projectionInstance;
         this.outputValues = new Object[outputCount];
         this.dataContext = dataContext;
     }
 
-    private String buildCompleteExpression(String filterExpression, String projectionExpression) {
-        StringBuilder sb = new StringBuilder();
-
-        if (filterExpression != null && !filterExpression.isEmpty()) {
-            sb.append(filterExpression);
-            // TODO: Convert this with Linq4j?
-            sb.append("if (outputValues[0] == null || !((Boolean) outputValues[0])) { return 0; }\n\n");
-        }
-
-        if (projectionExpression != null && !projectionExpression.isEmpty()) {
-            sb.append(projectionExpression);
-        }
-        sb.append("\nreturn 1;");
-
-        return sb.toString();
-    }
-
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
-        LOG.info("Expression: {}", expression);
-        try {
-            evaluator = new ScriptEvaluator(expression, int.class,
-                    new String[] {"context", "outputValues"},
-                    new Class[] { Context.class, Object[].class });
-        } catch (CompileException e) {
-            throw new RuntimeException(e);
+        if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+        }
+        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
         }
     }
 
@@ -87,20 +65,22 @@ public class EvaluationCalc implements OperationAwareFlatMapFunction {
 
     @Override
     public Iterable<Values> execute(TridentTuple input) {
-        try {
-            Context calciteContext = new StormContext(dataContext);
-            calciteContext.values = input.getValues().toArray();
-            int keepFlag = (int) evaluator.evaluate(
-                    new Object[]{calciteContext, outputValues});
-            // script
-            if (keepFlag == 1) {
-                return Collections.singletonList(new Values(outputValues));
-            } else {
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = input.getValues().toArray();
+
+        if (filterInstance != null) {
+            filterInstance.execute(calciteContext, outputValues);
+            // filtered out
+            if (outputValues[0] == null || !((Boolean) outputValues[0])) {
                 return Collections.emptyList();
             }
-        } catch (InvocationTargetException e) {
-            throw new RuntimeException(e);
         }
 
+        if (projectionInstance != null) {
+            projectionInstance.execute(calciteContext, outputValues);
+            return Collections.singletonList(new Values(outputValues));
+        } else {
+            return Collections.singletonList(new Values(input.getValues()));
+        }
     }
 }
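
EvaluationCalc no longer compiles an expression string with Janino's ScriptEvaluator in prepare(); it receives pre-compiled ExecutableExpression instances for the filter and the projection and invokes them directly. Only the execute(Context, Object[]) call is visible in this diff, so the following is a hedged sketch of the shape implied by that usage, not the committed interface; GreaterThanTenFilter is a hand-written stand-in for what the code generator would emit.

    import java.io.Serializable;
    import org.apache.calcite.interpreter.Context;

    // Shape implied by filterInstance.execute(calciteContext, outputValues) above.
    // Serializable is assumed here because the instances are held in non-transient
    // fields of Trident operations.
    public interface ExecutableExpression extends Serializable {
      void execute(Context context, Object[] results);
    }

    // Hand-written stand-in for generated code, e.g. for a predicate like "ID > 10":
    // it reads the input row from context.values and writes the verdict to results[0].
    class GreaterThanTenFilter implements ExecutableExpression {
      @Override
      public void execute(Context context, Object[] results) {
        Object id = context.values[0];
        results[0] = (id instanceof Number) && ((Number) id).intValue() > 10;
      }
    }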

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
index f61fdda..9314852 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -21,58 +21,41 @@ package org.apache.storm.sql.runtime.trident.functions;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.interpreter.Context;
 import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.operation.TridentOperationContext;
 import org.apache.storm.trident.tuple.TridentTuple;
-import org.codehaus.commons.compiler.CompileException;
-import org.codehaus.janino.ScriptEvaluator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 
 public class EvaluationFilter extends BaseFilter {
     private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
 
-    private transient ScriptEvaluator evaluator;
-
-    private final String expression;
+    private final ExecutableExpression filterInstance;
     private final DataContext dataContext;
     private final Object[] outputValues;
 
-    public EvaluationFilter(String expression, DataContext dataContext) {
-        if (!expression.contains("return ")) {
-            // we use out parameter and don't use the return value but compile fails...
-            expression = expression + "\nreturn 0;";
-        }
-
-        this.expression = expression;
+    public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
+        this.filterInstance = filterInstance;
         this.dataContext = dataContext;
         this.outputValues = new Object[1];
     }
 
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
-        LOG.info("Expression: {}", expression);
-        try {
-            evaluator = new ScriptEvaluator(expression, int.class,
-                    new String[] {"context", "outputValues"},
-                    new Class[] { Context.class, Object[].class });
-        } catch (CompileException e) {
-            throw new RuntimeException(e);
+        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
         }
     }
 
     @Override
     public boolean isKeep(TridentTuple tuple) {
-        try {
-            Context calciteContext = new StormContext(dataContext);
-            calciteContext.values = tuple.getValues().toArray();
-            evaluator.evaluate(new Object[] {calciteContext, outputValues});
-            return (outputValues[0] != null && (boolean) outputValues[0]);
-        } catch (InvocationTargetException e) {
-            throw new RuntimeException(e);
-        }
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = tuple.getValues().toArray();
+        filterInstance.execute(calciteContext, outputValues);
+        return (outputValues[0] != null && (boolean) outputValues[0]);
     }
 }
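
EvaluationFilter's prepare() now only logs the generated source when the expression instance is a DebuggableExecutableExpression. That wrapper is added elsewhere in this commit; the version below is a hedged reconstruction from its usage in these operations (delegation plus a getDelegateCode() accessor), not the committed file.

    import org.apache.calcite.interpreter.Context;

    // Hedged reconstruction: carries the generated source next to the compiled instance
    // so operations can log it for debugging, and delegates evaluation otherwise.
    public class DebuggableExecutableExpression implements ExecutableExpression {
      private final ExecutableExpression delegate;
      private final String delegateCode;

      public DebuggableExecutableExpression(ExecutableExpression delegate, String delegateCode) {
        this.delegate = delegate;
        this.delegateCode = delegateCode;
      }

      @Override
      public void execute(Context context, Object[] results) {
        delegate.execute(context, results);
      }

      public String getDelegateCode() {
        return delegateCode;
      }
    }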

http://git-wip-us.apache.org/repos/asf/storm/blob/6665cce2/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
index 8d85990..2608104 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -21,47 +21,34 @@ package org.apache.storm.sql.runtime.trident.functions;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.interpreter.Context;
 import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 import org.apache.storm.trident.operation.OperationAwareMapFunction;
 import org.apache.storm.trident.operation.TridentOperationContext;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Values;
-import org.codehaus.commons.compiler.CompileException;
-import org.codehaus.janino.ScriptEvaluator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 
 public class EvaluationFunction implements OperationAwareMapFunction {
     private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
 
-    private transient ScriptEvaluator evaluator;
-
-    private final String expression;
+    private final ExecutableExpression projectionInstance;
     private final Object[] outputValues;
     private final DataContext dataContext;
 
-    public EvaluationFunction(String expression, int outputCount, DataContext dataContext) {
-        if (!expression.contains("return ")) {
-            // we use out parameter and don't use the return value but compile fails...
-            expression = expression + "\nreturn 0;";
-        }
-
-        this.expression = expression;
+    public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+        this.projectionInstance = projectionInstance;
         this.outputValues = new Object[outputCount];
         this.dataContext = dataContext;
     }
 
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
-        LOG.info("Expression: {}", expression);
-        try {
-            evaluator = new ScriptEvaluator(expression, int.class,
-                    new String[] {"context", "outputValues"},
-                    new Class[] { Context.class, Object[].class });
-        } catch (CompileException e) {
-            throw new RuntimeException(e);
+        if (projectionInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
         }
     }
 
@@ -72,14 +59,9 @@ public class EvaluationFunction implements OperationAwareMapFunction {
 
     @Override
     public Values execute(TridentTuple input) {
-        try {
-            Context calciteContext = new StormContext(dataContext);
-            calciteContext.values = input.getValues().toArray();
-            evaluator.evaluate(
-                    new Object[]{calciteContext, outputValues});
-            return new Values(outputValues);
-        } catch (InvocationTargetException e) {
-            throw new RuntimeException(e);
-        }
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = input.getValues().toArray();
+        projectionInstance.execute(calciteContext, outputValues);
+        return new Values(outputValues);
     }
 }
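
EvaluationFunction follows the same pattern: the projection runs through the pre-compiled instance instead of a Janino ScriptEvaluator built at prepare() time. As a usage illustration only, attaching these operations to a Trident stream could look like the hypothetical sketch below; the field names, the output count, and the compiled-expression parameters are placeholders, not taken from this commit.

    import org.apache.calcite.DataContext;
    import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
    import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
    import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.tuple.Fields;

    public class EvaluationWiringSketch {
      // Applies a compiled WHERE clause and projection to an existing Trident stream.
      public static Stream attach(Stream input,
                                  ExecutableExpression compiledFilter,
                                  ExecutableExpression compiledProjection,
                                  DataContext dataContext) {
        return input
            .each(new Fields("ID", "NAME"), new EvaluationFilter(compiledFilter, dataContext))
            .map(new EvaluationFunction(compiledProjection, 2, dataContext));
      }
    }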

