beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-3389) Clean receive queue in dataplane
Date Tue, 02 Jan 2018 20:18:02 GMT

    [ https://issues.apache.org/jira/browse/BEAM-3389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308645#comment-16308645 ]

ASF GitHub Bot commented on BEAM-3389:
--------------------------------------

robertwb closed pull request #4315: [BEAM-3389] Clean data receive queue
URL: https://github.com/apache/beam/pull/4315
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index b3d4854d897..f18ab3e4df4 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -161,21 +161,38 @@ def _receiving_queue(self, instruction_id):
     with self._receive_lock:
       return self._received[instruction_id]
 
+  def _clean_receiving_queue(self, instruction_id):
+    with self._receive_lock:
+      self._received.pop(instruction_id)
+
   def input_elements(self, instruction_id, expected_targets):
+    """
+    Generator to retrieve elements for an instruction_id
+    input_elements should be called only once for an instruction_id
+
+    Args:
+      instruction_id(str): instruction_id for which data is read
+      expected_targets(collection): expected targets
+    """
     received = self._receiving_queue(instruction_id)
     done_targets = []
-    while len(done_targets) < len(expected_targets):
-      try:
-        data = received.get(timeout=1)
-      except queue.Empty:
-        if self._exc_info:
-          raise exc_info[0], exc_info[1], exc_info[2]
-      else:
-        if not data.data and data.target in expected_targets:
-          done_targets.append(data.target)
+    try:
+      while len(done_targets) < len(expected_targets):
+        try:
+          data = received.get(timeout=1)
+        except queue.Empty:
+          if self._exc_info:
+            raise exc_info[0], exc_info[1], exc_info[2]
         else:
-          assert data.target not in done_targets
-          yield data
+          if not data.data and data.target in expected_targets:
+            done_targets.append(data.target)
+          else:
+            assert data.target not in done_targets
+            yield data
+    finally:
+      # Instruction_ids are not reusable so Clean queue once we are done with
+      #  an instruction_id
+      self._clean_receiving_queue(instruction_id)
 
   def output_stream(self, instruction_id, target):
     # TODO: Return an output stream that sends data
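
The cleanup pattern in the diff can be exercised outside Beam with a minimal Python 3 sketch. `ReceivingQueues` and the `Data(target, data)` namedtuple are hypothetical stand-ins, not Beam classes, and the sketch assumes `_received` is a `collections.defaultdict(queue.Queue)`, which the diff does not show. Because the cleanup sits in the generator's `finally` clause, the per-instruction queue is dropped whether the consumer drains every target, stops early with `close()`, or receives an error re-raised from the stored `self._exc_info`. Two deliberate departures from the diff: the sketch uses Python 3's `with_traceback` in place of the three-argument `raise`, and `pop(..., None)` so a repeated cleanup cannot raise `KeyError`.

```python
import collections
import queue
import threading

# Hypothetical message type; the diff only shows the `.target` and `.data` fields.
Data = collections.namedtuple('Data', ['target', 'data'])


class ReceivingQueues(object):
    """Hypothetical stand-in for the data channel's receive side.

    Each instruction_id gets its own queue. Instruction ids are assumed never
    to be reused, so the queue can be dropped once its input is consumed.
    """

    def __init__(self):
        self._receive_lock = threading.Lock()
        # Assumption: a defaultdict creates a queue on first lookup.
        self._received = collections.defaultdict(queue.Queue)
        # Set to a (type, value, traceback) tuple if a reader thread fails.
        self._exc_info = None

    def _receiving_queue(self, instruction_id):
        with self._receive_lock:
            return self._received[instruction_id]

    def _clean_receiving_queue(self, instruction_id):
        with self._receive_lock:
            # A default avoids KeyError if cleanup ever runs twice.
            self._received.pop(instruction_id, None)

    def input_elements(self, instruction_id, expected_targets):
        """Yields data for instruction_id until every expected target has sent
        an empty end-of-stream message; call at most once per instruction_id."""
        received = self._receiving_queue(instruction_id)
        done_targets = []
        try:
            while len(done_targets) < len(expected_targets):
                try:
                    data = received.get(timeout=1)
                except queue.Empty:
                    if self._exc_info:
                        # Python 3 form of `raise t, v, tb`.
                        raise self._exc_info[1].with_traceback(self._exc_info[2])
                else:
                    if not data.data and data.target in expected_targets:
                        done_targets.append(data.target)
                    else:
                        assert data.target not in done_targets
                        yield data
        finally:
            # Runs on normal exhaustion, on an exception, and when the consumer
            # calls close() (or the started generator is garbage-collected).
            self._clean_receiving_queue(instruction_id)
```

Note that an unstarted generator never executes its body, so closing it before the first `next()` neither creates nor removes a queue; the `finally` clause only matters once iteration has begun.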


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Clean receive queue in dataplane
> --------------------------------
>
>                 Key: BEAM-3389
>                 URL: https://issues.apache.org/jira/browse/BEAM-3389
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>
> Remove the receiving queue for an instruction_id after that instruction_id has been processed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
