beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amaliu...@apache.org
Subject [beam] branch master updated: [BEAM-11374] Add key regex filter to Bigtable for BeamSQL
Date Thu, 10 Dec 2020 21:14:59 GMT
This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aa1498c  [BEAM-11374] Add key regex filter to Bigtable for BeamSQL
     new d4e9171  Merge pull request #13495 from piotr-szuberski/bigtable-filters
aa1498c is described below

commit aa1498c0d70da32ae730990b9d64d52eed5da084
Author: Piotr Szuberski <piotr.szuberski@polidea.com>
AuthorDate: Mon Dec 7 17:55:38 2020 +0100

    [BEAM-11374] Add key regex filter to Bigtable for BeamSQL
---
 .../sql/meta/provider/bigtable/BigtableFilter.java | 136 +++++++++++++++++++++
 .../sql/meta/provider/bigtable/BigtableTable.java  |  47 +++++--
 .../meta/provider/bigtable/BigtableFilterTest.java | 114 +++++++++++++++++
 .../provider/bigtable/BigtableTableFlatTest.java   |  40 +++++-
 .../apache/beam/sdk/io/gcp/bigtable/RowUtils.java  |  10 ++
 .../beam/sdk/io/gcp/testing/BigtableTestUtils.java |   5 +-
 .../dsls/sql/extensions/create-external-table.md   |   4 +
 7 files changed, 336 insertions(+), 20 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilter.java
new file mode 100644
index 0000000..c3bdda7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteStringUtf8;
+import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlKind.LIKE;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.bigtable.v2.RowFilter;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+
+/**
+ * BigtableFilter for queries with WHERE clause.
+ *
+ * <p>Currently only queries with a single LIKE statement by key field with <a
+ * href=https://github.com/google/re2/wiki/Syntax>RE2 Syntax</a> regex type are supported, e.g.
+ * `SELECT * FROM table WHERE key LIKE '^key\d'`
+ */
+class BigtableFilter implements BeamSqlTableFilter {
+  private final List<RexNode> supported;
+  private final List<RexNode> unsupported;
+  private final Schema schema;
+
+  BigtableFilter(List<RexNode> predicateCNF, Schema schema) {
+    supported = predicateCNF.stream().filter(BigtableFilter::isSupported).collect(toList());
+    unsupported =
+        predicateCNF.stream().filter(predicate -> !isSupported(predicate)).collect(toList());
+    this.schema = schema;
+  }
+
+  @Override
+  public List<RexNode> getNotSupported() {
+    return unsupported;
+  }
+
+  @Override
+  public int numSupported() {
+    return BeamSqlTableFilter.expressionsInFilter(supported);
+  }
+
+  public List<RexNode> getSupported() {
+    return supported;
+  }
+
+  @Override
+  public String toString() {
+    String supStr = supported.stream().map(RexNode::toString).collect(Collectors.joining());
+    String unsupStr = unsupported.stream().map(RexNode::toString).collect(Collectors.joining());
+    return String.format("[supported{%s}, unsupported{%s}]", supStr, unsupStr);
+  }
+
+  RowFilter getFilters() {
+    checkArgument(
+        supported.size() == 1,
+        String.format("Only one LIKE operation is allowed. Got %s operations", supported.size()));
+    return translateRexNodeToRowFilter(supported.get(0));
+  }
+
+  private RowFilter translateRexNodeToRowFilter(RexNode node) {
+    checkNodeIsCoposite(node);
+    checkArgument(LIKE.equals(node.getKind()), "Only LIKE operation is supported.");
+
+    List<RexLiteral> literals = filterOperands((RexCall) node, RexLiteral.class);
+    List<RexInputRef> inputRefs = filterOperands((RexCall) node, RexInputRef.class);
+
+    checkArgument(literals.size() == 1);
+    checkArgument(inputRefs.size() == 1);
+
+    checkFieldIsKey(inputRefs.get(0));
+    String literal = literals.get(0).getValueAs(String.class);
+
+    return RowFilter.newBuilder().setRowKeyRegexFilter(byteStringUtf8(literal)).build();
+  }
+
+  private void checkFieldIsKey(RexInputRef inputRef) {
+    String inputFieldName = schema.getField(inputRef.getIndex()).getName();
+    checkArgument(
+        KEY.equals(inputFieldName),
+        "Only 'key' queries are supported. Got field " + inputFieldName);
+  }
+
+  private static boolean isSupported(RexNode node) {
+    checkNodeIsCoposite(node);
+    if (!LIKE.equals(node.getKind())) {
+      return false;
+    }
+
+    long literalsCount = countOperands((RexCall) node, RexLiteral.class);
+    long fieldsCount = countOperands((RexCall) node, RexInputRef.class);
+
+    return literalsCount == 1 && fieldsCount == 1;
+  }
+
+  private <T extends RexNode> List<T> filterOperands(RexCall compositeNode, Class<T> clazz) {
+    return compositeNode.getOperands().stream()
+        .filter(clazz::isInstance)
+        .map(clazz::cast)
+        .collect(toList());
+  }
+
+  private static <T extends RexNode> long countOperands(RexCall compositeNode, Class<T> clazz) {
+    return compositeNode.getOperands().stream().filter(clazz::isInstance).count();
+  }
+
+  private static void checkNodeIsCoposite(RexNode node) {
+    checkArgument(
+        node instanceof RexCall,
+        String.format(
+            "Encountered an unexpected node type: %s. Should be %s",
+            node.getClass().getSimpleName(), RexCall.class.getSimpleName()));
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java
index da27d34..094d47b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTable.java
@@ -34,6 +34,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
@@ -43,10 +44,12 @@ import org.apache.beam.sdk.io.gcp.bigtable.BigtableRowToBeamRow;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableRowToBeamRowFlat;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
 
 @Experimental
@@ -95,22 +98,24 @@ public class BigtableTable extends SchemaBaseBeamTable implements Serializable {
 
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
-    BigtableIO.Read readTransform =
-        BigtableIO.read().withProjectId(projectId).withInstanceId(instanceId).withTableId(tableId);
-    if (!emulatorHost.isEmpty()) {
-      readTransform = readTransform.withEmulator(emulatorHost);
-    }
-    return readTransform
+    return readTransform()
         .expand(begin)
-        .apply(
-            "BigtableRowToBeamRow",
-            useFlatSchema
-                ? new BigtableRowToBeamRowFlat(schema, columnsMapping)
-                : new BigtableRowToBeamRow(schema))
+        .apply("BigtableRowToBeamRow", bigtableRowToRow())
         .setRowSchema(schema);
   }
 
   @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    BigtableIO.Read readTransform = readTransform();
+    if (filters instanceof BigtableFilter) {
+      BigtableFilter bigtableFilter = (BigtableFilter) filters;
+      readTransform = readTransform.withRowFilter(bigtableFilter.getFilters());
+    }
+    return readTransform.expand(begin).apply(bigtableRowToRow());
+  }
+
+  @Override
   public POutput buildIOWriter(PCollection<Row> input) {
     if (!useFlatSchema) {
       throw new UnsupportedOperationException(
@@ -134,6 +139,11 @@ public class BigtableTable extends SchemaBaseBeamTable implements Serializable {
     return BeamTableStatistics.BOUNDED_UNKNOWN;
   }
 
+  @Override
+  public BeamSqlTableFilter constructFilter(List<RexNode> filter) {
+    return new BigtableFilter(filter, schema);
+  }
+
   private static Map<String, Set<String>> parseColumnsMapping(String commaSeparatedMapping) {
     Map<String, Set<String>> columnsMapping = new HashMap<>();
     Splitter.on(",")
@@ -208,4 +218,19 @@ public class BigtableTable extends SchemaBaseBeamTable implements Serializable {
               allMappingQualifiers, schemaFieldNames));
     }
   }
+
+  private BigtableIO.Read readTransform() {
+    BigtableIO.Read readTransform =
+        BigtableIO.read().withProjectId(projectId).withInstanceId(instanceId).withTableId(tableId);
+    if (!emulatorHost.isEmpty()) {
+      readTransform = readTransform.withEmulator(emulatorHost);
+    }
+    return readTransform;
+  }
+
+  private PTransform<PCollection<com.google.bigtable.v2.Row>, PCollection<Row>> bigtableRowToRow() {
+    return useFlatSchema
+        ? new BigtableRowToBeamRowFlat(schema, columnsMapping)
+        : new BigtableRowToBeamRow(schema);
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilterTest.java
new file mode 100644
index 0000000..bd34d16
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableFilterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+import com.alibaba.fastjson.JSON;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PushDownOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BigtableFilterTest {
+
+  private static final Schema BASIC_SCHEMA =
+      Schema.builder().addStringField(KEY).addStringField("name").build();
+
+  private BeamSqlEnv sqlEnv;
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[][] {
+          {"select * from TEST where key = '100'", false},
+          {"select * from TEST where key >= 'key2'", false},
+          {"select * from TEST where key LIKE '^key[123]'", true},
+          {"select * from TEST where key LIKE '^key[abc]' OR key LIKE '^key[bcd]'", false},
+        });
+  }
+
+  @Parameter public String query;
+
+  @Parameter(1)
+  public boolean isSupported;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Before
+  public void buildUp() {
+    TestTableProvider tableProvider = new TestTableProvider();
+    Table table = getTable("TEST", PushDownOptions.NONE);
+    tableProvider.createTable(table);
+    tableProvider.addRows(table.getName(), row("key1", "firstName"), row("key2", "secondName"));
+
+    sqlEnv =
+        BeamSqlEnv.builder(tableProvider)
+            .setPipelineOptions(PipelineOptionsFactory.create())
+            .build();
+  }
+
+  @Test
+  public void testIsSupported() {
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(query);
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    BigtableFilter filter =
+        new BigtableFilter(((BeamCalcRel) beamRelNode).getProgram().split().right, BASIC_SCHEMA);
+
+    assertThat(
+        "Query: '" + query + "' is expected to be " + (isSupported ? "supported." : "unsupported."),
+        filter.getNotSupported().isEmpty() == isSupported);
+  }
+
+  private static Table getTable(String name, PushDownOptions options) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .schema(BASIC_SCHEMA)
+        .properties(
+            JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }"))
+        .type("test")
+        .build();
+  }
+
+  private static Row row(String key, String name) {
+    return Row.withSchema(BASIC_SCHEMA).addValues(key, name).build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
index bbfc62f..b336e43 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
 
 import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.COLUMNS_MAPPING;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
 import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY1;
 import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY2;
 import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FLAT_SCHEMA;
@@ -36,6 +37,7 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -97,12 +99,12 @@ public class BigtableTableFlatTest extends BigtableTableTest {
 
     String query =
         "SELECT \n"
-            + "  ft.key, \n"
-            + "  ft.boolColumn, \n"
-            + "  ft.longColumn, \n"
-            + "  ft.stringColumn, \n"
-            + "  ft.doubleColumn \n"
-            + "FROM flatTable ft";
+            + "  key, \n"
+            + "  boolColumn, \n"
+            + "  longColumn, \n"
+            + "  stringColumn, \n"
+            + "  doubleColumn \n"
+            + "FROM flatTable";
 
     sqlEnv.parseQuery(query);
     PCollection<Row> queryOutput =
@@ -115,6 +117,24 @@ public class BigtableTableFlatTest extends BigtableTableTest {
   }
 
   @Test
+  public void testSelectFlatKeyRegexQuery() throws Exception {
+    createReadTable("regexTable");
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+    sqlEnv.executeDdl(createFlatTableString("regexTable"));
+
+    String query = "SELECT key FROM regexTable WHERE key LIKE '^key[0134]{1}'";
+
+    sqlEnv.parseQuery(query);
+    PCollection<Row> queryOutput =
+        BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(query));
+
+    assertThat(queryOutput.getSchema(), equalTo(filterSchema()));
+
+    PAssert.that(queryOutput).containsInAnyOrder(filterRow(KEY1));
+    readPipeline.run().waitUntilFinish();
+  }
+
+  @Test
   public void testSimpleInsert() {
     createTable("beamWriteTable");
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
@@ -136,6 +156,14 @@ public class BigtableTableFlatTest extends BigtableTableTest {
     readPipeline.run().waitUntilFinish();
   }
 
+  private Schema filterSchema() {
+    return Schema.builder().addStringField(KEY).build();
+  }
+
+  private Row filterRow(String key) {
+    return Row.withSchema(filterSchema()).attachValues(key);
+  }
+
   private static class ReplaceCellTimestamp
       extends SimpleFunction<com.google.bigtable.v2.Row, com.google.bigtable.v2.Row> {
     @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/RowUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/RowUtils.java
index d6cd791..a79a432 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/RowUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/RowUtils.java
@@ -17,10 +17,20 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable;
 
+import com.google.protobuf.ByteString;
+
 public class RowUtils {
   public static final String KEY = "key";
   public static final String VALUE = "val";
   public static final String TIMESTAMP_MICROS = "timestampMicros";
   public static final String LABELS = "labels";
   public static final String COLUMNS_MAPPING = "columnsMapping";
+
+  public static ByteString byteString(byte[] bytes) {
+    return ByteString.copyFrom(bytes);
+  }
+
+  public static ByteString byteStringUtf8(String value) {
+    return ByteString.copyFromUtf8(value);
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java
index eefc51f..d9796fd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java
@@ -31,7 +31,6 @@ import com.google.bigtable.v2.Cell;
 import com.google.bigtable.v2.Column;
 import com.google.bigtable.v2.Family;
 import com.google.protobuf.ByteString;
-import java.time.Instant;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -82,8 +81,8 @@ public class BigtableTestUtils {
           .addDoubleField(DOUBLE_COLUMN)
           .build();
 
-  public static final long NOW = Instant.now().toEpochMilli() * 1_000;
-  public static final long LATER = NOW + 1_000;
+  public static final long NOW = 5_000_000_000L;
+  public static final long LATER = NOW + 1_000L;
 
   public static byte[] floatToByteArray(float number) {
     return Ints.toByteArray(Float.floatToIntBits(number));
diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index 2182b4b..11d6ec7 100644
--- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -281,6 +281,10 @@ to the key-values pairs specified in `columnsMapping`.
 
 Not all existing column families and qualifiers have to be provided to the schema.
 
+Filters are only allowed by `key` field with single `LIKE` statement with
+[RE2 Syntax](https://github.com/google/re2/wiki/Syntax) regex, e.g.
+`SELECT * FROM table WHERE key LIKE '^key[012]{1}'`
+
 ### Write Mode
 
 Supported for flat schema only.


Mime
View raw message