storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [storm] branch master updated: STORM-3396 Fix uploading dependency jars too slow when uploading fat jars between different IDC.
Date Tue, 28 May 2019 17:16:31 GMT
This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e50006  STORM-3396 Fix uploading dependency jars too slow when uploading fat jars
between different IDC.
     new cc5f3c4  Merge pull request #3013 from StaticMian/fix_upload_dependencies_too_slowly
9e50006 is described below

commit 9e50006020e9cb033a589704a55c0215e75ed5ad
Author: lihonglin05 <lihonglin05@meituan.com>
AuthorDate: Sat May 25 20:56:00 2019 +0800

    STORM-3396 Fix uploading dependency jars too slow when uploading fat jars between different
IDC.
---
 conf/defaults.yaml                                             |  1 +
 storm-client/src/jvm/org/apache/storm/Config.java              |  6 ++++++
 .../jvm/org/apache/storm/dependency/DependencyUploader.java    | 10 +++++++++-
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f45169e..0051a8d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -144,6 +144,7 @@ nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
 nimbus.blobstore.expiration.secs: 600
 
 storm.blobstore.inputstream.buffer.size.bytes: 65536
+storm.blobstore.dependency.jar.upload.chuck.size.bytes: 1048576
 client.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
 storm.blobstore.replication.factor: 3
 # For secure mode we would want to change this config to true
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7c8d19e..10a52b8 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1411,6 +1411,12 @@ public class Config extends HashMap<String, Object> {
     @isInteger
     public static final String STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES = "storm.blobstore.inputstream.buffer.size.bytes";
     /**
+     * What chunk size (in bytes) to use for storm client to upload dependency jars.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String STORM_BLOBSTORE_DEPENDENCY_JAR_UPLOAD_CHUCK_SIZE_BYTES = "storm.blobstore.dependency.jar.upload.chuck.size.bytes";
+    /**
      * FQCN of a class that implements {@code ISubmitterHook} @see ISubmitterHook for details.
      */
     @isString
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
index d8f8c5a..1d74d88 100644
--- a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -20,11 +20,13 @@ package org.apache.storm.dependency;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.Config;
 import org.apache.storm.blobstore.AtomicOutputStream;
 import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.blobstore.ClientBlobStore;
@@ -35,18 +37,22 @@ import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
 
 public class DependencyUploader {
     public static final Logger LOG = LoggerFactory.getLogger(DependencyUploader.class);
 
     private final Map<String, Object> conf;
     private ClientBlobStore blobStore;
+    private int uploadChuckSize;
 
     public DependencyUploader() {
         conf = Utils.readStormConfig();
+        this.uploadChuckSize = ObjectReader.getInt(conf.get(Config.STORM_BLOBSTORE_DEPENDENCY_JAR_UPLOAD_CHUCK_SIZE_BYTES),
1024 * 1024);
     }
 
     public void init() {
@@ -157,7 +163,9 @@ public class DependencyUploader {
             AtomicOutputStream blob = null;
             try {
                 blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
-                Files.copy(dependency.toPath(), blob);
+                try(InputStream in = Files.newInputStream(dependency.toPath())) {
+                    IOUtils.copy(in, blob, this.uploadChuckSize);
+                }
                 blob.close();
                 blob = null;
 


Mime
View raw message