beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/2] beam git commit: Support for querying metrics in Dataflow Runner
Date Mon, 20 Mar 2017 22:09:31 GMT
Repository: beam
Updated Branches:
  refs/heads/master 1d9772a3a -> 59aa0dab7


Support for querying metrics in Dataflow Runner

Added MetricFiltering class for helper methods related to matching step
names.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6de412a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6de412a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6de412a5

Branch: refs/heads/master
Commit: 6de412a5dfb3000ab5d354ada8761789230d3ce3
Parents: 1d9772a
Author: Pablo <pabloem@google.com>
Authored: Fri Mar 10 16:10:31 2017 -0800
Committer: bchambers <bchambers@google.com>
Committed: Mon Mar 20 14:48:22 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      |  70 +-----
 .../beam/runners/direct/DirectMetricsTest.java  |  86 ++-----
 .../beam/runners/dataflow/DataflowMetrics.java  | 212 +++++++++++++++++
 .../runners/dataflow/DataflowPipelineJob.java   |  14 +-
 .../runners/dataflow/DataflowMetricsTest.java   | 236 +++++++++++++++++++
 .../beam/sdk/metrics/MetricFiltering.java       |  99 ++++++++
 .../beam/sdk/metrics/MetricFilteringTest.java   |  72 ++++++
 7 files changed, 655 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index fa8f9c3..f04dc21 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -20,12 +20,10 @@ package org.apache.beam.runners.direct;
 import static java.util.Arrays.asList;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -35,9 +33,9 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricFiltering;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
@@ -258,7 +256,7 @@ class DirectMetrics extends MetricResults {
       MetricsFilter filter,
       ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder,
       Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) {
-    if (matches(filter, entry.getKey())) {
+    if (MetricFiltering.matches(filter, entry.getKey())) {
       resultsBuilder.add(DirectMetricResult.create(
           entry.getKey().metricName(),
           entry.getKey().stepName(),
@@ -267,70 +265,6 @@ class DirectMetrics extends MetricResults {
     }
   }
 
-  // Matching logic is implemented here rather than in MetricsFilter because we would like
-  // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
-  // a Proto/JSON/etc. schema object.
-  private boolean matches(MetricsFilter filter, MetricKey key) {
-    return matchesName(key.metricName(), filter.names())
-        && matchesScope(key.stepName(), filter.steps());
-  }
-
-  /**
-  * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
-  * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b",
-  * but not "a/fool/bar/b" or "a/foo/bart/b".
-  */
-  public boolean subPathMatches(String haystack, String needle) {
-    int location = haystack.indexOf(needle);
-    int end = location + needle.length();
-    if (location == -1) {
-      return false;  // needle not found
-    } else if (location != 0 && haystack.charAt(location - 1) != '/') {
-      return false; // the first entry in needle wasn't exactly matched
-    } else if (end != haystack.length() && haystack.charAt(end) != '/') {
-      return false; // the last entry in needle wasn't exactly matched
-    } else {
-      return true;
-    }
-  }
-
-  /**
-   * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched
-   * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A
-   * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D", or
-   * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */
-  public boolean matchesScope(String actualScope, Set<String> scopes) {
-    if (scopes.isEmpty() || scopes.contains(actualScope)) {
-      return true;
-    }
-
-    // If there is no perfect match, a stage name-level match is tried.
-    // This is done by a substring search over the levels of the scope.
-    // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
-    for (String scope : scopes) {
-      if (subPathMatches(actualScope, scope)) {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-  private boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) {
-    if (nameFilters.isEmpty()) {
-      return true;
-    }
-
-    for (MetricNameFilter nameFilter : nameFilters) {
-      if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name()))
-          && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
   /** Apply metric updates that represent physical counter deltas to the current metric values. */
   public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
     for (MetricUpdate<Long> counter : updates.counterUpdates()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 77229bf..7183124 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -23,13 +23,9 @@ import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
 import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
@@ -129,97 +125,63 @@ public class DirectMetricsTest {
             committedMetricsResult("ns1", "name1", "step2", 0L)));
   }
 
-  private boolean matchesSubPath(String actualScope, String subPath) {
-    return metrics.subPathMatches(actualScope, subPath);
-  }
-
-  @Test
-  public void testMatchesSubPath() {
-    assertTrue("Match of the first element",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
-    assertTrue("Match of the first elements",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
-    assertTrue("Match of the last elements",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
-    assertFalse("Substring match but no subpath match",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
-    assertFalse("Substring match from start - but no subpath match",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
-  }
-
-  private boolean matchesScopeWithSingleFilter(String actualScope, String filter) {
-    Set<String> scopeFilter = new HashSet<String>();
-    scopeFilter.add(filter);
-    return metrics.matchesScope(actualScope, scopeFilter);
-  }
-
-  @Test
-  public void testMatchesScope() {
-    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1"));
-    assertTrue(matchesScopeWithSingleFilter(
-        "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
-    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
-    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1"));
-    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1"));
-    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn"));
-  }
-
   @SuppressWarnings("unchecked")
   @Test
-  public void testPartialScopeMatchingInMetricsQuery() {
+  public void testApplyAttemptedQueryCompositeScope() {
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
-            MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
-            MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
+            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
         ImmutableList.<MetricUpdate<DistributionData>>of()));
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
-            MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
-            MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
+            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
+            MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
         ImmutableList.<MetricUpdate<DistributionData>>of()));
 
     MetricQueryResults results = metrics.queryMetrics(
-        MetricsFilter.builder().addStep("Top1/Outer1").build());
+        MetricsFilter.builder().addStep("Outer1").build());
 
     assertThat(results.counters(),
         containsInAnyOrder(
-            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L),
-            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L)));
-
-    results = metrics.queryMetrics(
-        MetricsFilter.builder().addStep("Inner2").build());
+            attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L),
+            attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L)));
 
     assertThat(results.counters(),
         containsInAnyOrder(
-            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L),
-            attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L)));
+            committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L),
+            committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L)));
   }
 
+
   @SuppressWarnings("unchecked")
   @Test
-  public void testApplyAttemptedQueryCompositeScope() {
+  public void testPartialScopeMatchingInMetricsQuery() {
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
-            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
-            MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
+            MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
         ImmutableList.<MetricUpdate<DistributionData>>of()));
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
-            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
-            MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
+            MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
+            MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
         ImmutableList.<MetricUpdate<DistributionData>>of()));
 
     MetricQueryResults results = metrics.queryMetrics(
-        MetricsFilter.builder().addStep("Outer1").build());
+        MetricsFilter.builder().addStep("Top1/Outer1").build());
 
     assertThat(results.counters(),
         containsInAnyOrder(
-            attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L),
-            attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L)));
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L),
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L)));
+
+    results = metrics.queryMetrics(
+        MetricsFilter.builder().addStep("Inner2").build());
 
     assertThat(results.counters(),
         containsInAnyOrder(
-            committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L),
-            committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L)));
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L),
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
new file mode 100644
index 0000000..c0d1883
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricFiltering;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link MetricResults} for the Dataflow Runner.
+ */
+class DataflowMetrics extends MetricResults {
+  private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class);
+
+  /**
+   * Client for the Dataflow service. This can be used to query the service
+   * for information about the job.
+   */
+  private final DataflowClient dataflowClient;
+
+  /**
+   * PipelineResult implementation for Dataflow Runner. It contains job state and id information.
+   */
+  private final DataflowPipelineJob dataflowPipelineJob;
+
+  /**
+   * Raw metric updates fetched once the job was already in a terminal state. Metrics no longer
+   * change after that point, so the unfiltered updates are cached here and every later query,
+   * whatever its filter, is answered from them without contacting the service again.
+   */
+  private List<com.google.api.services.dataflow.model.MetricUpdate> cachedMetricUpdates = null;
+
+  /**
+   * Constructor for the DataflowMetrics class.
+   * @param dataflowPipelineJob is used to get Job state and Job ID information.
+   * @param dataflowClient is used to query user metrics from the Dataflow service.
+   */
+  public DataflowMetrics(DataflowPipelineJob dataflowPipelineJob, DataflowClient dataflowClient) {
+    this.dataflowClient = dataflowClient;
+    this.dataflowPipelineJob = dataflowPipelineJob;
+  }
+
+  /**
+   * Build the {@link MetricKey} identifying the user metric reported by {@code metricUpdate}.
+   *
+   * <p>If aggregator transform information is available, the step name from the service is
+   * replaced by the full name of the corresponding applied transform.
+   * NOTE(review): assumes every reported step maps to a known transform; a null lookup result
+   * would throw here. Confirm against DataflowAggregatorTransforms.
+   *
+   * @return a {@link MetricKey} that can be hashed and used to identify a metric.
+   */
+  private MetricKey metricHashKey(
+      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    String fullStepName = metricUpdate.getName().getContext().get("step");
+    fullStepName = (dataflowPipelineJob.aggregatorTransforms != null
+        ? dataflowPipelineJob.aggregatorTransforms
+            .getAppliedTransformForStepName(fullStepName).getFullName() : fullStepName);
+    return MetricKey.create(
+        fullStepName,
+        MetricName.named(
+            metricUpdate.getName().getContext().get("namespace"),
+            metricUpdate.getName().getName()));
+  }
+
+  /**
+   * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a Metrics API
+   * user metric: its origin is {@code "user"} and its context carries a namespace. Updates that
+   * fail this check are not Metrics API counters and are ignored.
+   */
+  private boolean isUserMetric(com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    return metricUpdate.getName() != null
+        && Objects.equal(metricUpdate.getName().getOrigin(), "user")
+        && metricUpdate.getName().getContext() != null
+        && metricUpdate.getName().getContext().containsKey("namespace");
+  }
+
+  /**
+   * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
+   * update or not.
+   * @return true if the update's context maps {@code "tentative"} to {@code "true"}.
+   */
+  private boolean isMetricTentative(
+      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    return "true".equals(metricUpdate.getName().getContext().get("tentative"));
+  }
+
+  /**
+   * Whether {@code metricName} ends with one of the distribution component suffixes
+   * ({@code [MIN]}, {@code [MAX]}, {@code [MEAN]}, {@code [COUNT]}).
+   */
+  private static boolean isDistributionComponent(String metricName) {
+    return metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
+        || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]");
+  }
+
+  /**
+   * Read the scalar value of {@code metricUpdate} as a long.
+   *
+   * <p>Returns 0 when the service reported no such update (e.g. a counter seen only as tentative
+   * or only as committed) instead of failing the whole query with a NullPointerException; this
+   * mirrors DirectMetrics, which reports 0 for values that have not been committed yet.
+   */
+  private static Long scalarValue(
+      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    if (metricUpdate == null || metricUpdate.getScalar() == null) {
+      return 0L;
+    }
+    return ((Number) metricUpdate.getScalar()).longValue();
+  }
+
+  /**
+   * Take a list of metric updates coming from the Dataflow service, and format it into a
+   * Metrics API MetricQueryResults instance.
+   * @param metricUpdates raw updates returned by the Dataflow service.
+   * @param filter the filter to apply; {@code null} matches every metric.
+   * @return a populated MetricQueryResults object.
+   */
+  private MetricQueryResults populateMetricQueryResults(
+      List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
+      MetricsFilter filter) {
+    // Separate user metric updates by key and by tentative/committed.
+    Map<MetricKey, com.google.api.services.dataflow.model.MetricUpdate> tentativeByName =
+        new HashMap<>();
+    Map<MetricKey, com.google.api.services.dataflow.model.MetricUpdate> committedByName =
+        new HashMap<>();
+    Set<MetricKey> metricHashKeys = new HashSet<>();
+    for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
+      if (!isUserMetric(update)) {
+        continue;
+      }
+      MetricKey metricKey = metricHashKey(update);
+      if (isMetricTentative(update)) {
+        tentativeByName.put(metricKey, update);
+      } else {
+        committedByName.put(metricKey, update);
+      }
+      metricHashKeys.add(metricKey);
+    }
+
+    // Create the lists with the metric result information.
+    ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
+    ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
+        ImmutableList.builder();
+    boolean skippedDistributions = false;
+    for (MetricKey metricKey : metricHashKeys) {
+      // Filter first so that unmatched metrics are neither converted nor warned about.
+      if (filter != null && !MetricFiltering.matches(filter, metricKey)) {
+        continue;
+      }
+      String metricName = metricKey.metricName().name();
+      if (isDistributionComponent(metricName)) {
+        // Skip distribution metrics, as these are not yet properly supported.
+        skippedDistributions = true;
+        continue;
+      }
+      counterResults.add(DataflowMetricResult.create(
+          MetricName.named(metricKey.metricName().namespace(), metricName),
+          metricKey.stepName(),
+          scalarValue(committedByName.get(metricKey)),
+          scalarValue(tentativeByName.get(metricKey))));
+    }
+    if (skippedDistributions) {
+      // Warn once per query instead of once per distribution component.
+      LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow"
+          + " User Interface");
+    }
+    return DataflowMetricQueryResults.create(counterResults.build(), distributionResults.build());
+  }
+
+  /**
+   * Fetch the raw metric updates of the job, serving them from the cache once the job has
+   * finished.
+   *
+   * @throws IOException if the Dataflow service cannot be queried.
+   */
+  private List<com.google.api.services.dataflow.model.MetricUpdate> fetchMetricUpdates()
+      throws IOException {
+    if (cachedMetricUpdates != null) {
+      return cachedMetricUpdates;
+    }
+    // Read the state before querying: only updates fetched once the job was already terminal
+    // are presumed final, so only those are cached.
+    boolean terminal = dataflowPipelineJob.getState().isTerminal();
+    JobMetrics jobMetrics = dataflowClient.getJobMetrics(dataflowPipelineJob.jobId);
+    List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates =
+        jobMetrics.getMetrics();
+    if (metricUpdates == null) {
+      // The metrics field may be absent from the response, e.g. when nothing was reported yet.
+      metricUpdates = ImmutableList.of();
+    }
+    if (terminal) {
+      cachedMetricUpdates = metricUpdates;
+    }
+    return metricUpdates;
+  }
+
+  /**
+   * Query all metrics of the job, without filtering.
+   */
+  public MetricQueryResults queryMetrics() {
+    return queryMetrics(null);
+  }
+
+  /**
+   * Query the metrics of the job that match {@code filter}; a {@code null} filter matches every
+   * metric. If the Dataflow service cannot be queried, a warning is logged and empty results are
+   * returned.
+   */
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
+    try {
+      metricUpdates = fetchMetricUpdates();
+    } catch (IOException e) {
+      LOG.warn("Unable to query job metrics.\n", e);
+      return DataflowMetricQueryResults.create(
+          ImmutableList.<MetricResult<Long>>of(),
+          ImmutableList.<MetricResult<DistributionResult>>of());
+    }
+    return populateMetricQueryResults(metricUpdates, filter);
+  }
+
+  @AutoValue
+  abstract static class DataflowMetricQueryResults implements MetricQueryResults {
+    public static MetricQueryResults create(
+        Iterable<MetricResult<Long>> counters,
+        Iterable<MetricResult<DistributionResult>> distributions) {
+      return new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions);
+    }
+  }
+
+  @AutoValue
+  abstract static class DataflowMetricResult<T> implements MetricResult<T> {
+    // need to define these here so they appear in the correct order
+    // and the generated constructor is usable and consistent
+    public abstract MetricName name();
+    public abstract String step();
+    public abstract T committed();
+    public abstract T attempted();
+
+    public static <T> MetricResult<T> create(MetricName name, String scope,
+        T committed, T attempted) {
+      return new AutoValue_DataflowMetrics_DataflowMetricResult<T>(
+          name, scope, committed, attempted);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 950a9d3..1112fbb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -57,7 +57,7 @@ public class DataflowPipelineJob implements PipelineResult {
   /**
    * The id for the job.
    */
-  private String jobId;
+  protected String jobId;
 
   /**
    * The {@link DataflowPipelineOptions} for the job.
@@ -71,6 +71,12 @@ public class DataflowPipelineJob implements PipelineResult {
   private final DataflowClient dataflowClient;
 
   /**
+   * MetricResults object for Dataflow Runner. It allows for querying of metrics from the Dataflow
+   * service.
+   */
+  private final DataflowMetrics dataflowMetrics;
+
+  /**
    * The state the job terminated in or {@code null} if the job has not terminated.
    */
   @Nullable
@@ -82,7 +88,7 @@ public class DataflowPipelineJob implements PipelineResult {
   @Nullable
   private DataflowPipelineJob replacedByJob = null;
 
-  private DataflowAggregatorTransforms aggregatorTransforms;
+  protected DataflowAggregatorTransforms aggregatorTransforms;
 
   /**
    * The Metric Updates retrieved after the job was in a terminal state.
@@ -129,6 +135,7 @@ public class DataflowPipelineJob implements PipelineResult {
     this.dataflowOptions = dataflowOptions;
     this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
     this.aggregatorTransforms = aggregatorTransforms;
+    this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
   }
 
   /**
@@ -462,8 +469,7 @@ public class DataflowPipelineJob implements PipelineResult {
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException(
-        "The DataflowRunner does not currently support metrics.");
+    return dataflowMetrics;
   }
 
   private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)

http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
new file mode 100644
index 0000000..1017978
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
+import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DataflowMetrics}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowMetricsTest {
+  private static final String PROJECT_ID = "some-project";
+  private static final String JOB_ID = "1234";
+
+  @Mock
+  private Dataflow mockWorkflowClient;
+  @Mock
+  private Dataflow.Projects mockProjects;
+  @Mock
+  private Dataflow.Projects.Locations mockLocations;
+  @Mock
+  private Dataflow.Projects.Locations.Jobs mockJobs;
+
+  private TestDataflowPipelineOptions options;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    when(mockWorkflowClient.projects()).thenReturn(mockProjects);
+    when(mockProjects.locations()).thenReturn(mockLocations);
+    when(mockLocations.jobs()).thenReturn(mockJobs);
+
+    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setDataflowClient(mockWorkflowClient);
+    options.setProject(PROJECT_ID);
+    options.setRunner(DataflowRunner.class);
+    options.setTempLocation("gs://fakebucket/temp");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setGcpCredential(new TestCredential());
+  }
+
+  @Test
+  public void testEmptyMetricUpdates() throws IOException {
+    Job modelJob = new Job();
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics();
+    assertThat(ImmutableList.copyOf(result.counters()), is(empty()));
+    assertThat(ImmutableList.copyOf(result.distributions()), is(empty()));
+  }
+
+  @Test
+  public void testCachingMetricUpdates() throws IOException {
+    Job modelJob = new Job();
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    // The job reports the terminal DONE state, so the second query below is expected to reuse
+    // the first result instead of asking the service again.
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.DONE);
+    job.jobId = JOB_ID;
+
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    verify(dataflowClient, times(0)).getJobMetrics(JOB_ID);
+    dataflowMetrics.queryMetrics(null);
+    verify(dataflowClient, times(1)).getJobMetrics(JOB_ID);
+    dataflowMetrics.queryMetrics(null);
+    verify(dataflowClient, times(1)).getJobMetrics(JOB_ID);
+  }
+
+  /**
+   * Builds a user-origin counter {@link MetricUpdate} carrying {@code scalar}, whose structured
+   * name has the given {@code name} and a context holding {@code step} and {@code namespace}.
+   * When {@code tentative} is true the context additionally contains
+   * {@code "tentative" -> "true"}.
+   */
+  private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step,
+      long scalar, boolean tentative) {
+    MetricUpdate update = new MetricUpdate();
+    update.setScalar(new BigDecimal(scalar));
+
+    MetricStructuredName structuredName = new MetricStructuredName();
+    structuredName.setName(name);
+    structuredName.setOrigin("user");
+    // Typed builder: the raw type previously produced an unchecked conversion in setContext.
+    ImmutableMap.Builder<String, String> contextBuilder = ImmutableMap.builder();
+    contextBuilder.put("step", step)
+        .put("namespace", namespace);
+    if (tentative) {
+      contextBuilder.put("tentative", "true");
+    }
+    structuredName.setContext(contextBuilder.build());
+    update.setName(structuredName);
+    return update;
+  }
+
+  @Test
+  public void testSingleCounterUpdates() throws IOException {
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    // The parser relies on the fact that one tentative and one committed metric update exist
+    // in the job metrics results.
+    MetricUpdate mu1 = makeCounterMetricUpdate("counterName", "counterNamespace",
+        "s2", 1234L, false);
+    MetricUpdate mu1Tentative = makeCounterMetricUpdate("counterName",
+        "counterNamespace", "s2", 1233L, true);
+    jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+    // The tentative update is expected as the attempted value, the other as the committed one.
+    assertThat(result.counters(), containsInAnyOrder(
+        attemptedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+    assertThat(result.counters(), containsInAnyOrder(
+        committedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+  }
+
+  @Test
+  public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    // The parser relies on the fact that one tentative and one committed metric update exist
+    // in the job metrics results.
+    jobMetrics.setMetrics(ImmutableList.of(
+        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
+        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+        makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, false),
+        makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, true)));
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+    // Only "counterName" is expected among the counters; "otherCounter[MIN]" must not appear.
+    assertThat(result.counters(), containsInAnyOrder(
+        attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+    assertThat(result.counters(), containsInAnyOrder(
+        committedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+  }
+
+  @Test
+  public void testMultipleCounterUpdates() throws IOException {
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    // The parser relies on the fact that one tentative and one committed metric update exist
+    // in the job metrics results.
+    jobMetrics.setMetrics(ImmutableList.of(
+        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
+        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+        makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false),
+        makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true),
+        makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false),
+        makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true)));
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+    assertThat(result.counters(), containsInAnyOrder(
+        attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L),
+        attemptedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
+        attemptedMetricsResult("otherNamespace", "counterName", "s4", 1233L)));
+    assertThat(result.counters(), containsInAnyOrder(
+        committedMetricsResult("counterNamespace", "counterName", "s2", 1233L),
+        committedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
+        committedMetricsResult("otherNamespace", "counterName", "s4", 1200L)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
new file mode 100644
index 0000000..a3e43e1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.Objects;
+import java.util.Set;
+
+/**
+ * Implements matching for metrics filters. Specifically, matching for metric name,
+ * namespace, and step name.
+ */
+public final class MetricFiltering {
+
+  private MetricFiltering() { }
+
+  /**
+   * Returns whether the metric identified by {@code key} is selected by {@code filter}.
+   *
+   * <p>A {@code null} filter selects every metric. Otherwise the metric's name must be matched
+   * by one of the filter's name filters and its step must be matched by one of the filter's
+   * steps; an empty set of names or of steps places no restriction on that dimension.
+   *
+   * <p>Matching logic is implemented here rather than in MetricsFilter because we would like
+   * MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
+   * a Proto/JSON/etc. schema object.
+   *
+   * @param filter the {@link MetricsFilter} to apply; may be {@code null}
+   * @param key {@link MetricKey} with the information of a metric
+   * @return whether the filter matches the key or not
+   */
+  public static boolean matches(MetricsFilter filter, MetricKey key) {
+    return filter == null
+        || (matchesName(key.metricName(), filter.names())
+        && matchesScope(key.stepName(), filter.steps()));
+  }
+
+  /**
+   * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
+   * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b",
+   * but not "a/fool/bar/b" or "a/foo/bart/b".
+   *
+   * <p>Every occurrence of {@code needle} is examined, so "foo" is found in "xfoo/foo" even
+   * though its first textual occurrence is not aligned to a path boundary. An empty
+   * {@code needle} only matches an empty {@code haystack}.
+   */
+  public static boolean subPathMatches(String haystack, String needle) {
+    if (needle.isEmpty()) {
+      // Guard: String.indexOf("", from) never returns -1, so the scan below would not end.
+      return haystack.isEmpty();
+    }
+    for (int location = haystack.indexOf(needle);
+        location != -1;
+        location = haystack.indexOf(needle, location + 1)) {
+      int end = location + needle.length();
+      // The first entry in needle must be exactly matched (start of path or after a '/').
+      boolean startsOnBoundary = location == 0 || haystack.charAt(location - 1) == '/';
+      // The last entry in needle must be exactly matched (end of path or before a '/').
+      boolean endsOnBoundary = end == haystack.length() || haystack.charAt(end) == '/';
+      if (startsOnBoundary && endsOnBoundary) {
+        return true;
+      }
+    }
+    return false;  // needle not found on path boundaries
+  }
+
+  /**
+   * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched
+   * by any of the filters in {@code scopes}, or if {@code scopes} is empty. A metric scope is a
+   * path of type "A/B/D". A path is matched by a filter if the filter is equal to the path
+   * (e.g. "A/B/D"), or if it represents a subpath within it (e.g. "A/B" or "B/D", but not
+   * "A/D").
+   */
+  public static boolean matchesScope(String actualScope, Set<String> scopes) {
+    if (scopes.isEmpty() || scopes.contains(actualScope)) {
+      return true;
+    }
+
+    // If there is no perfect match, a stage name-level match is tried.
+    // This is done by a substring search over the levels of the scope.
+    // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
+    for (String scope : scopes) {
+      if (subPathMatches(actualScope, scope)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns true if {@code nameFilters} is empty, or if some filter has the same namespace as
+   * {@code metricName} and either no name or the same name.
+   */
+  private static boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) {
+    if (nameFilters.isEmpty()) {
+      return true;
+    }
+    for (MetricNameFilter nameFilter : nameFilters) {
+      if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name()))
+          && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
new file mode 100644
index 0000000..3e6a499
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricFiltering}.
+ */
+@RunWith(JUnit4.class)
+public class MetricFilteringTest {
+
+  private boolean matchesSubPath(String actualScope, String subPath) {
+    return MetricFiltering.subPathMatches(actualScope, subPath);
+  }
+
+  @Test
+  public void testMatchesSubPath() {
+    assertTrue("Match of the first element",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
+    assertTrue("Match of the first elements",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+    assertTrue("Match of the last elements",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
+    assertFalse("Substring match but no subpath match",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
+    assertFalse("Substring match from start - but no subpath match",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
+  }
+
+  /** Runs {@link MetricFiltering#matchesScope} with a scope filter set holding {@code filters}. */
+  private boolean matchesScopeWithFilters(String actualScope, String... filters) {
+    Set<String> scopeFilter = new HashSet<String>();
+    for (String filter : filters) {
+      scopeFilter.add(filter);
+    }
+    return MetricFiltering.matchesScope(actualScope, scopeFilter);
+  }
+
+  @Test
+  public void testMatchesScope() {
+    assertTrue(matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1", "Top1"));
+    assertTrue(matchesScopeWithFilters(
+        "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
+    assertTrue(matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+    assertTrue(matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1"));
+    assertFalse(matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1"));
+    assertFalse(matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn"));
+  }
+
+  @Test
+  public void testMatchesScopeWithNoFilters() {
+    assertTrue("An empty filter set matches every scope",
+        matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1"));
+  }
+
+  @Test
+  public void testMatchesScopeWithMultipleFilters() {
+    assertTrue("Any single matching filter is enough",
+        matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1", "Outer1/Inner1"));
+    assertFalse("No filter matches on path boundaries",
+        matchesScopeWithFilters("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1", "Bottom"));
+  }
+}


Mime
View raw message