beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Adding setters for CounterUpdate in the dataflow runner
Date Thu, 16 Feb 2017 19:38:12 GMT
Repository: beam
Updated Branches:
  refs/heads/master cd4e6e402 -> 55340e617


Adding setters for CounterUpdate in the dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d208184
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d208184
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d208184

Branch: refs/heads/master
Commit: 5d208184c4704124ce731506984eb40afb7efee9
Parents: cd4e6e4
Author: Pablo <pabloem@google.com>
Authored: Fri Feb 10 10:30:39 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Thu Feb 16 11:23:39 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 141 +++++++++++++++----
 .../apache_beam/internal/apiclient_test.py      |  49 ++++++-
 2 files changed, 161 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5d208184/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 3567430..71b670f 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -24,15 +24,13 @@ import json
 import logging
 import os
 import re
-import time
-
 from StringIO import StringIO
-
-from apitools.base.py import encoding
-from apitools.base.py import exceptions
+import time
 
 from apache_beam import utils
 from apache_beam.internal.auth import get_service_credentials
+from apache_beam.internal.clients import dataflow
+from apache_beam.internal.clients import storage
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.transforms import cy_combiners
 from apache_beam.transforms.display import DisplayData
@@ -45,8 +43,9 @@ from apache_beam.utils.pipeline_options import DebugOptions
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
 from apache_beam.utils.pipeline_options import StandardOptions
 from apache_beam.utils.pipeline_options import WorkerOptions
-from apache_beam.internal.clients import storage
-import apache_beam.internal.clients.dataflow as dataflow
+
+from apitools.base.py import encoding
+from apitools.base.py import exceptions
 
 
 class Step(object):
@@ -81,8 +80,7 @@ class Step(object):
     return outputs
 
   def __reduce__(self):
-    """Reduce hook for pickling the Step class more easily
-    """
+    """Reduce hook for pickling the Step class more easily."""
     return (Step, (self.step_kind, self.step_name, self._additional_properties))
 
   def get_output(self, tag=None):
@@ -341,8 +339,7 @@ class Job(object):
     return encoding.MessageToJson(self.proto)
 
   def __reduce__(self):
-    """Reduce hook for pickling the Job class more easily
-    """
+    """Reduce hook for pickling the Job class more easily."""
     return (Job, (self.options,))
 
 
@@ -410,9 +407,7 @@ class DataflowApplicationClient(object):
   # TODO(silviuc): Refactor so that retry logic can be applied.
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def create_job(self, job):
-    """Creates a job description.
-    Additionally, it may stage it and/or submit it for remote execution.
-    """
+    """Creates job description. May stage and/or submit for remote execution."""
     self.create_job_description(job)
 
     # Stage and submit the job when necessary
@@ -609,11 +604,67 @@ class DataflowApplicationClient(object):
     return response.jobMessages, response.nextPageToken
 
 
-def set_scalar(accumulator, metric_update):
+class MetricUpdateTranslators(object):
+  """Translators between accumulators and dataflow metric updates."""
+
+  @staticmethod
+  def translate_boolean(accumulator, metric_update_proto):
+    metric_update_proto.boolean = accumulator.value
+
+  @staticmethod
+  def translate_scalar_mean_int(accumulator, metric_update_proto):
+    if accumulator.count:
+      metric_update_proto.integerMean = dataflow.IntegerMean()
+      metric_update_proto.integerMean.sum = to_split_int(accumulator.sum)
+      metric_update_proto.integerMean.count = to_split_int(accumulator.count)
+    else:
+      metric_update_proto.nameAndKind.kind = None
+
+  @staticmethod
+  def translate_scalar_mean_float(accumulator, metric_update_proto):
+    if accumulator.count:
+      metric_update_proto.floatingPointMean = dataflow.FloatingPointMean()
+      metric_update_proto.floatingPointMean.sum = accumulator.sum
+      metric_update_proto.floatingPointMean.count = to_split_int(
+          accumulator.count)
+    else:
+      metric_update_proto.nameAndKind.kind = None
+
+  @staticmethod
+  def translate_scalar_counter_int(accumulator, metric_update_proto):
+    metric_update_proto.integer = to_split_int(accumulator.value)
+
+  @staticmethod
+  def translate_scalar_counter_float(accumulator, metric_update_proto):
+    metric_update_proto.floatingPoint = accumulator.value
+
+
+def to_split_int(n):
+  res = dataflow.SplitInt64()
+  res.lowBits = n & 0xffffffff
+  res.highBits = n >> 32
+  return res
+
+
+def translate_distribution(distribution_update, metric_update_proto):
+  """Translate metrics DistributionUpdate to dataflow distribution update."""
+  dist_update_proto = dataflow.DistributionUpdate()
+  dist_update_proto.min = to_split_int(distribution_update.min)
+  dist_update_proto.max = to_split_int(distribution_update.max)
+  dist_update_proto.count = to_split_int(distribution_update.count)
+  dist_update_proto.sum = to_split_int(distribution_update.sum)
+  metric_update_proto.distribution = dist_update_proto
+
+
+def translate_value(value, metric_update_proto):
+  metric_update_proto.integer = to_split_int(value)
+
+
+def translate_scalar(accumulator, metric_update):
   metric_update.scalar = to_json_value(accumulator.value, with_type=True)
 
 
-def set_mean(accumulator, metric_update):
+def translate_mean(accumulator, metric_update):
   if accumulator.count:
     metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
     metric_update.meanCount = to_json_value(accumulator.count, with_type=True)
@@ -625,15 +676,51 @@ def set_mean(accumulator, metric_update):
 
 # To enable a counter on the service, add it to this dictionary.
 metric_translations = {
-    cy_combiners.CountCombineFn: ('sum', set_scalar),
-    cy_combiners.SumInt64Fn: ('sum', set_scalar),
-    cy_combiners.MinInt64Fn: ('min', set_scalar),
-    cy_combiners.MaxInt64Fn: ('max', set_scalar),
-    cy_combiners.MeanInt64Fn: ('mean', set_mean),
-    cy_combiners.SumFloatFn: ('sum', set_scalar),
-    cy_combiners.MinFloatFn: ('min', set_scalar),
-    cy_combiners.MaxFloatFn: ('max', set_scalar),
-    cy_combiners.MeanFloatFn: ('mean', set_mean),
-    cy_combiners.AllCombineFn: ('and', set_scalar),
-    cy_combiners.AnyCombineFn: ('or', set_scalar),
+    cy_combiners.CountCombineFn: ('sum', translate_scalar),
+    cy_combiners.SumInt64Fn: ('sum', translate_scalar),
+    cy_combiners.MinInt64Fn: ('min', translate_scalar),
+    cy_combiners.MaxInt64Fn: ('max', translate_scalar),
+    cy_combiners.MeanInt64Fn: ('mean', translate_mean),
+    cy_combiners.SumFloatFn: ('sum', translate_scalar),
+    cy_combiners.MinFloatFn: ('min', translate_scalar),
+    cy_combiners.MaxFloatFn: ('max', translate_scalar),
+    cy_combiners.MeanFloatFn: ('mean', translate_mean),
+    cy_combiners.AllCombineFn: ('and', translate_scalar),
+    cy_combiners.AnyCombineFn: ('or', translate_scalar),
+}
+
+counter_translations = {
+    cy_combiners.CountCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.SumInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MinInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MaxInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MeanInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_int),
+    cy_combiners.SumFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MinFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MaxFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MeanFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_float),
+    cy_combiners.AllCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.AND,
+        MetricUpdateTranslators.translate_boolean),
+    cy_combiners.AnyCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.OR,
+        MetricUpdateTranslators.translate_boolean),
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5d208184/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 188a5a8..c1d304b 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -18,9 +18,13 @@
 
 import unittest
 
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.runners.dataflow_runner import DataflowRunner
+from mock import Mock
+
 from apache_beam.internal import apiclient
+from apache_beam.internal.clients import dataflow
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.runners.dataflow_runner import DataflowRunner
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class UtilTest(unittest.TestCase):
@@ -37,6 +41,47 @@ class UtilTest(unittest.TestCase):
     regexp = 'beamapp-.*-[0-9]{10}-[0-9]{6}'
     self.assertRegexpMatches(job_name, regexp)
 
+  def test_split_int(self):
+    number = 12345
+    split_number = apiclient.to_split_int(number)
+    self.assertEqual((split_number.lowBits, split_number.highBits),
+                     (number, 0))
+    shift_number = number << 32
+    split_number = apiclient.to_split_int(shift_number)
+    self.assertEqual((split_number.lowBits, split_number.highBits),
+                     (0, number))
+
+  def test_translate_distribution(self):
+    metric_update = dataflow.CounterUpdate()
+    distribution_update = DistributionData(16, 2, 1, 15)
+    apiclient.translate_distribution(distribution_update, metric_update)
+    self.assertEqual(metric_update.distribution.min.lowBits,
+                     distribution_update.min)
+    self.assertEqual(metric_update.distribution.max.lowBits,
+                     distribution_update.max)
+    self.assertEqual(metric_update.distribution.sum.lowBits,
+                     distribution_update.sum)
+    self.assertEqual(metric_update.distribution.count.lowBits,
+                     distribution_update.count)
+
+  def test_translate_means(self):
+    metric_update = dataflow.CounterUpdate()
+    accumulator = Mock()
+    accumulator.sum = 16
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
+                                                                metric_update)
+    self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum)
+    self.assertEqual(metric_update.integerMean.count.lowBits, accumulator.count)
+
+    accumulator.sum = 16.0
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator,
+                                                                  metric_update)
+    self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum)
+    self.assertEqual(
+        metric_update.floatingPointMean.count.lowBits, accumulator.count)
+
 
 if __name__ == '__main__':
   unittest.main()


Mime
View raw message