beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/2] incubator-beam git commit: DatastoreIO: Update datastore API to v1beta3
Date Wed, 22 Jun 2016 16:55:08 GMT
DatastoreIO: Update datastore API to v1beta3


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/104f4dd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/104f4dd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/104f4dd2

Branch: refs/heads/master
Commit: 104f4dd27dae9643f953d964b31a0d7674cbf1e9
Parents: f480944
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Thu Jun 16 13:57:43 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jun 22 09:54:51 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  15 +-
 .../beam/examples/complete/AutoComplete.java    |  35 +--
 .../examples/cookbook/DatastoreWordCount.java   |  48 ++--
 pom.xml                                         |  44 +--
 runners/google-cloud-dataflow-java/pom.xml      |  11 +-
 .../dataflow/io/DataflowDatastoreIOTest.java    |  15 +-
 sdks/java/core/pom.xml                          |   9 +-
 .../org/apache/beam/sdk/coders/EntityCoder.java |  87 ------
 .../org/apache/beam/sdk/io/DatastoreIO.java     | 286 ++++++++-----------
 .../apache/beam/sdk/coders/EntityCoderTest.java | 110 -------
 .../sdk/coders/protobuf/ProtobufUtilTest.java   |   7 +-
 .../org/apache/beam/sdk/io/DatastoreIOTest.java | 171 +++++------
 .../apache/beam/sdk/util/ApiSurfaceTest.java    |   4 +-
 13 files changed, 279 insertions(+), 563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 5167810..cac9857 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -260,11 +260,6 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-datastore-protobuf</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-pubsub</artifactId>
     </dependency>
 
@@ -279,6 +274,16 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.cloud.datastore</groupId>
+      <artifactId>datastore-v1beta3-proto-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.datastore</groupId>
+      <artifactId>datastore-v1beta3-protos</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index ef47762..c6893f4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.examples.complete;
 
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExamplePubsubTopicOptions;
@@ -55,18 +57,20 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.api.services.datastore.client.DatastoreHelper;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
 
 import org.joda.time.Duration;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -396,16 +400,15 @@ public class AutoComplete {
 
       entityBuilder.setKey(key);
       List<Value> candidates = new ArrayList<>();
+      Map<String, Value> properties = new HashMap<>();
       for (CompletionCandidate tag : c.element().getValue()) {
         Entity.Builder tagEntity = Entity.newBuilder();
-        tagEntity.addProperty(
-            DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value)));
-        tagEntity.addProperty(
-            DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count)));
-        candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build());
+        properties.put("tag", makeValue(tag.value).build());
+        properties.put("count", makeValue(tag.count).build());
+        candidates.add(makeValue(tagEntity).build());
       }
-      entityBuilder.addProperty(
-          DatastoreHelper.makeProperty("candidates", DatastoreHelper.makeValue(candidates)));
+      properties.put("candidates", makeValue(candidates).build());
+      entityBuilder.putAllProperties(properties);
       c.output(entityBuilder.build());
     }
   }
@@ -426,7 +429,7 @@ public class AutoComplete {
     Boolean getRecursive();
     void setRecursive(Boolean value);
 
-    @Description("Dataset entity kind")
+    @Description("Datastore entity kind")
     @Default.String("autocomplete-demo")
     String getKind();
     void setKind(String value);
@@ -441,9 +444,9 @@ public class AutoComplete {
     Boolean getOutputToDatastore();
     void setOutputToDatastore(Boolean value);
 
-    @Description("Datastore output dataset ID, defaults to project ID")
-    String getOutputDataset();
-    void setOutputDataset(String value);
+    @Description("Datastore output project ID, defaults to project ID")
+    String getOutputProject();
+    void setOutputProject(String value);
   }
 
   public static void main(String[] args) throws IOException {
@@ -480,7 +483,7 @@ public class AutoComplete {
       toWrite
       .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
       .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
-          options.getOutputDataset(), options.getProject())));
+          options.getOutputProject(), options.getProject())));
     }
     if (options.getOutputToBigQuery()) {
       dataflowUtils.setupBigQueryTable();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index b0192c9..7578d79 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -17,11 +17,10 @@
  */
 package org.apache.beam.examples.cookbook;
 
-import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
-import static com.google.api.services.datastore.client.DatastoreHelper.getString;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.getString;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
@@ -37,12 +36,11 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Property;
-import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.Value;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.PropertyFilter;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.Value;
 
 import java.util.Map;
 import java.util.UUID;
@@ -64,7 +62,6 @@ import javax.annotation.Nullable;
  * <p>To run this pipeline locally, the following options must be provided:
  * <pre>{@code
  *   --project=YOUR_PROJECT_ID
- *   --dataset=YOUR_DATASET_ID
  *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH]
  * }</pre>
  *
@@ -89,7 +86,7 @@ public class DatastoreWordCount {
   static class GetContentFn extends DoFn<Entity, String> {
     @Override
     public void processElement(ProcessContext c) {
-      Map<String, Value> props = getPropertyMap(c.element());
+      Map<String, Value> props = c.element().getProperties();
       Value value = props.get("content");
       if (value != null) {
         c.output(getString(value));
@@ -106,7 +103,7 @@ public class DatastoreWordCount {
   static Key makeAncestorKey(@Nullable String namespace, String kind) {
     Key.Builder keyBuilder = makeKey(kind, "root");
     if (namespace != null) {
-      keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
     }
     return keyBuilder.build();
   }
@@ -136,12 +133,11 @@ public class DatastoreWordCount {
       // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
       // we can simplify this code.
       if (namespace != null) {
-        keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+        keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
       }
 
       entityBuilder.setKey(keyBuilder.build());
-      entityBuilder.addProperty(Property.newBuilder().setName("content")
-          .setValue(Value.newBuilder().setStringValue(content)));
+      entityBuilder.getMutableProperties().put("content", makeValue(content).build());
       return entityBuilder.build();
     }
 
@@ -167,21 +163,21 @@ public class DatastoreWordCount {
     String getOutput();
     void setOutput(String value);
 
-    @Description("Dataset ID to read from datastore")
+    @Description("Project ID to read from datastore")
     @Validation.Required
-    String getDataset();
-    void setDataset(String value);
+    String getProject();
+    void setProject(String value);
 
-    @Description("Dataset entity kind")
+    @Description("Datastore Entity kind")
     @Default.String("shakespeare-demo")
     String getKind();
     void setKind(String value);
 
-    @Description("Dataset namespace")
+    @Description("Datastore Namespace")
     String getNamespace();
     void setNamespace(@Nullable String value);
 
-    @Description("Read an existing dataset, do not write first")
+    @Description("Read an existing project, do not write first")
     boolean isReadOnly();
     void setReadOnly(boolean value);
 
@@ -199,7 +195,7 @@ public class DatastoreWordCount {
       Pipeline p = Pipeline.create(options);
       p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
        .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
-       .apply(DatastoreIO.writeTo(options.getDataset()));
+       .apply(DatastoreIO.writeTo(options.getProject()));
 
       p.run();
   }
@@ -231,7 +227,7 @@ public class DatastoreWordCount {
 
     // For Datastore sources, the read namespace can be set on the entire query.
     DatastoreIO.Source source = DatastoreIO.source()
-        .withDataset(options.getDataset())
+        .withProject(options.getProject())
         .withQuery(query)
         .withNamespace(options.getNamespace());
 
@@ -259,7 +255,7 @@ public class DatastoreWordCount {
       // First example: write data to Datastore for reading later.
       //
       // NOTE: this write does not delete any existing Entities in the Datastore, so if run
-      // multiple times with the same output dataset, there may be duplicate entries. The
+      // multiple times with the same output project, there may be duplicate entries. The
       // Datastore Query tool in the Google Developers Console can be used to inspect or erase all
       // entries with a particular namespace and/or kind.
       DatastoreWordCount.writeDataToDatastore(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 49b77fa..31beef1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,8 @@
     <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
     <dataflow.version>v1b3-rev26-1.22.0</dataflow.version>
     <dataflow.proto.version>0.5.160222</dataflow.proto.version>
-    <datastore.version>v1beta2-rev1-4.0.0</datastore.version>
+    <datastore.client.version>1.0.0-beta.2</datastore.client.version>
+    <datastore.proto.version>1.0.0-beta</datastore.proto.version>
     <google-auto-service.version>1.0-rc2</google-auto-service.version>
     <google-auto-value.version>1.1</google-auto-value.version>
     <google-clients.version>1.22.0</google-clients.version>
@@ -437,38 +438,15 @@
       </dependency>
 
       <dependency>
-        <groupId>com.google.apis</groupId>
-        <artifactId>google-api-services-datastore-protobuf</artifactId>
-        <version>${datastore.version}</version>
-        <exclusions>
-          <!-- Exclude an old version of guava that is being pulled in by a transitive 
-            dependency of google-api-client -->
-          <exclusion>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava-jdk5</artifactId>
-          </exclusion>
-          <!-- Exclude old version of api client dependencies. -->
-          <exclusion>
-            <groupId>com.google.http-client</groupId>
-            <artifactId>google-http-client</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>com.google.api-client</groupId>
-            <artifactId>google-api-client</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>com.google.oauth-client</groupId>
-            <artifactId>google-oauth-client</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>com.google.http-client</groupId>
-            <artifactId>google-http-client-jackson</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>com.google.http-client</groupId>
-            <artifactId>google-http-client-protobuf</artifactId>
-          </exclusion>
-        </exclusions>
+        <groupId>com.google.cloud.datastore</groupId>
+        <artifactId>datastore-v1beta3-proto-client</artifactId>
+        <version>${datastore.client.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.google.cloud.datastore</groupId>
+        <artifactId>datastore-v1beta3-protos</artifactId>
+        <version>${datastore.proto.version}</version>
       </dependency>
 
       <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 999e16d..5408462 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -313,11 +313,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-datastore-protobuf</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.oauth-client</groupId>
       <artifactId>google-oauth-client</artifactId>
     </dependency>
@@ -454,5 +449,11 @@
       <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.datastore</groupId>
+      <artifactId>datastore-v1beta3-protos</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
index 42a0b99..e7c0791 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
@@ -30,7 +30,8 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 
-import com.google.api.services.datastore.DatastoreV1;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.Query;
 
 import org.junit.Test;
 
@@ -44,20 +45,20 @@ public class DataflowDatastoreIOTest {
   public void testSourcePrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
     PTransform<PInput, ?> read = DatastoreIO.readFrom(
-        "myDataset", DatastoreV1.Query.newBuilder().build());
+        "myProject", Query.newBuilder().build());
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("DatastoreIO read should include the dataset in its primitive display data",
-        displayData, hasItem(hasDisplayItem("dataset")));
+    assertThat("DatastoreIO read should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("project")));
   }
 
   @Test
   public void testSinkPrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    PTransform<PCollection<DatastoreV1.Entity>, ?> write = DatastoreIO.writeTo("myDataset");
+    PTransform<PCollection<Entity>, ?> write = DatastoreIO.writeTo("myProject");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the dataset in its primitive display data",
-        displayData, hasItem(hasDisplayItem("dataset")));
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("project")));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index c559cff..bbba77b 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -393,8 +393,13 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-datastore-protobuf</artifactId>
+      <groupId>com.google.cloud.datastore</groupId>
+      <artifactId>datastore-v1beta3-proto-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.datastore</groupId>
+      <artifactId>datastore-v1beta3-protos</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java
deleted file mode 100644
index a704772..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link Entity} objects based on their encoded Protocol Buffer form.
- */
-public class EntityCoder extends AtomicCoder<Entity> {
-
-  @JsonCreator
-  public static EntityCoder of() {
-    return INSTANCE;
-  }
-
-  /***************************/
-
-  private static final EntityCoder INSTANCE = new EntityCoder();
-
-  private EntityCoder() {}
-
-  @Override
-  public void encode(Entity value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Entity");
-    }
-
-    // Since Entity implements com.google.protobuf.MessageLite,
-    // we could directly use writeTo to write to a OutputStream object
-    outStream.write(java.nio.ByteBuffer.allocate(4).putInt(value.getSerializedSize()).array());
-    value.writeTo(outStream);
-    outStream.flush();
-  }
-
-  @Override
-  public Entity decode(InputStream inStream, Context context)
-      throws IOException {
-    byte[] entitySize = new byte[4];
-    inStream.read(entitySize, 0, 4);
-    int size = java.nio.ByteBuffer.wrap(entitySize).getInt();
-    byte[] data = new byte[size];
-    inStream.read(data, 0, size);
-    return Entity.parseFrom(data);
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Entity value, Context context)
-      throws Exception {
-    return value.getSerializedSize();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always.
-   *         A datastore kind can hold arbitrary {@link Object} instances, which
-   *         makes the encoding non-deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Datastore encodings can hold arbitrary Object instances");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
index 137c6cd..7fab79d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
@@ -17,21 +17,22 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.api.services.datastore.DatastoreV1.PropertyFilter.Operator.EQUAL;
-import static com.google.api.services.datastore.DatastoreV1.PropertyOrder.Direction.DESCENDING;
-import static com.google.api.services.datastore.DatastoreV1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeOrder;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Verify.verify;
+import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.EntityCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.GcpOptions;
@@ -45,26 +46,27 @@ import com.google.api.client.auth.oauth2.Credential;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
-import com.google.api.services.datastore.DatastoreV1.CommitRequest;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.EntityResult;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Key.PathElement;
-import com.google.api.services.datastore.DatastoreV1.PartitionId;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.QueryResultBatch;
-import com.google.api.services.datastore.DatastoreV1.RunQueryRequest;
-import com.google.api.services.datastore.DatastoreV1.RunQueryResponse;
-import com.google.api.services.datastore.client.Datastore;
-import com.google.api.services.datastore.client.DatastoreException;
-import com.google.api.services.datastore.client.DatastoreFactory;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.api.services.datastore.client.DatastoreOptions;
-import com.google.api.services.datastore.client.QuerySplitter;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Ints;
+import com.google.datastore.v1beta3.CommitRequest;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Key.PathElement;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreException;
+import com.google.datastore.v1beta3.client.DatastoreFactory;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.DatastoreOptions;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,10 +96,9 @@ import javax.annotation.Nullable;
  * </pre>
  *
  * <p>To read a {@link PCollection} from a query to Datastore, use {@link DatastoreIO#source} and
- * its methods {@link DatastoreIO.Source#withDataset} and {@link DatastoreIO.Source#withQuery} to
- * specify the dataset to query and the query to read from. You can optionally provide a namespace
- * to query within using {@link DatastoreIO.Source#withNamespace} or a Datastore host using
- * {@link DatastoreIO.Source#withHost}.
+ * its methods {@link DatastoreIO.Source#withProject} and {@link DatastoreIO.Source#withQuery} to
+ * specify the project to query and the query to read from. You can optionally provide a namespace
+ * to query within using {@link DatastoreIO.Source#withNamespace}.
  *
  * <p>For example:
  *
@@ -105,32 +106,31 @@ import javax.annotation.Nullable;
  * // Read a query from Datastore
  * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  * Query query = ...;
- * String dataset = "...";
+ * String projectId = "...";
  *
  * Pipeline p = Pipeline.create(options);
  * PCollection<Entity> entities = p.apply(
  *     Read.from(DatastoreIO.source()
- *         .withDataset(datasetId)
- *         .withQuery(query)
- *         .withHost(host)));
+ *         .withProject(projectId)
+ *         .withQuery(query));
  * } </pre>
  *
  * <p>or:
  *
  * <pre> {@code
- * // Read a query from Datastore using the default namespace and host
+ * // Read a query from Datastore using the default namespace
  * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  * Query query = ...;
- * String dataset = "...";
+ * String projectId = "...";
  *
  * Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(datasetId, query));
+ * PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(projectId, query));
  * p.run();
  * } </pre>
  *
  * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
  * many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then
+ * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
  * all returned results will be read by a single Dataflow worker in order to ensure correct data.
  *
  * <p>To write a {@link PCollection} to a Datastore, use {@link DatastoreIO#writeTo},
@@ -138,19 +138,10 @@ import javax.annotation.Nullable;
  *
  * <pre> {@code
  * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.writeTo(dataset));
+ * entities.apply(DatastoreIO.writeTo(projectId));
  * p.run();
  * } </pre>
  *
- * <p>To optionally change the host that is used to write to the Datastore, use {@link
- * DatastoreIO#sink} to build a {@link DatastoreIO.Sink} and write to it using the {@link Write}
- * transform:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(Write.to(DatastoreIO.sink().withDataset(dataset).withHost(host)));
- * } </pre>
- *
  * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
  * {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the
  * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than the
@@ -177,8 +168,6 @@ import javax.annotation.Nullable;
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class DatastoreIO {
-  public static final String DEFAULT_HOST = "https://www.googleapis.com";
-
   /**
    * Datastore has a limit of 500 mutations per batch operation, so we flush
    * changes to Datastore every 500 entities.
@@ -186,9 +175,9 @@ public class DatastoreIO {
   public static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
-   * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}.
-   * Configure the {@code dataset}, {@code query}, and {@code namespace} using
-   * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery},
+   * Returns an empty {@link DatastoreIO.Source} builder.
+   * Configure the {@code project}, {@code query}, and {@code namespace} using
+   * {@link DatastoreIO.Source#withProject}, {@link DatastoreIO.Source#withQuery},
    * and {@link DatastoreIO.Source#withNamespace}.
    *
    * @deprecated the name and return type do not match. Use {@link #source()}.
@@ -199,48 +188,32 @@ public class DatastoreIO {
   }
 
   /**
-   * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}.
-   * Configure the {@code dataset}, {@code query}, and {@code namespace} using
-   * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery},
+   * Returns an empty {@link DatastoreIO.Source} builder.
+   * Configure the {@code project}, {@code query}, and {@code namespace} using
+   * {@link DatastoreIO.Source#withProject}, {@link DatastoreIO.Source#withQuery},
    * and {@link DatastoreIO.Source#withNamespace}.
    *
    * <p>The resulting {@link Source} object can be passed to {@link Read} to create a
    * {@code PTransform} that will read from Datastore.
    */
   public static Source source() {
-    return new Source(DEFAULT_HOST, null, null, null);
+    return new Source(null, null, null);
   }
 
   /**
    * Returns a {@code PTransform} that reads Datastore entities from the query
-   * against the given dataset.
+   * against the given project.
    */
-  public static Read.Bounded<Entity> readFrom(String datasetId, Query query) {
-    return Read.from(new Source(DEFAULT_HOST, datasetId, query, null));
-  }
-
-  /**
-   * Returns a {@code PTransform} that reads Datastore entities from the query
-   * against the given dataset and host.
-   *
-   * @deprecated prefer {@link #source()} with {@link Source#withHost}, {@link Source#withDataset},
-   *    {@link Source#withQuery}s.
-   */
-  @Deprecated
-  public static Read.Bounded<Entity> readFrom(String host, String datasetId, Query query) {
-    return Read.from(new Source(host, datasetId, query, null));
+  public static Read.Bounded<Entity> readFrom(String projectId, Query query) {
+    return Read.from(new Source(projectId, query, null));
   }
 
   /**
    * A {@link Source} that reads the result rows of a Datastore query as {@code Entity} objects.
    */
   public static class Source extends BoundedSource<Entity> {
-    public String getHost() {
-      return host;
-    }
-
-    public String getDataset() {
-      return datasetId;
+    public String getProjectId() {
+      return projectId;
     }
 
     public Query getQuery() {
@@ -252,9 +225,9 @@ public class DatastoreIO {
       return namespace;
     }
 
-    public Source withDataset(String datasetId) {
-      checkNotNull(datasetId, "datasetId");
-      return new Source(host, datasetId, query, namespace);
+    public Source withProject(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new Source(projectId, query, namespace);
     }
 
     /**
@@ -264,28 +237,23 @@ public class DatastoreIO {
      *
      * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel
      * across many workers. However, when the {@link Query} is configured with a limit using
-     * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then all
+     * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then all
      * returned results will be read by a single Dataflow worker in order to ensure correct data.
      */
     public Source withQuery(Query query) {
       checkNotNull(query, "query");
-      checkArgument(!query.hasLimit() || query.getLimit() > 0,
-          "Invalid query limit %s: must be positive", query.getLimit());
-      return new Source(host, datasetId, query, namespace);
-    }
-
-    public Source withHost(String host) {
-      checkNotNull(host, "host");
-      return new Source(host, datasetId, query, namespace);
+      checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
+          "Invalid query limit %s: must be positive", query.getLimit().getValue());
+      return new Source(projectId, query, namespace);
     }
 
     public Source withNamespace(@Nullable String namespace) {
-      return new Source(host, datasetId, query, namespace);
+      return new Source(projectId, query, namespace);
     }
 
     @Override
     public Coder<Entity> getDefaultOutputCoder() {
-      return EntityCoder.of();
+      return ProtoCoder.of(Entity.class);
     }
 
     @Override
@@ -327,7 +295,7 @@ public class DatastoreIO {
 
       ImmutableList.Builder<Source> splits = ImmutableList.builder();
       for (Query splitQuery : datastoreSplits) {
-        splits.add(new Source(host, datasetId, splitQuery, namespace));
+        splits.add(new Source(projectId, splitQuery, namespace));
       }
       return splits.build();
     }
@@ -339,9 +307,8 @@ public class DatastoreIO {
 
     @Override
     public void validate() {
-      Preconditions.checkNotNull(host, "host");
       Preconditions.checkNotNull(query, "query");
-      Preconditions.checkNotNull(datasetId, "datasetId");
+      Preconditions.checkNotNull(projectId, "projectId");
     }
 
     @Override
@@ -369,7 +336,7 @@ public class DatastoreIO {
       } else {
         query.addKindBuilder().setName("__Ns_Stat_Kind__");
       }
-      query.setFilter(makeFilter(
+      query.setFilter(makeAndFilter(
           makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
           makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
       RunQueryRequest request = makeRequest(query.build());
@@ -379,22 +346,20 @@ public class DatastoreIO {
       LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
 
       QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultCount() == 0) {
+      if (batch.getEntityResultsCount() == 0) {
         throw new NoSuchElementException(
             "Datastore statistics for kind " + ourKind + " unavailable");
       }
-      Entity entity = batch.getEntityResult(0).getEntity();
-      return getPropertyMap(entity).get("entity_bytes").getIntegerValue();
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("entity_bytes").getIntegerValue();
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-          .addIfNotDefault(DisplayData.item("host", host)
-            .withLabel("Datastore Service"), DEFAULT_HOST)
-          .addIfNotNull(DisplayData.item("dataset", datasetId)
-            .withLabel("Input Dataset"))
+          .addIfNotNull(DisplayData.item("project", projectId)
+            .withLabel("Input Project"))
           .addIfNotNull(DisplayData.item("namespace", namespace)
             .withLabel("App Engine Namespace"));
 
@@ -407,8 +372,7 @@ public class DatastoreIO {
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("host", host)
-          .add("dataset", datasetId)
+          .add("project", projectId)
           .add("query", query)
           .add("namespace", namespace)
           .toString();
@@ -417,10 +381,9 @@ public class DatastoreIO {
     ///////////////////////////////////////////////////////////////////////////////////////////
 
     private static final Logger LOG = LoggerFactory.getLogger(Source.class);
-    private final String host;
     /** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. */
     @Nullable
-    private final String datasetId;
+    private final String projectId;
     /** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. */
     @Nullable
     private final Query query;
@@ -439,10 +402,9 @@ public class DatastoreIO {
      * an error will be thrown.
      */
     private Source(
-        String host, @Nullable String datasetId, @Nullable Query query,
+        @Nullable String projectId, @Nullable Query query,
         @Nullable String namespace) {
-      this.host = checkNotNull(host, "host");
-      this.datasetId = datasetId;
+      this.projectId = projectId;
       this.query = query;
       this.namespace = namespace;
     }
@@ -456,7 +418,7 @@ public class DatastoreIO {
       // If namespace is set, include it in the split request so splits are calculated accordingly.
       PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
       if (namespace != null) {
-        partitionBuilder.setNamespace(namespace);
+        partitionBuilder.setNamespaceId(namespace);
       }
 
       if (mockSplitter != null) {
@@ -475,7 +437,7 @@ public class DatastoreIO {
     private RunQueryRequest makeRequest(Query query) {
       RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
       if (namespace != null) {
-        requestBuilder.getPartitionIdBuilder().setNamespace(namespace);
+        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
       }
       return requestBuilder.build();
     }
@@ -488,26 +450,29 @@ public class DatastoreIO {
       Query.Builder query = Query.newBuilder();
       query.addKindBuilder().setName("__Stat_Total__");
       query.addOrder(makeOrder("timestamp", DESCENDING));
-      query.setLimit(1);
+      query.setLimit(Int32Value.newBuilder().setValue(1));
       RunQueryRequest request = makeRequest(query.build());
 
       long now = System.currentTimeMillis();
       RunQueryResponse response = datastore.runQuery(request);
-      LOG.info("Query for latest stats timestamp of dataset {} took {}ms", datasetId,
+      LOG.info("Query for latest stats timestamp of project {} took {}ms", projectId,
           System.currentTimeMillis() - now);
       QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultCount() == 0) {
+      if (batch.getEntityResultsCount() == 0) {
         throw new NoSuchElementException(
-            "Datastore total statistics for dataset " + datasetId + " unavailable");
+            "Datastore total statistics for project " + projectId + " unavailable");
       }
-      Entity entity = batch.getEntityResult(0).getEntity();
-      return getPropertyMap(entity).get("timestamp").getTimestampMicrosecondsValue();
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("timestamp").getTimestampValue().getNanos();
     }
 
     private Datastore getDatastore(PipelineOptions pipelineOptions) {
       DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder().host(host).dataset(datasetId).initializer(
-              new RetryHttpRequestInitializer());
+          new DatastoreOptions.Builder()
+              .projectId(projectId)
+              .initializer(
+                new RetryHttpRequestInitializer()
+              );
 
       Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
       if (credential != null) {
@@ -518,7 +483,7 @@ public class DatastoreIO {
 
     /** For testing only. */
     Source withMockSplitter(QuerySplitter splitter) {
-      Source res = new Source(host, datasetId, query, namespace);
+      Source res = new Source(projectId, query, namespace);
       res.mockSplitter = splitter;
       res.mockEstimateSizeBytes = mockEstimateSizeBytes;
       return res;
@@ -526,7 +491,7 @@ public class DatastoreIO {
 
     /** For testing only. */
     Source withMockEstimateSizeBytes(Long estimateSizeBytes) {
-      Source res = new Source(host, datasetId, query, namespace);
+      Source res = new Source(projectId, query, namespace);
       res.mockSplitter = mockSplitter;
       res.mockEstimateSizeBytes = estimateSizeBytes;
       return res;
@@ -536,23 +501,23 @@ public class DatastoreIO {
   ///////////////////// Write Class /////////////////////////////////
 
   /**
-   * Returns a new {@link DatastoreIO.Sink} builder using the default host.
-   * You need to further configure it using {@link DatastoreIO.Sink#withDataset}, and optionally
-   * {@link DatastoreIO.Sink#withHost} before using it in a {@link Write} transform.
+   * Returns a new {@link DatastoreIO.Sink} builder.
+   * You need to further configure it using {@link DatastoreIO.Sink#withProject}, before using it
+   * in a {@link Write} transform.
    *
-   * <p>For example: {@code p.apply(Write.to(DatastoreIO.sink().withDataset(dataset)));}
+   * <p>For example: {@code p.apply(Write.to(DatastoreIO.sink().withProject(projectId)));}
    */
   public static Sink sink() {
-    return new Sink(DEFAULT_HOST, null);
+    return new Sink(null);
   }
 
   /**
    * Returns a new {@link Write} transform that will write to a {@link Sink}.
    *
-   * <p>For example: {@code p.apply(DatastoreIO.writeTo(dataset));}
+   * <p>For example: {@code p.apply(DatastoreIO.writeTo(projectId));}
    */
-  public static Write.Bound<Entity> writeTo(String datasetId) {
-    return Write.to(sink().withDataset(datasetId));
+  public static Write.Bound<Entity> writeTo(String projectId) {
+    return Write.to(sink().withProject(projectId));
   }
 
   /**
@@ -561,44 +526,31 @@ public class DatastoreIO {
    *
    */
   public static class Sink extends org.apache.beam.sdk.io.Sink<Entity> {
-    final String host;
-    final String datasetId;
+    final String projectId;
 
     /**
-     * Returns a {@link Sink} that is like this one, but will write to the specified dataset.
+     * Returns a {@link Sink} that is like this one, but will write to the specified project.
      */
-    public Sink withDataset(String datasetId) {
-      checkNotNull(datasetId, "datasetId");
-      return new Sink(host, datasetId);
+    public Sink withProject(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new Sink(projectId);
     }
 
     /**
-     * Returns a {@link Sink} that is like this one, but will use the given host.  If not specified,
-     * the {@link DatastoreIO#DEFAULT_HOST default host} will be used.
+     * Constructs a Sink with the given project.
      */
-    public Sink withHost(String host) {
-      checkNotNull(host, "host");
-      return new Sink(host, datasetId);
+    protected Sink(String projectId) {
+      this.projectId = projectId;
     }
 
     /**
-     * Constructs a Sink with given host and dataset.
-     */
-    protected Sink(String host, String datasetId) {
-      this.host = checkNotNull(host, "host");
-      this.datasetId = datasetId;
-    }
-
-    /**
-     * Ensures the host and dataset are set.
+     * Ensures the project is set.
      */
     @Override
     public void validate(PipelineOptions options) {
       Preconditions.checkNotNull(
-          host, "Host is a required parameter. Please use withHost to set the host.");
-      Preconditions.checkNotNull(
-          datasetId,
-          "Dataset ID is a required parameter. Please use withDataset to to set the datasetId.");
+          projectId,
+          "Project ID is a required parameter. Please use withProject to to set the projectId.");
     }
 
     @Override
@@ -610,10 +562,8 @@ public class DatastoreIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-          .addIfNotDefault(DisplayData.item("host", host)
-            .withLabel("Datastore Service"), DEFAULT_HOST)
-          .addIfNotNull(DisplayData.item("dataset", datasetId)
-            .withLabel("Output Dataset"));
+          .addIfNotNull(DisplayData.item("project", projectId)
+            .withLabel("Output Project"));
     }
   }
 
@@ -655,8 +605,7 @@ public class DatastoreIO {
     public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
       DatastoreOptions.Builder builder =
           new DatastoreOptions.Builder()
-              .host(sink.host)
-              .dataset(sink.datasetId)
+              .projectId(sink.projectId)
               .initializer(new RetryHttpRequestInitializer());
       Credential credential = options.as(GcpOptions.class).getGcpCredential();
       if (credential != null) {
@@ -716,12 +665,12 @@ public class DatastoreIO {
      * has either an id or a name.
      */
     static boolean isValidKey(Key key) {
-      List<PathElement> elementList = key.getPathElementList();
+      List<PathElement> elementList = key.getPathList();
       if (elementList.isEmpty()) {
         return false;
       }
       PathElement lastElement = elementList.get(elementList.size() - 1);
-      return (lastElement.hasId() || lastElement.hasName());
+      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
     }
 
     // Visible for testing
@@ -789,13 +738,13 @@ public class DatastoreIO {
         // Batch upsert entities.
         try {
           CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          commitRequest.getMutationBuilder().addAllUpsert(entities);
+          for (Entity entity: entities) {
+            commitRequest.addMutations(makeUpsert(entity));
+          }
           commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
           datastore.commit(commitRequest.build());
-
           // Break if the commit threw no exception.
           break;
-
         } catch (DatastoreException exception) {
           // Only log the code and message for potentially-transient errors. The entire exception
           // will be propagated upon the last retry.
@@ -878,7 +827,8 @@ public class DatastoreIO {
       this.source = source;
       this.datastore = datastore;
       // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE.
-      userLimit = source.query.hasLimit() ? source.query.getLimit() : Integer.MAX_VALUE;
+      userLimit = source.query.hasLimit()
+          ? source.query.getLimit().getValue() : Integer.MAX_VALUE;
     }
 
     @Override
@@ -950,8 +900,8 @@ public class DatastoreIO {
      */
     private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
       Query.Builder query = source.query.toBuilder().clone();
-      query.setLimit(Math.min(userLimit, QUERY_BATCH_LIMIT));
-      if (currentBatch != null && currentBatch.hasEndCursor()) {
+      query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT)));
+      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
         query.setStartCursor(currentBatch.getEndCursor());
       }
 
@@ -963,7 +913,7 @@ public class DatastoreIO {
       // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
       // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
       // use result count to determine if more results might exist.
-      int numFetch = currentBatch.getEntityResultCount();
+      int numFetch = currentBatch.getEntityResultsCount();
       if (source.query.hasLimit()) {
         verify(userLimit >= numFetch,
             "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
@@ -982,7 +932,7 @@ public class DatastoreIO {
         return null;
       }
 
-      return currentBatch.getEntityResultList().iterator();
+      return currentBatch.getEntityResultsList().iterator();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java
deleted file mode 100644
index 2a22925..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeProperty;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test case for {@link EntityCoder}.
- */
-@RunWith(JUnit4.class)
-public class EntityCoderTest {
-
-  private static final Coder<Entity> TEST_CODER = EntityCoder.of();
-
-  // Presumably if anything works, everything works,
-  // as actual serialization is fully delegated to
-  // autogenerated code from a well-tested library.
-  private static final List<Entity> TEST_VALUES = Arrays.<Entity>asList(
-      Entity.newBuilder()
-          .setKey(makeKey("TestKind", "emptyEntity"))
-          .build(),
-      Entity.newBuilder()
-          .setKey(makeKey("TestKind", "testSimpleProperties"))
-          .addProperty(makeProperty("trueProperty", makeValue(true)))
-          .addProperty(makeProperty("falseProperty", makeValue(false)))
-          .addProperty(makeProperty("stringProperty", makeValue("hello")))
-          .addProperty(makeProperty("integerProperty", makeValue(3)))
-          .addProperty(makeProperty("doubleProperty", makeValue(-1.583257)))
-          .build(),
-      Entity.newBuilder()
-          .setKey(makeKey("TestKind", "testNestedEntity"))
-          .addProperty(makeProperty("entityProperty",
-              makeValue(Entity.newBuilder()
-                  .addProperty(makeProperty("stringProperty", makeValue("goodbye"))))))
-          .build());
-
-  @Test
-  public void testDecodeEncodeEqual() throws Exception {
-    for (Entity value : TEST_VALUES) {
-      CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
-    }
-  }
-
-  // If this changes, it implies the binary format has changed.
-  private static final String EXPECTED_ENCODING_ID = "";
-
-  @Test
-  public void testEncodingId() throws Exception {
-    CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
-  }
-
-  /**
-   * Generated data to check that the wire format has not changed. To regenerate, see
-   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
-   */
-  private static final List<String> TEST_ENCODINGS = Arrays.asList(
-      "AAAAGwoZEhcKCFRlc3RLaW5kGgtlbXB0eUVudGl0eQ",
-      "AAAAnQoiEiAKCFRlc3RLaW5kGhR0ZXN0U2ltcGxlUHJvcGVydGllcxISCgx0cnVlUHJvcGVydHkiAggBEhMKDWZhbHNl"
-          + "UHJvcGVydHkiAggAEhoKDnN0cmluZ1Byb3BlcnR5IgiKAQVoZWxsbxIVCg9pbnRlZ2VyUHJvcGVydHkiAhADEh"
-          + "sKDmRvdWJsZVByb3BlcnR5IgkZ8ZvCSgVV-b8",
-      "AAAAVAoeEhwKCFRlc3RLaW5kGhB0ZXN0TmVzdGVkRW50aXR5EjIKDmVudGl0eVByb3BlcnR5IiAyHhIcCg5zdHJpbmdQ"
-          + "cm9wZXJ0eSIKigEHZ29vZGJ5ZQ");
-
-  @Test
-  public void testWireFormatEncode() throws Exception {
-      CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
-  }
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void encodeNullThrowsCoderException() throws Exception {
-    thrown.expect(CoderException.class);
-    thrown.expectMessage("cannot encode a null Entity");
-
-    CoderUtils.encodeToBase64(TEST_CODER, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
index b41c340..14fe4d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 
-import com.google.api.services.datastore.DatastoreV1.Entity;
 import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
 import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
 import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
@@ -152,9 +151,9 @@ public class ProtobufUtilTest {
   }
 
   @Test
-  public void testEntityIsDeterministic() throws NonDeterministicException {
-    // Cloud Datastore's Entities can be encoded deterministically.
-    verifyDeterministic(ProtoCoder.of(Entity.class));
+  public void testDurationIsDeterministic() throws NonDeterministicException {
+    // Duration can be encoded deterministically.
+    verifyDeterministic(ProtoCoder.of(Duration.class));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
index 622abb2..2aca190 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -47,21 +47,22 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.TestCredential;
 
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.EntityResult;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.KindExpression;
-import com.google.api.services.datastore.DatastoreV1.PartitionId;
-import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.QueryResultBatch;
-import com.google.api.services.datastore.DatastoreV1.RunQueryRequest;
-import com.google.api.services.datastore.DatastoreV1.RunQueryResponse;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.api.services.datastore.client.Datastore;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.api.services.datastore.client.QuerySplitter;
 import com.google.common.collect.Lists;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.KindExpression;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.PropertyFilter;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -84,8 +85,7 @@ import java.util.NoSuchElementException;
  */
 @RunWith(JUnit4.class)
 public class DatastoreIOTest {
-  private static final String HOST = "testHost";
-  private static final String DATASET = "testDataset";
+  private static final String PROJECT = "testProject";
   private static final String NAMESPACE = "testNamespace";
   private static final String KIND = "testKind";
   private static final Query QUERY;
@@ -108,7 +108,7 @@ public class DatastoreIOTest {
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     initialSource = DatastoreIO.source()
-        .withHost(HOST).withDataset(DATASET).withQuery(QUERY).withNamespace(NAMESPACE);
+        .withProject(PROJECT).withQuery(QUERY).withNamespace(NAMESPACE);
   }
 
   /**
@@ -123,10 +123,9 @@ public class DatastoreIOTest {
   @Test
   public void testBuildSource() throws Exception {
     DatastoreIO.Source source = DatastoreIO.source()
-        .withHost(HOST).withDataset(DATASET).withQuery(QUERY).withNamespace(NAMESPACE);
+        .withProject(PROJECT).withQuery(QUERY).withNamespace(NAMESPACE);
     assertEquals(QUERY, source.getQuery());
-    assertEquals(DATASET, source.getDataset());
-    assertEquals(HOST, source.getHost());
+    assertEquals(PROJECT, source.getProjectId());
     assertEquals(NAMESPACE, source.getNamespace());
   }
 
@@ -136,33 +135,23 @@ public class DatastoreIOTest {
   @Test
   public void testBuildSourceAlt() throws Exception {
     DatastoreIO.Source source = DatastoreIO.source()
-        .withDataset(DATASET).withNamespace(NAMESPACE).withQuery(QUERY).withHost(HOST);
+        .withProject(PROJECT).withNamespace(NAMESPACE).withQuery(QUERY);
     assertEquals(QUERY, source.getQuery());
-    assertEquals(DATASET, source.getDataset());
-    assertEquals(HOST, source.getHost());
+    assertEquals(PROJECT, source.getProjectId());
     assertEquals(NAMESPACE, source.getNamespace());
   }
 
   @Test
-  public void testSourceValidationFailsHost() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("host");
-
-    DatastoreIO.Source source = initialSource.withHost(null);
-    source.validate();
-  }
-
-  @Test
-  public void testSourceValidationFailsDataset() throws Exception {
+  public void testSourceValidationFailsProject() throws Exception {
     DatastoreIO.Source source = DatastoreIO.source().withQuery(QUERY);
     thrown.expect(NullPointerException.class);
-    thrown.expectMessage("dataset");
+    thrown.expectMessage("project");
     source.validate();
   }
 
   @Test
   public void testSourceValidationFailsQuery() throws Exception {
-    DatastoreIO.Source source = DatastoreIO.source().withDataset(DATASET);
+    DatastoreIO.Source source = DatastoreIO.source().withProject(PROJECT);
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("query");
     source.validate();
@@ -170,25 +159,25 @@ public class DatastoreIOTest {
 
   @Test
   public void testSourceValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(0).build();
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0");
+    thrown.expectMessage("Invalid query limit 0: must be positive");
 
     DatastoreIO.source().withQuery(invalidLimit);
   }
 
   @Test
   public void testSourceValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(-5).build();
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5");
+    thrown.expectMessage("Invalid query limit -5: must be positive");
 
     DatastoreIO.source().withQuery(invalidLimit);
   }
 
   @Test
   public void testSourceValidationSucceedsNamespace() throws Exception {
-    DatastoreIO.Source source = DatastoreIO.source().withDataset(DATASET).withQuery(QUERY);
+    DatastoreIO.Source source = DatastoreIO.source().withProject(PROJECT).withQuery(QUERY);
     /* Should succeed, as a null namespace is fine. */
     source.validate();
   }
@@ -196,61 +185,49 @@ public class DatastoreIOTest {
   @Test
   public void testSourceDisplayData() {
   DatastoreIO.Source source = DatastoreIO.source()
-      .withDataset(DATASET)
+      .withProject(PROJECT)
       .withQuery(QUERY)
-      .withHost(HOST)
       .withNamespace(NAMESPACE);
 
     DisplayData displayData = DisplayData.from(source);
 
-    assertThat(displayData, hasDisplayItem("dataset", DATASET));
+    assertThat(displayData, hasDisplayItem("project", PROJECT));
     assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
-    assertThat(displayData, hasDisplayItem("host", HOST));
     assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
   }
 
   @Test
-  public void testSinkDoesNotAllowNullHost() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("host");
-
-    DatastoreIO.sink().withDataset(DATASET).withHost(null);
-  }
-
-  @Test
-  public void testSinkDoesNotAllowNullDataset() throws Exception {
+  public void testSinkDoesNotAllowNullProject() throws Exception {
     thrown.expect(NullPointerException.class);
-    thrown.expectMessage("datasetId");
+    thrown.expectMessage("projectId");
 
-    DatastoreIO.sink().withDataset(null);
+    DatastoreIO.sink().withProject(null);
   }
 
   @Test
-  public void testSinkValidationFailsWithNoDataset() throws Exception {
+  public void testSinkValidationFailsWithNoProject() throws Exception {
     DatastoreIO.Sink sink = DatastoreIO.sink();
 
     thrown.expect(NullPointerException.class);
-    thrown.expectMessage("Dataset");
+    thrown.expectMessage("Project");
 
     sink.validate(testPipelineOptions());
   }
 
   @Test
-  public void testSinkValidationSucceedsWithDataset() throws Exception {
-    DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET);
+  public void testSinkValidationSucceedsWithProject() throws Exception {
+    DatastoreIO.Sink sink = DatastoreIO.sink().withProject(PROJECT);
     sink.validate(testPipelineOptions());
   }
 
   @Test
   public void testSinkDisplayData() {
     DatastoreIO.Sink sink = DatastoreIO.sink()
-        .withDataset(DATASET)
-        .withHost(HOST);
+        .withProject(PROJECT);
 
     DisplayData displayData = DisplayData.from(sink);
 
-    assertThat(displayData, hasDisplayItem("dataset", DATASET));
-    assertThat(displayData, hasDisplayItem("host", HOST));
+    assertThat(displayData, hasDisplayItem("project", PROJECT));
   }
 
   @Test
@@ -303,7 +280,7 @@ public class DatastoreIOTest {
 
     io.splitIntoBundles(1024, testPipelineOptions());
 
-    PartitionId partition = PartitionId.newBuilder().setNamespace(NAMESPACE).build();
+    PartitionId partition = PartitionId.newBuilder().setNamespaceId(NAMESPACE).build();
     verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class));
     verifyNoMoreInteractions(splitter);
   }
@@ -344,7 +321,7 @@ public class DatastoreIOTest {
   @Test
   public void testQueryDoesNotSplitWithLimitSet() throws Exception {
     // Minimal query with a limit
-    Query query = Query.newBuilder().setLimit(5).build();
+    Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build();
 
     // Mock query splitter, should not be invoked.
     QuerySplitter splitter = mock(QuerySplitter.class);
@@ -428,17 +405,14 @@ public class DatastoreIOTest {
    */
   @Test
   public void testBuildSink() throws Exception {
-    DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET).withHost(HOST);
-    assertEquals(HOST, sink.host);
-    assertEquals(DATASET, sink.datasetId);
+    DatastoreIO.Sink sink = DatastoreIO.sink().withProject(PROJECT);
+    assertEquals(PROJECT, sink.projectId);
 
-    sink = DatastoreIO.sink().withHost(HOST).withDataset(DATASET);
-    assertEquals(HOST, sink.host);
-    assertEquals(DATASET, sink.datasetId);
+    sink = DatastoreIO.sink().withProject(PROJECT);
+    assertEquals(PROJECT, sink.projectId);
 
-    sink = DatastoreIO.sink().withDataset(DATASET).withHost(HOST);
-    assertEquals(HOST, sink.host);
-    assertEquals(DATASET, sink.datasetId);
+    sink = DatastoreIO.sink().withProject(PROJECT);
+    assertEquals(PROJECT, sink.projectId);
   }
 
   /**
@@ -446,13 +420,11 @@ public class DatastoreIOTest {
    */
   @Test
   public void testBuildSinkDefaults() throws Exception {
-    DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET);
-    assertEquals(DatastoreIO.DEFAULT_HOST, sink.host);
-    assertEquals(DATASET, sink.datasetId);
+    DatastoreIO.Sink sink = DatastoreIO.sink().withProject(PROJECT);
+    assertEquals(PROJECT, sink.projectId);
 
-    sink = DatastoreIO.sink().withDataset(DATASET);
-    assertEquals(DatastoreIO.DEFAULT_HOST, sink.host);
-    assertEquals(DATASET, sink.datasetId);
+    sink = DatastoreIO.sink().withProject(PROJECT);
+    assertEquals(PROJECT, sink.projectId);
   }
 
   /**
@@ -462,33 +434,33 @@ public class DatastoreIOTest {
   public void testHasNameOrId() {
     Key key;
     // Complete with name, no ancestor
-    key = DatastoreHelper.makeKey("bird", "finch").build();
+    key = makeKey("bird", "finch").build();
     assertTrue(DatastoreWriter.isValidKey(key));
 
     // Complete with id, no ancestor
-    key = DatastoreHelper.makeKey("bird", 123).build();
+    key = makeKey("bird", 123).build();
     assertTrue(DatastoreWriter.isValidKey(key));
 
     // Incomplete, no ancestor
-    key = DatastoreHelper.makeKey("bird").build();
+    key = makeKey("bird").build();
     assertFalse(DatastoreWriter.isValidKey(key));
 
     // Complete with name and ancestor
-    key = DatastoreHelper.makeKey("bird", "owl").build();
-    key = DatastoreHelper.makeKey(key, "bird", "horned").build();
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", "horned").build();
     assertTrue(DatastoreWriter.isValidKey(key));
 
     // Complete with id and ancestor
-    key = DatastoreHelper.makeKey("bird", "owl").build();
-    key = DatastoreHelper.makeKey(key, "bird", 123).build();
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", 123).build();
     assertTrue(DatastoreWriter.isValidKey(key));
 
     // Incomplete with ancestor
-    key = DatastoreHelper.makeKey("bird", "owl").build();
-    key = DatastoreHelper.makeKey(key, "bird").build();
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird").build();
     assertFalse(DatastoreWriter.isValidKey(key));
 
-    key = DatastoreHelper.makeKey().build();
+    key = makeKey().build();
     assertFalse(DatastoreWriter.isValidKey(key));
   }
 
@@ -497,7 +469,7 @@ public class DatastoreIOTest {
    */
   @Test
   public void testAddEntitiesWithIncompleteKeys() throws Exception {
-    Key key = DatastoreHelper.makeKey("bird").build();
+    Key key = makeKey("bird").build();
     Entity entity = Entity.newBuilder().setKey(key).build();
     DatastoreWriter writer = new DatastoreIO.DatastoreWriter(null, mockDatastore);
 
@@ -513,9 +485,9 @@ public class DatastoreIOTest {
   @Test
   public void testAddingEntities() throws Exception {
     List<Entity> expected = Lists.newArrayList(
-        Entity.newBuilder().setKey(DatastoreHelper.makeKey("bird", "jay").build()).build(),
-        Entity.newBuilder().setKey(DatastoreHelper.makeKey("bird", "condor").build()).build(),
-        Entity.newBuilder().setKey(DatastoreHelper.makeKey("bird", "robin").build()).build());
+        Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
+        Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
+        Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
 
     List<Entity> allEntities = Lists.newArrayList(expected);
     Collections.shuffle(allEntities);
@@ -543,7 +515,7 @@ public class DatastoreIOTest {
     assertTrue(q.hasLimit());
 
     // The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT]
-    int limit = q.getLimit();
+    int limit = q.getLimit().getValue();
     assertThat(limit, greaterThanOrEqualTo(1));
     assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT));
 
@@ -559,7 +531,7 @@ public class DatastoreIOTest {
     // Fill out the other parameters on the returned result batch.
     RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
     ret.getBatchBuilder()
-        .addAllEntityResult(entities)
+        .addAllEntityResults(entities)
         .setEntityResultType(EntityResult.ResultType.FULL)
         .setMoreResults(
             limit == DATASTORE_QUERY_BATCH_LIMIT
@@ -572,8 +544,9 @@ public class DatastoreIOTest {
   /** Helper function to run a test reading from a limited-result query. */
   private void runQueryLimitReadTest(int numEntities) throws Exception {
     // An empty query to read entities.
-    Query query = Query.newBuilder().setLimit(numEntities).build();
-    DatastoreIO.Source source = DatastoreIO.source().withQuery(query).withDataset("mockDataset");
+    Query query = Query.newBuilder().setLimit(
+        Int32Value.newBuilder().setValue(numEntities)).build();
+    DatastoreIO.Source source = DatastoreIO.source().withQuery(query).withProject("mockProject");
 
     // Use mockResponseForQuery to generate results.
     when(mockDatastore.runQuery(any(RunQueryRequest.class)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
index 6e87e91..b3f8743 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
@@ -79,14 +79,16 @@ public class ApiSurfaceTest {
           inPackage("com.google.api.client"),
           inPackage("com.google.api.services.bigquery"),
           inPackage("com.google.api.services.dataflow"),
-          inPackage("com.google.api.services.datastore"),
           inPackage("com.google.api.services.pubsub"),
           inPackage("com.google.api.services.storage"),
           inPackage("com.google.auth"),
           inPackage("com.google.bigtable.v1"),
           inPackage("com.google.cloud.bigtable.config"),
           inPackage("com.google.cloud.bigtable.grpc"),
+          inPackage("com.google.datastore"),
           inPackage("com.google.protobuf"),
+          inPackage("com.google.rpc"),
+          inPackage("com.google.type"),
           inPackage("com.fasterxml.jackson.annotation"),
           inPackage("io.grpc"),
           inPackage("org.apache.avro"),


Mime
View raw message