beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ming...@apache.org
Subject [53/66] [abbrv] beam git commit: take SerializableFunction as UDF.
Date Mon, 11 Sep 2017 20:19:36 GMT
take SerializableFunction as UDF.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f99bf6c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f99bf6c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f99bf6c

Branch: refs/heads/master
Commit: 2f99bf6ce56ff03298808e8fa6f97639ea4710b7
Parents: b05cf8b
Author: mingmxu <mingmxu@ebay.com>
Authored: Tue Aug 8 23:02:32 2017 -0700
Committer: mingmxu <mingmxu@ebay.com>
Committed: Mon Sep 11 10:56:57 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/extensions/sql/BeamSql.java    | 17 +++++++++++++++++
 .../apache/beam/sdk/extensions/sql/BeamSqlEnv.java |  9 +++++++++
 .../interpreter/operator/BeamSqlUdfExpression.java |  4 +++-
 .../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java  | 13 ++++++++++++-
 4 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2f99bf6c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index ac617ad..a1e9877 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -144,6 +145,14 @@ public class BeamSql {
        getSqlEnv().registerUdf(functionName, clazz);
        return this;
      }
+     /**
+      * Register a {@link SerializableFunction} as UDF {@code functionName} for this query.
+      * Only the class of {@code sfn} is used, not its state; it needs a public no-arg constructor.
+      */
+      public QueryTransform withUdf(String functionName, SerializableFunction sfn){
+        getSqlEnv().registerUdf(functionName, sfn);
+        return this;
+      }
 
      /**
       * register a UDAF function used in this query.
@@ -213,6 +222,14 @@ public class BeamSql {
        getSqlEnv().registerUdf(functionName, clazz);
        return this;
      }
+     /**
+      * Register a {@link SerializableFunction} as UDF {@code functionName} for this query.
+      * Only the class of {@code sfn} is used, not its state; it needs a public no-arg constructor.
+      */
+      public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){
+        getSqlEnv().registerUdf(functionName, sfn);
+        return this;
+      }
 
      /**
       * register a UDAF function used in this query.

http://git-wip-us.apache.org/repos/asf/beam/blob/2f99bf6c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index 4d21425..0737c49 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.rel.type.RelDataType;
@@ -60,6 +61,14 @@ public class BeamSqlEnv implements Serializable{
   }
 
   /**
+   * Register {@code sfn} as UDF {@code functionName}; SQL invokes its {@code apply} method.
+   * Only {@code sfn.getClass()} is registered (instance state is ignored): needs a no-arg ctor.
+   */
+  public void registerUdf(String functionName, SerializableFunction sfn) {
+    schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply"));
+  }
+
+  /**
    * Register a UDAF function which can be used in GROUP-BY expression.
    * See {@link BeamSqlUdaf} on how to implement a UDAF.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/2f99bf6c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index f1bcb66..123e6a0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 public class BeamSqlUdfExpression extends BeamSqlExpression {
   //as Method is not Serializable, need to keep class/method information, and rebuild it.
   private transient Method method;
+  private transient Object udfIns;
   private String className;
   private String methodName;
   private List<String> paraClassName = new ArrayList<>();
@@ -63,7 +64,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
       }
 
       return BeamSqlPrimitive.of(getOutputType(),
-          method.invoke(null, paras.toArray(new Object[]{})));
+          method.invoke(udfIns, paras.toArray(new Object[]{})));
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
@@ -78,6 +79,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
       for (String pc : paraClassName) {
         paraClass.add(Class.forName(pc));
       }
+      udfIns = Class.forName(className).newInstance();
       method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
     } catch (Exception e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/2f99bf6c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 7302376..0552cbf 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -82,7 +83,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
     PCollection<BeamRecord> result2 =
         PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1)
         .apply("testUdf2",
-            BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
+            BeamSql.query(sql2).withUdf("cubic2", new CubicIntegerFn()));
     PAssert.that(result2).containsInAnyOrder(record);
 
     pipeline.run().waitUntilFinish();
@@ -131,4 +132,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
       return input * input * input;
     }
   }
+
+  /**
+   * An example UDF implemented as a {@link SerializableFunction}: returns the cube of its input.
+   */
+  public static class CubicIntegerFn implements SerializableFunction<Integer, Integer> {
+    @Override
+    public Integer apply(Integer input) {
+      return input * input * input;
+    }
+  }
 }


Mime
View raw message