giraph-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [giraph] anurag1212 commented on a change in pull request #109: Counter mechanism
Date Thu, 24 Oct 2019 22:37:18 GMT
anurag1212 commented on a change in pull request #109: Counter mechanism
URL: https://github.com/apache/giraph/pull/109#discussion_r338821278
 
 

 ##########
 File path: giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
 ##########
 @@ -1794,6 +1802,128 @@ private void doMasterCompute() {
     timerContext.stop();
   }
 
+  /**
+   * Receive the counters from the workers, and aggregate them with the
+   * master counters.
+   * Send the aggregated value to the job client via Thrift call
+   */
+  private void aggregateCountersFromWorkersAndMaster() {
+    CustomCounters customCounters = new CustomCounters();
+    // Get the stats from the all the worker selected nodes
+    String workerFinishedPath =
+            getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
+    List<String> workerFinishedPathList = null;
+    // Subtract 1 for the master
+    // TODO - what happens when there is only 1 worker?
+    int numWorkers = BspInputFormat.getMaxTasks(getConfiguration()) - 1;
+    if (numWorkers == 0) {
+      numWorkers += 1;
+    }
+    // Get the counter values from the zookeeper, written by the workers
+    // We keep retrying until all the workers have written
+    try {
+      while (true) {
+        try {
+          workerFinishedPathList = getZkExt().getChildrenExt(
+                  workerFinishedPath, true,
+                  false, true);
+          LOG.info(String.format("Fetching counter values from " +
+                          "workers. Got %d out of %d",
+                  workerFinishedPathList.size(), numWorkers));
+          if (workerFinishedPathList.size() == numWorkers) {
+            break;
+          }
+        } catch (KeeperException e) {
+          LOG.info("Got Keeper exception, but will retry: ", e);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException(
+                  "aggregateWorkerStats: InterruptedException", e);
+        }
+        Thread.sleep(1000);
+      }
+    } catch (InterruptedException ie) {
+      LOG.info("Interrupted exception");
+    }
+    for (String finishedPath : workerFinishedPathList) {
+      JSONObject workerFinishedInfoObj = null;
+      try {
+        byte [] zkData =
+                getZkExt().getData(finishedPath, false, null);
+        workerFinishedInfoObj = new JSONObject(new String(zkData,
+                Charset.defaultCharset()));
+        Map<String, Map<String, Long>> workerCounters = new HashMap<>();
+        Iterator<String> keys = workerFinishedInfoObj.keys();
+        while (keys.hasNext()) {
+          String groupName = keys.next();
+          JSONObject jsonCounter =
+                  workerFinishedInfoObj.getJSONObject(groupName);
+          Iterator<String> counterNames = jsonCounter.keys();
+          Map<String, Long> counters = new HashMap<>();
+          LOG.info(groupName);
+          while (counterNames.hasNext()) {
+            String counterName = counterNames.next();
+            Long value = jsonCounter.getLong(counterName);
+            counters.put(counterName, value);
+            LOG.info(counterName + ": " + value);
+          }
+          workerCounters.put(groupName, counters);
+        }
+        customCounters.mergeCounters(workerCounters);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+                "aggregateWorkerStats: JSONException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+                "aggregateWorkerStats: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+                "aggregateWorkerStats: InterruptedException", e);
+      }
+    }
+    // Add master counters too
+    if (numWorkers != 1) {
 
 Review comment:
   lul

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message