beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-2161] Add support for String operators
Date Tue, 09 May 2017 12:18:00 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 4ea38d823 -> 3e678a75a


[BEAM-2161] Add support for String operators


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b12c7b49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b12c7b49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b12c7b49

Branch: refs/heads/DSL_SQL
Commit: b12c7b49b8eab7b208f2701c412123635813ee28
Parents: 4ea38d8
Author: James Xu <xumingmingv@gmail.com>
Authored: Sat May 6 01:20:54 2017 +0800
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Tue May 9 14:17:06 2017 +0200

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java |  33 +++++
 .../interpreter/operator/BeamSqlExpression.java |  12 ++
 .../interpreter/operator/BeamSqlPrimitive.java  |   4 +-
 .../string/BeamSqlCharLengthExpression.java     |  40 ++++++
 .../string/BeamSqlConcatExpression.java         |  63 +++++++++
 .../string/BeamSqlInitCapExpression.java        |  56 ++++++++
 .../operator/string/BeamSqlLowerExpression.java |  40 ++++++
 .../string/BeamSqlOverlayExpression.java        |  77 +++++++++++
 .../string/BeamSqlPositionExpression.java       |  73 +++++++++++
 .../string/BeamSqlStringUnaryExpression.java    |  45 +++++++
 .../string/BeamSqlSubstringExpression.java      |  83 ++++++++++++
 .../operator/string/BeamSqlTrimExpression.java  | 101 ++++++++++++++
 .../operator/string/BeamSqlUpperExpression.java |  40 ++++++
 .../operator/string/package-info.java           |  22 ++++
 .../sql/interpreter/BeamSQLFnExecutorTest.java  | 131 ++++++++++++++++++-
 .../string/BeamSqlCharLengthExpressionTest.java |  45 +++++++
 .../string/BeamSqlConcatExpressionTest.java     |  67 ++++++++++
 .../string/BeamSqlInitCapExpressionTest.java    |  55 ++++++++
 .../string/BeamSqlLowerExpressionTest.java      |  45 +++++++
 .../string/BeamSqlOverlayExpressionTest.java    |  88 +++++++++++++
 .../string/BeamSqlPositionExpressionTest.java   |  85 ++++++++++++
 .../BeamSqlStringUnaryExpressionTest.java       |  53 ++++++++
 .../string/BeamSqlSubstringExpressionTest.java  | 102 +++++++++++++++
 .../string/BeamSqlTrimExpressionTest.java       | 102 +++++++++++++++
 .../string/BeamSqlUpperExpressionTest.java      |  45 +++++++
 25 files changed, 1501 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
index 78663f8..a14d347 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.interpreter;
 
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
@@ -38,6 +39,15 @@ import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpr
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
 import org.apache.beam.dsls.sql.rel.BeamFilterRel;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
@@ -82,6 +92,7 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
   static BeamSqlExpression buildExpression(RexNode rexNode) {
     if (rexNode instanceof RexLiteral) {
       RexLiteral node = (RexLiteral) rexNode;
+
       return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
     } else if (rexNode instanceof RexInputRef) {
       RexInputRef node = (RexInputRef) rexNode;
@@ -124,6 +135,28 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
         case "MOD":
           return new BeamSqlModExpression(subExps);
 
+        // string operators
+        case "||":
+          return new BeamSqlConcatExpression(subExps);
+        case "POSITION":
+          return new BeamSqlPositionExpression(subExps);
+        case "CHAR_LENGTH":
+        case "CHARACTER_LENGTH":
+          return new BeamSqlCharLengthExpression(subExps);
+        case "UPPER":
+          return new BeamSqlUpperExpression(subExps);
+        case "LOWER":
+          return new BeamSqlLowerExpression(subExps);
+        case "TRIM":
+          return new BeamSqlTrimExpression(subExps);
+        case "SUBSTRING":
+          return new BeamSqlSubstringExpression(subExps);
+        case "OVERLAY":
+          return new BeamSqlOverlayExpression(subExps);
+        case "INITCAP":
+          return new BeamSqlInitCapExpression(subExps);
+
+
         case "IS NULL":
           return new BeamSqlIsNullExpression(subExps.get(0));
         case "IS NOT NULL":

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index c44795f..54289e6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -41,6 +41,18 @@ public abstract class BeamSqlExpression implements Serializable{
     this.outputType = outputType;
   }
 
+  public BeamSqlExpression op(int idx) {
+    return operands.get(idx);
+  }
+
+  public SqlTypeName opType(int idx) {
+    return op(idx).getOutputType();
+  }
+
+  public <T> T opValueEvaluated(int idx, BeamSQLRow row) {
+    return (T) op(idx).evaluate(row).getValue();
+  }
+
   /**
    * assertion to make sure the input and output are supported in this expression.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index 71852ff..dbe6c3c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
 
 /**
  * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
@@ -86,9 +87,8 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression{
     case BOOLEAN:
       return value instanceof Boolean;
     case CHAR:
-      return value instanceof Character;
     case VARCHAR:
-      return value instanceof String;
+      return value instanceof String || value instanceof NlsString;
     default:
       throw new BeamSqlUnsupportedException(outputType.name());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
new file mode 100644
index 0000000..7dbd7f1
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'CHAR_LENGTH' operator.
+ */
+public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.INTEGER);
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String str = opValueEvaluated(0, inputRecord);
+    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
new file mode 100644
index 0000000..a56e9b1
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * String concat operator.
+ */
+public class BeamSqlConcatExpression extends BeamSqlExpression {
+
+  protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    for (BeamSqlExpression exp : getOperands()) {
+      if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String left = opValueEvaluated(0, inputRecord);
+    String right = opValueEvaluated(1, inputRecord);
+
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+        new StringBuilder(left.length() + right.length())
+            .append(left).append(right).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
new file mode 100644
index 0000000..3d0125f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'INITCAP' operator.
+ */
+public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String str = opValueEvaluated(0, inputRecord);
+
+    StringBuilder ret = new StringBuilder(str);
+    boolean isInit = true;
+    for (int i = 0; i < str.length(); i++) {
+      if (Character.isWhitespace(str.charAt(i))) {
+        isInit = true;
+        continue;
+      }
+
+      if (isInit) {
+        ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
+        isInit = false;
+      } else {
+        ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
new file mode 100644
index 0000000..1855c65
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'LOWER' operator.
+ */
+public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String str = opValueEvaluated(0, inputRecord);
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
new file mode 100644
index 0000000..73f2591
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'OVERLAY' operator.
+ *
+ * <p>
+ *   OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
+ * </p>
+ */
+public class BeamSqlOverlayExpression extends BeamSqlExpression {
+  public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() < 3 || operands.size() > 4) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
+        || !SqlTypeName.CHAR_TYPES.contains(opType(1))
+        || !SqlTypeName.INT_TYPES.contains(opType(2))) {
+      return false;
+    }
+
+    if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String str = opValueEvaluated(0, inputRecord);
+    String replaceStr = opValueEvaluated(1, inputRecord);
+    int idx = opValueEvaluated(2, inputRecord);
+    // the index is 1 based.
+    idx -= 1;
+    int length = replaceStr.length();
+    if (operands.size() == 4) {
+      length = opValueEvaluated(3, inputRecord);
+    }
+
+    StringBuilder result = new StringBuilder(
+        str.length() + replaceStr.length() - length);
+    result.append(str.substring(0, idx))
+        .append(replaceStr)
+        .append(str.substring(idx + length));
+
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
new file mode 100644
index 0000000..a5e8400
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * String position operator.
+ *
+ * <p>
+ *   example:
+ *     POSITION(string1 IN string2)
+ *     POSITION(string1 IN string2 FROM integer)
+ * </p>
+ */
+public class BeamSqlPositionExpression extends BeamSqlExpression {
+  public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.INTEGER);
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() < 2 || operands.size() > 3) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
+        || !SqlTypeName.CHAR_TYPES.contains(opType(1))) {
+      return false;
+    }
+
+    if (operands.size() == 3
+        && !SqlTypeName.INT_TYPES.contains(opType(2))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String targetStr = opValueEvaluated(0, inputRecord);
+    String containingStr = opValueEvaluated(1, inputRecord);
+    int from = -1;
+    if (operands.size() == 3) {
+      Number tmp = opValueEvaluated(2, inputRecord);
+      from = tmp.intValue();
+    }
+
+    int idx = containingStr.indexOf(targetStr, from);
+
+    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
new file mode 100644
index 0000000..d931db9
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all string unary operators.
+ */
+public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
+  public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() != 1) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))) {
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
new file mode 100644
index 0000000..554a3fc
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'SUBSTRING' operator.
+ *
+ * <p>
+ *   SUBSTRING(string FROM integer)
+ *   SUBSTRING(string FROM integer FOR integer)
+ * </p>
+ */
+public class BeamSqlSubstringExpression extends BeamSqlExpression {
+  public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() < 2 || operands.size() > 3) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
+        || !SqlTypeName.INT_TYPES.contains(opType(1))) {
+      return false;
+    }
+
+    if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String str = opValueEvaluated(0, inputRecord);
+    int idx = opValueEvaluated(1, inputRecord);
+    int startIdx = idx;
+    if (startIdx > 0) {
+      // NOTE: SQL substring is 1 based(rather than 0 based)
+      startIdx -= 1;
+    } else if (startIdx < 0) {
+      // NOTE: SQL also support negative index...
+      startIdx += str.length();
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
+    }
+
+    if (operands.size() == 3) {
+      int length = opValueEvaluated(2, inputRecord);
+      if (length < 0) {
+        length = 0;
+      }
+      int endIdx = Math.min(startIdx + length, str.length());
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, endIdx));
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
new file mode 100644
index 0000000..d6cad74
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Trim operator.
+ *
+ * <p>
+ * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
+ * </p>
+ */
+public class BeamSqlTrimExpression extends BeamSqlExpression {
+  public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() != 1 && operands.size() != 3) {
+      return false;
+    }
+
+    if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
+      return false;
+    }
+
+    if (operands.size() == 3
+        && (
+        !SqlTypeName.CHAR_TYPES.contains(opType(0))
+            || !SqlTypeName.CHAR_TYPES.contains(opType(1))
+            || !SqlTypeName.CHAR_TYPES.contains(opType(2)))
+        ) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    if (operands.size() == 1) {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+          opValueEvaluated(0, inputRecord).toString().trim());
+    } else {
+      String type = opValueEvaluated(0, inputRecord);
+      String targetStr = opValueEvaluated(1, inputRecord);
+      String containingStr = opValueEvaluated(2, inputRecord);
+
+      switch (type) {
+        case "LEADING":
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr));
+        case "TRAILING":
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr));
+        case "BOTH":
+        default:
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+              trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
+      }
+    }
+  }
+
+  static String leadingTrim(String containingStr, String targetStr) {
+    int idx = 0;
+    while (containingStr.startsWith(targetStr, idx)) {
+      idx += targetStr.length();
+    }
+
+    return containingStr.substring(idx);
+  }
+
+  static String trailingTrim(String containingStr, String targetStr) {
+    int idx = containingStr.length() - targetStr.length();
+    while (containingStr.startsWith(targetStr, idx)) {
+      idx -= targetStr.length();
+    }
+
+    idx += targetStr.length();
+    return containingStr.substring(0, idx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
new file mode 100644
index 0000000..d58a283
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'UPPER' operator.
+ */
+public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    String str = opValueEvaluated(0, inputRecord);
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
new file mode 100644
index 0000000..f2c63f3
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * String operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.string;

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
index 8df0865..d7379fc 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
@@ -33,6 +33,15 @@ import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpr
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
 import org.apache.beam.dsls.sql.rel.BeamFilterRel;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
@@ -121,14 +130,128 @@ public class BeamSQLFnExecutorTest extends BeamSQLFnExecutorTestBase {
       Class<? extends BeamSqlExpression> clazz) {
     RexNode rexNode;
     BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(fn,
+    rexNode = rexBuilder.makeCall(fn, Arrays.asList(
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+    ));
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+
+    assertTrue(exp.getClass().equals(clazz));
+  }
+
+  public void testBuildExpression_string()  {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
         Arrays.asList(
-            rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
-            rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+            rexBuilder.makeLiteral("hello "),
+            rexBuilder.makeLiteral("world")
         )
     );
     exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlConcatExpression);
 
-    assertTrue(exp.getClass().equals(clazz));
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello")
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlPositionExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlPositionExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCharLengthExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlUpperExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlLowerExpression);
+
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlInitCapExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
+        Arrays.asList(
+            rexBuilder.makeLiteral("BOTH"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlTrimExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOverlayExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSQLFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOverlayExpression);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
new file mode 100644
index 0000000..cd02fdf
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCharLengthExpression.
+ */
+public class BeamSqlCharLengthExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertEquals(5,
+        new BeamSqlCharLengthExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
new file mode 100644
index 0000000..ca71dec
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlConcatExpression.
+ */
+public class BeamSqlConcatExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertTrue(new BeamSqlConcatExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertFalse(new BeamSqlConcatExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlConcatExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " world"));
+    assertEquals("hello world",
+        new BeamSqlConcatExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
new file mode 100644
index 0000000..b38b033
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test of BeamSqlInitCapExpression.
+ */
+public class BeamSqlInitCapExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello world"));
+    assertEquals("Hello World",
+        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hEllO wOrld"));
+    assertEquals("Hello World",
+        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello     world"));
+    assertEquals("Hello     World",
+        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
new file mode 100644
index 0000000..fead9dc
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test of BeamSqlLowerExpression.
+ */
+public class BeamSqlLowerExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "HELLO"));
+    assertEquals("hello",
+        new BeamSqlLowerExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
new file mode 100644
index 0000000..3c4bca5
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlOverlayExpression.
+ */
+public class BeamSqlOverlayExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlOverlayExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertTrue(new BeamSqlOverlayExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    assertEquals("w3resou3rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
+    assertEquals("w3resou33rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    assertEquals("w3resou3rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
+    assertEquals("w3resouce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
new file mode 100644
index 0000000..7339466
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlPositionExpression.
+ */
+public class BeamSqlPositionExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertTrue(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertFalse(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertFalse(new BeamSqlPositionExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
new file mode 100644
index 0000000..9bb553f
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlStringUnaryExpression.
+ */
+public class BeamSqlStringUnaryExpressionTest {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertTrue(new BeamSqlCharLengthExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
new file mode 100644
index 0000000..78b2731
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlSubstringExpression.
+ */
+public class BeamSqlSubstringExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlSubstringExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertTrue(new BeamSqlSubstringExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals("he",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
+    assertEquals("",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
+    assertEquals("",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
+    assertEquals("o",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
new file mode 100644
index 0000000..8ad33c9
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlTrimExpression.
+ */
+public class BeamSqlTrimExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
+    assertTrue(new BeamSqlTrimExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "LEADING"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    assertTrue(new BeamSqlTrimExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    assertFalse(new BeamSqlTrimExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "LEADING"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    assertEquals("__hehe",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "TRAILING"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    assertEquals("hehe__",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "BOTH"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__"));
+    assertEquals("__",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
+    assertEquals("hello",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void leadingTrim() throws Exception {
+    assertEquals("__hehe",
+        BeamSqlTrimExpression.leadingTrim("hehe__hehe", "he"));
+  }
+
+  @Test public void trailingTrim() throws Exception {
+    assertEquals("hehe__",
+        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"));
+  }
+
+  @Test public void trim() throws Exception {
+    assertEquals("__",
+        BeamSqlTrimExpression.leadingTrim(
+        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"), "he"
+        ));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b12c7b49/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
new file mode 100644
index 0000000..e6f3500
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test of BeamSqlUpperExpression.
+ */
+public class BeamSqlUpperExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertEquals("HELLO",
+        new BeamSqlUpperExpression(operands).evaluate(record).getValue());
+  }
+
+}


Mime
View raw message