beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [beam] branch master updated: [BEAM-6944] Add MeanByteCountMonitoringInfoToCounterUpdateTransformer (#8171)
Date Mon, 01 Apr 2019 20:10:28 GMT
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e68fafa  [BEAM-6944] Add MeanByteCountMonitoringInfoToCounterUpdateTransformer (#8171)
e68fafa is described below

commit e68fafa533439e1d9c039bd4dc720d1dbbdad87a
Author: Mikhail Gryzykhin <12602502+Ardagan@users.noreply.github.com>
AuthorDate: Mon Apr 1 13:10:12 2019 -0700

    [BEAM-6944] Add MeanByteCountMonitoringInfoToCounterUpdateTransformer (#8171)
---
 ...piMonitoringInfoToCounterUpdateTransformer.java |   4 +
 ...ntMonitoringInfoToCounterUpdateTransformer.java | 124 +++++++++++++++++++++
 ...nitoringInfoToCounterUpdateTransformerTest.java | 117 +++++++++++++++++++
 3 files changed, 245 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
index 478a7a3..49058b6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
@@ -58,6 +58,10 @@ public class FnApiMonitoringInfoToCounterUpdateTransformer
         ElementCountMonitoringInfoToCounterUpdateTransformer.getSupportedUrn(),
         new ElementCountMonitoringInfoToCounterUpdateTransformer(
             specValidator, sdkPCollectionIdToNameContext));
+    this.counterTransformers.put(
+        MeanByteCountMonitoringInfoToCounterUpdateTransformer.getSupportedUrn(),
+        new MeanByteCountMonitoringInfoToCounterUpdateTransformer(
+            specValidator, sdkPCollectionIdToNameContext));
   }
 
   /** Allows for injection of user and generic counter transformers for more convenient testing.
*/
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MeanByteCountMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MeanByteCountMonitoringInfoToCounterUpdateTransformer.java
new file mode 100644
index 0000000..57688d9
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MeanByteCountMonitoringInfoToCounterUpdateTransformer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import com.google.api.services.dataflow.model.IntegerMean;
+import com.google.api.services.dataflow.model.NameAndKind;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** MonitoringInfo to CounterUpdate transformer capable to transform MeanByteCount counter.
*/
+public class MeanByteCountMonitoringInfoToCounterUpdateTransformer
+    implements MonitoringInfoToCounterUpdateTransformer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamFnMapTaskExecutor.class);
+
+  private final SpecMonitoringInfoValidator specValidator;
+  private final Map<String, NameContext> pcollectionIdToNameContext;
+
+  // TODO(BEAM-6945): utilize value from metrics.proto once it gets in.
+  private static final String SUPPORTED_URN = "beam:metric:sampled_byte_size:v1";
+
+  /**
+   * @param specValidator SpecMonitoringInfoValidator to utilize for default validation.
+   * @param pcollectionIdToNameContext This mapping is utilized to generate DFE CounterUpdate
name.
+   */
+  public MeanByteCountMonitoringInfoToCounterUpdateTransformer(
+      SpecMonitoringInfoValidator specValidator,
+      Map<String, NameContext> pcollectionIdToNameContext) {
+    this.specValidator = specValidator;
+    this.pcollectionIdToNameContext = pcollectionIdToNameContext;
+  }
+
+  /**
+   * Validates provided monitoring info against specs and common safety checks.
+   *
+   * @param monitoringInfo to validate.
+   * @return Optional.empty() all validation checks are passed. Optional with error text
otherwise.
+   * @throws RuntimeException if received unexpected urn.
+   */
+  protected Optional<String> validate(MonitoringInfo monitoringInfo) {
+    Optional<String> validatorResult = specValidator.validate(monitoringInfo);
+    if (validatorResult.isPresent()) {
+      return validatorResult;
+    }
+
+    String urn = monitoringInfo.getUrn();
+    if (!urn.equals(SUPPORTED_URN)) {
+      throw new RuntimeException(String.format("Received unexpected counter urn: %s", urn));
+    }
+
+    // TODO(BEAM-6945): extract and utilize pcollection label from beam_fn_api.proto
+    if (!pcollectionIdToNameContext.containsKey(monitoringInfo.getLabelsMap().get("PCOLLECTION")))
{
+      return Optional.of(
+          "Encountered ElementCount MonitoringInfo with unknown PCollectionId: "
+              + monitoringInfo.toString());
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Generates CounterUpdate to send to DFE based on ElementCount MonitoringInfo.
+   *
+   * @param monitoringInfo Monitoring info to transform.
+   * @return CounterUpdate generated based on provided monitoringInfo
+   */
+  @Override
+  @Nullable
+  public CounterUpdate transform(MonitoringInfo monitoringInfo) {
+    Optional<String> validationResult = validate(monitoringInfo);
+    if (validationResult.isPresent()) {
+      LOG.info(validationResult.get());
+      return null;
+    }
+
+    IntDistributionData value =
+        monitoringInfo.getMetric().getDistributionData().getIntDistributionData();
+
+    final String pcollectionId = monitoringInfo.getLabelsMap().get("PCOLLECTION");
+    final String pcollectionName = pcollectionIdToNameContext.get(pcollectionId).userName();
+
+    String counterName = pcollectionName + "-MeanByteCount";
+    NameAndKind name = new NameAndKind();
+    name.setName(counterName).setKind("MEAN");
+
+    return new CounterUpdate()
+        .setNameAndKind(name)
+        .setCumulative(true)
+        .setIntegerMean(
+            new IntegerMean()
+                .setSum(longToSplitInt(value.getSum()))
+                .setCount(longToSplitInt(value.getCount())));
+  }
+
+  /** @return iterable of Urns that this transformer can convert to CounterUpdates. */
+  public static String getSupportedUrn() {
+    return SUPPORTED_URN;
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MeanByteCountMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MeanByteCountMonitoringInfoToCounterUpdateTransformerTest.java
new file mode 100644
index 0000000..62ccae4
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MeanByteCountMonitoringInfoToCounterUpdateTransformerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link MeanByteCountMonitoringInfoToCounterUpdateTransformer}. */
+public class MeanByteCountMonitoringInfoToCounterUpdateTransformerTest {
+
+  @Rule public final ExpectedException exception = ExpectedException.none();
+
+  @Mock private SpecMonitoringInfoValidator mockSpecValidator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfSpecValidationFails() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    MeanByteCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new MeanByteCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    Optional<String> error = Optional.of("Error text");
+    when(mockSpecValidator.validate(any())).thenReturn(error);
+    // The spec validator rejects first, so the null MonitoringInfo is never dereferenced.
+    assertEquals(null, testObject.transform(null));
+  }
+
+  @Test
+  public void testTransformThrowsIfMonitoringInfoWithWrongUrnReceived() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder().setUrn("beam:user:metric:element_count:v1").build();
+    MeanByteCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new MeanByteCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    exception.expect(RuntimeException.class);
+    testObject.transform(monitoringInfo);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfPCollectionLabelMissing() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder().setUrn("beam:metric:sampled_byte_size:v1").build();
+    MeanByteCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new MeanByteCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+    assertEquals(null, testObject.transform(monitoringInfo));
+  }
+
+  @Test
+  public void testTransformReturnsNullIfMonitoringInfoWithUnknownPCollectionLabelPresent() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder()
+            .setUrn("beam:metric:sampled_byte_size:v1")
+            .putLabels("PCOLLECTION", "anyValue")
+            .build();
+    MeanByteCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new MeanByteCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+    assertEquals(null, testObject.transform(monitoringInfo));
+  }
+
+  @Test
+  public void testTransformReturnsValidCounterUpdateWhenValidMonitoringInfoReceived() {
+    Map<String, NameContext> pcollectionNameMapping = new HashMap<>();
+    pcollectionNameMapping.put(
+        "anyValue",
+        NameContext.create("anyStageName", "anyOriginName", "anySystemName", "transformedValue"));
+
+    MonitoringInfo monitoringInfo =
+        MonitoringInfo.newBuilder()
+            .setUrn("beam:metric:sampled_byte_size:v1")
+            .putLabels("PCOLLECTION", "anyValue")
+            .build();
+    MeanByteCountMonitoringInfoToCounterUpdateTransformer testObject =
+        new MeanByteCountMonitoringInfoToCounterUpdateTransformer(
+            mockSpecValidator, pcollectionNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    CounterUpdate result = testObject.transform(monitoringInfo);
+
+    assertNotEquals(null, result);
+    // No metric payload is set, so the default (empty) distribution yields zero sum and count.
+    assertEquals(
+        "{cumulative=true, integerMean={count={highBits=0, lowBits=0}, "
+            + "sum={highBits=0, lowBits=0}}, "
+            + "nameAndKind={kind=MEAN, "
+            + "name=transformedValue-MeanByteCount}}",
+        result.toString());
+  }
+}


Mime
View raw message