beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4778) Less wasteful ArtifactStagingService
Date Thu, 02 Aug 2018 03:52:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4778?focusedWorklogId=130085&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130085 ]

ASF GitHub Bot logged work on BEAM-4778:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Aug/18 03:51
            Start Date: 02/Aug/18 03:51
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #5958: [BEAM-4778] add option to flink job server to clean staged artifacts per-job
URL: https://github.com/apache/beam/pull/5958#discussion_r207094725
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##########
 @@ -138,56 +140,57 @@ public static String generateStagingSessionToken(String sessionId, String basePa
     StagingSessionToken stagingSessionToken = new StagingSessionToken();
     stagingSessionToken.setSessionId(sessionId);
     stagingSessionToken.setBasePath(basePath);
-    return encodeStagingSessionToken(stagingSessionToken);
+    return stagingSessionToken.encode();
   }
 
   private String encodedFileName(ArtifactMetadata artifactMetadata) {
     return "artifact_"
         + Hashing.sha256().hashString(artifactMetadata.getName(), CHARSET).toString();
   }
 
-  private static StagingSessionToken decodeStagingSessionToken(String stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
-    } catch (JsonProcessingException e) {
-      String message =
-          String.format(
-              "Unable to deserialize staging token %s. Expected format: %s. Error: %s",
-              stagingSessionToken,
-              "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
-              e.getMessage());
-      throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
-    }
-  }
+  public void removeArtifacts(String stagingSessionToken) throws Exception {
+    StagingSessionToken parsedToken = StagingSessionToken.decode(stagingSessionToken);
+    ResourceId dir = getJobDirResourceId(parsedToken);
+    ResourceId manifestResourceId = dir.resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);
 
-  private static String encodeStagingSessionToken(StagingSessionToken stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.writeValueAsString(stagingSessionToken);
-    } catch (JsonProcessingException e) {
-      LOG.error("Error {} occurred while serializing {}.", e.getMessage(), stagingSessionToken);
-      throw e;
+    LOG.info("Removing dir {}", dir);
 
 Review comment:
   Do we really need this and the following logging at info level? Why not debug?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 130085)
    Time Spent: 5.5h  (was: 5h 20m)

> Less wasteful ArtifactStagingService
> ------------------------------------
>
>                 Key: BEAM-4778
>                 URL: https://issues.apache.org/jira/browse/BEAM-4778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java] is the main implementation of ArtifactStagingService.
> It stages artifacts into a directory; and in practice the passed staging session token is such that the directory is different for every job. This leads to 2 issues:
>  * It doesn't get cleaned up when the job finishes or even when the JobService shuts down, so we have disk space leaks if running a lot of jobs (e.g. a suite of ValidatesRunner tests)
>  * We repeatedly re-stage the same artifacts. Instead, ideally, we should identify that some artifacts don't need to be staged - based on knowing their md5. The artifact staging protocol has rudimentary support for this but may need to be modified.
> CC: [~angoenka]
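
The second issue in the description (skipping artifacts whose md5 is already known) could look roughly like the sketch below. This is only an illustration of the idea, not Beam code: the class StagingDedupSketch, its methods, and the in-memory set of already-staged md5 values are all hypothetical, and the real artifact staging protocol may need changes before anything like this is possible.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: decide which artifacts still need staging, keyed by md5. */
public class StagingDedupSketch {

  /** Returns the lowercase hex md5 of the given artifact content. */
  public static String md5Hex(byte[] content) {
    try {
      MessageDigest digest = MessageDigest.getInstance("MD5");
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest(content)) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      // MD5 is a required algorithm on every Java platform, so this is unexpected.
      throw new IllegalStateException(e);
    }
  }

  /**
   * Returns the names of artifacts whose md5 is not in stagedMd5s.
   * Assumption: the service can somehow report the md5s it already holds.
   */
  public static List<String> artifactsToStage(
      Map<String, byte[]> artifacts, Set<String> stagedMd5s) {
    List<String> toStage = new ArrayList<>();
    for (Map.Entry<String, byte[]> entry : artifacts.entrySet()) {
      if (!stagedMd5s.contains(md5Hex(entry.getValue()))) {
        toStage.add(entry.getKey());
      }
    }
    return toStage;
  }

  public static void main(String[] args) {
    Map<String, byte[]> artifacts = new LinkedHashMap<>();
    artifacts.put("a.jar", "hello".getBytes(StandardCharsets.UTF_8));
    artifacts.put("b.jar", "world".getBytes(StandardCharsets.UTF_8));

    // Pretend a.jar was staged by an earlier job.
    Set<String> stagedMd5s = new HashSet<>();
    stagedMd5s.add(md5Hex("hello".getBytes(StandardCharsets.UTF_8)));

    System.out.println(artifactsToStage(artifacts, stagedMd5s)); // prints [b.jar]
  }
}
```

Keying on the content hash rather than the artifact name means a renamed but identical artifact is still recognized as already staged; it also implies the staging directory would have to be shared across jobs, which interacts with the per-job cleanup discussed in the first issue.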



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
