beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: Add start_times to MonitoringInfos and populate them in the python SDK
Date Wed, 02 Dec 2020 14:51:12 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c25bc0  Add start_times to MonitoringInfos and populate them in the python SDK
     new eca935b  Merge pull request #13429 from Add start_times to MonitoringInfos and populate them in the python SDK
6c25bc0 is described below

commit 6c25bc082e809246989ca8a459c36c5b4d247f20
Author: Alex Amato <ajamato@google.com>
AuthorDate: Wed Nov 25 12:23:45 2020 -0800

    Add start_times to MonitoringInfos and populate them in the python SDK
---
 model/pipeline/src/main/proto/metrics.proto        | 13 +++++++++++
 sdks/python/apache_beam/metrics/cells.pxd          |  2 ++
 sdks/python/apache_beam/metrics/cells.py           | 15 ++++++++++---
 sdks/python/apache_beam/metrics/cells_test.py      | 25 ++++++++++++++++++++++
 .../apache_beam/runners/worker/sdk_worker_test.py  |  2 ++
 5 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index 86114a8..39ef551 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -31,6 +31,7 @@ option java_outer_classname = "MetricsApi";
 
 import "beam_runner_api.proto";
 import "google/protobuf/descriptor.proto";
+import "google/protobuf/timestamp.proto";
 
 // A specification for describing a well known MonitoringInfo.
 //
@@ -401,6 +402,18 @@ message MonitoringInfo {
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
   map<string, string> labels = 4;
+
+  // This indicates the start of the time range over which this value was
+  // measured.
+  // This is needed by some external metric aggregation services
+  // to indicate when the reporter of the metric first began collecting the
+  // cumulative value for the timeseries.
+  // If the SDK Harness restarts, it should reset the start_time, and reset
+  // the collection of cumulative metrics (i.e. start to count again from 0).
+  // HarnessMonitoringInfos should set this start_time once, when the
+  // MonitoringInfo is first reported.
+  // ProcessBundle MonitoringInfos should set a start_time for each bundle.
+  google.protobuf.Timestamp start_time = 5;
 }
 
 // A set of well known URNs that specify the encoding and aggregation method.
diff --git a/sdks/python/apache_beam/metrics/cells.pxd b/sdks/python/apache_beam/metrics/cells.pxd
index 0204da8..0eaa890 100644
--- a/sdks/python/apache_beam/metrics/cells.pxd
+++ b/sdks/python/apache_beam/metrics/cells.pxd
@@ -17,11 +17,13 @@
 
 cimport cython
 cimport libc.stdint
+from cpython.datetime cimport datetime
 
 
 cdef class MetricCell(object):
   cdef object _lock
   cpdef bint update(self, value) except -1
+  cdef datetime _start_time
 
 
 cdef class CounterCell(MetricCell):
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index a7b7938..34ce2a4 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -29,6 +29,7 @@ from __future__ import division
 import threading
 import time
 from builtins import object
+from datetime import datetime
 from typing import Any
 from typing import Optional
 from typing import SupportsInt
@@ -63,6 +64,7 @@ class MetricCell(object):
   """
   def __init__(self):
     self._lock = threading.Lock()
+    self._start_time = None
 
   def update(self, value):
     raise NotImplementedError
@@ -71,6 +73,13 @@ class MetricCell(object):
     raise NotImplementedError
 
   def to_runner_api_monitoring_info(self, name, transform_id):
+    if not self._start_time:
+      self._start_time = datetime.utcnow()
+    mi = self.to_runner_api_monitoring_info_impl(name, transform_id)
+    mi.start_time.FromDatetime(self._start_time)
+    return mi
+
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
     raise NotImplementedError
 
   def reset(self):
@@ -136,7 +145,7 @@ class CounterCell(MetricCell):
     with self._lock:
       return self.value
 
-  def to_runner_api_monitoring_info(self, name, transform_id):
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     if not name.urn:
       # User counter case.
@@ -201,7 +210,7 @@ class DistributionCell(MetricCell):
     with self._lock:
       return self.data.get_cumulative()
 
-  def to_runner_api_monitoring_info(self, name, transform_id):
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     return monitoring_infos.int64_user_distribution(
         name.namespace,
@@ -251,7 +260,7 @@ class GaugeCell(MetricCell):
     with self._lock:
       return self.data.get_cumulative()
 
-  def to_runner_api_monitoring_info(self, name, transform_id):
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     return monitoring_infos.int64_user_gauge(
         name.namespace,
diff --git a/sdks/python/apache_beam/metrics/cells_test.py b/sdks/python/apache_beam/metrics/cells_test.py
index e9aae5c..a120d15 100644
--- a/sdks/python/apache_beam/metrics/cells_test.py
+++ b/sdks/python/apache_beam/metrics/cells_test.py
@@ -28,6 +28,7 @@ from apache_beam.metrics.cells import DistributionCell
 from apache_beam.metrics.cells import DistributionData
 from apache_beam.metrics.cells import GaugeCell
 from apache_beam.metrics.cells import GaugeData
+from apache_beam.metrics.metricbase import MetricName
 
 
 class TestCounterCell(unittest.TestCase):
@@ -69,6 +70,14 @@ class TestCounterCell(unittest.TestCase):
     c.inc()
     self.assertEqual(c.get_cumulative(), -8)
 
+  def test_start_time_set(self):
+    c = CounterCell()
+    c.inc(2)
+
+    name = MetricName('namespace', 'name1')
+    mi = c.to_runner_api_monitoring_info(name, 'transform_id')
+    self.assertGreater(mi.start_time.seconds, 0)
+
 
 class TestDistributionCell(unittest.TestCase):
   @classmethod
@@ -119,6 +128,14 @@ class TestDistributionCell(unittest.TestCase):
     d.update(3.3)
     self.assertEqual(d.get_cumulative(), DistributionData(9, 3, 3, 3))
 
+  def test_start_time_set(self):
+    d = DistributionCell()
+    d.update(3.1)
+
+    name = MetricName('namespace', 'name1')
+    mi = d.to_runner_api_monitoring_info(name, 'transform_id')
+    self.assertGreater(mi.start_time.seconds, 0)
+
 
 class TestGaugeCell(unittest.TestCase):
   def test_basic_operations(self):
@@ -146,6 +163,14 @@ class TestGaugeCell(unittest.TestCase):
     result = g2.combine(g1)
     self.assertEqual(result.data.value, 1)
 
+  def test_start_time_set(self):
+    g1 = GaugeCell()
+    g1.set(3)
+
+    name = MetricName('namespace', 'name1')
+    mi = g1.to_runner_api_monitoring_info(name, 'transform_id')
+    self.assertGreater(mi.start_time.seconds, 0)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index c1cf641..7fa290f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -263,6 +263,8 @@ class SdkWorkerTest(unittest.TestCase):
         responses['monitoring_infos_metadata'].monitoring_infos.monitoring_info)
     found = False
     for mi in short_id_to_mi.values():
+      # Clear the timestamp before comparing
+      mi.ClearField("start_time")
       if mi == expected_monitoring_info:
         found = True
     self.assertTrue(found, str(responses['monitoring_infos_metadata']))


Mime
View raw message