nifi-dev mailing list archives

From: trkurc <...@git.apache.org>
Subject: [GitHub] nifi pull request: NIFI-1107 - Integrate Multipart uploads into th...
Date: Thu, 26 Nov 2015 04:47:13 GMT
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/132#discussion_r45944159
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
---
    @@ -143,23 +316,223 @@ public void process(final InputStream rawIn) throws IOException {
                                 objectMetadata.setUserMetadata(userMetadata);
                             }
     
    -                        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
    -                        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    -                        final AccessControlList acl = createACL(context, ff);
    -                        if (acl != null) {
    -                            request.setAccessControlList(acl);
    -                        }
    +                        if (ff.getSize() <= multipartThreshold) {
    +                            //----------------------------------------
    +                            // single part upload
    +                            //----------------------------------------
    +                            final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
    +                            request.setStorageClass(
    +                                    StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    +                            final AccessControlList acl = createACL(context, ff);
    +                            if (acl != null) {
    +                                request.setAccessControlList(acl);
    +                            }
     
    -                        final PutObjectResult result = s3.putObject(request);
    -                        if (result.getVersionId() != null) {
    -                            attributes.put("s3.version", result.getVersionId());
    -                        }
    +                            try {
    +                                final PutObjectResult result = s3.putObject(request);
    +                                if (result.getVersionId() != null) {
    +                                    attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
    +                                }
    +                                if (result.getETag() != null) {
    +                                    attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
    +                                }
    +                                if (result.getExpirationTime() != null) {
    +                                    attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
    +                                }
    +                                if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
    +                                    attributes.put(S3_STORAGECLASS_ATTR_KEY,
    +                                            result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
    +                                }
    +                                if (userMetadata.size() > 0) {
    +                                    StringBuilder userMetaBldr = new StringBuilder();
    +                                    for (String userKey : userMetadata.keySet()) {
    +                                        userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
    +                                    }
    +                                    attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
    +                                }
    +                            } catch (AmazonClientException e) {
    +                                getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
    +                                        new Object[]{ffFilename, bucket, key, e.getMessage()});
    +                                throw (e);
    +                            }
    +                        } else {
    +                            //----------------------------------------
    +                            // multipart upload
    +                            //----------------------------------------
     
    -                        attributes.put("s3.etag", result.getETag());
    +                            // load or create persistent state
    +                            //------------------------------------------------------------
    +                            MultipartState currentState;
    +                            try {
    +                                currentState = getState(cacheKey);
    +                                if (currentState != null) {
    +                                    if (currentState.getPartETags().size() > 0) {
    +                                        final PartETag lastETag = currentState.getPartETags().get(
    +                                                currentState.getPartETags().size() - 1);
    +                                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
    +                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
    +                                                "contentLength='{}' partsLoaded={} lastPart={}/{}",
    +                                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
    +                                                        currentState.getFilePosition(), currentState.getPartSize(),
    +                                                        currentState.getStorageClass().toString(),
    +                                                        currentState.getContentLength(),
    +                                                        currentState.getPartETags().size(),
    +                                                        Integer.toString(lastETag.getPartNumber()),
    +                                                        lastETag.getETag()});
    +                                    } else {
    +                                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
    +                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
    +                                                "contentLength='{}' no partsLoaded",
    +                                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
    +                                                        currentState.getFilePosition(), currentState.getPartSize(),
    +                                                        currentState.getStorageClass().toString(),
    +                                                        currentState.getContentLength()});
    +                                    }
    +                                } else {
    +                                    currentState = new MultipartState();
    +                                    currentState.setPartSize(multipartPartSize);
    +                                    currentState.setStorageClass(
    +                                            StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    +                                    currentState.setContentLength(ff.getSize());
    +                                    persistState(cacheKey, currentState);
    +                                    getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
    +                                            new Object[]{ffFilename, bucket, key});
    +                                }
    +                            } catch (IOException e) {
    +                                getLogger().error("IOException initiating cache state while processing flow files: " +
    +                                        e.getMessage());
    +                                throw (e);
    +                            }
    +
    +                            // initiate multipart upload or find position in file
    +                            //------------------------------------------------------------
    +                            if (currentState.getUploadId().isEmpty()) {
    +                                final InitiateMultipartUploadRequest initiateRequest =
    +                                        new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
    +                                initiateRequest.setStorageClass(currentState.getStorageClass());
    +                                final AccessControlList acl = createACL(context, ff);
    +                                if (acl != null) {
    +                                    initiateRequest.setAccessControlList(acl);
    +                                }
    +                                try {
    +                                    final InitiateMultipartUploadResult initiateResult =
    +                                            s3.initiateMultipartUpload(initiateRequest);
    +                                    currentState.setUploadId(initiateResult.getUploadId());
    +                                    currentState.getPartETags().clear();
    +                                    try {
    +                                        persistState(cacheKey, currentState);
    +                                    } catch (Exception e) {
    +                                        getLogger().info("Exception saving cache state while processing flow file: " +
    +                                                e.getMessage());
    +                                        throw(new ProcessException("Exception saving cache state", e));
    +                                    }
    +                                    getLogger().info("Success initiating upload flowfile={} available={} position={} " +
    +                                            "length={} bucket={} key={} uploadId={}",
    +                                            new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
    +                                                    currentState.getContentLength(), bucket, key,
    +                                                    currentState.getUploadId()});
    +                                    if (initiateResult.getUploadId() != null) {
    +                                        attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
    +                                    }
    +                                } catch (AmazonClientException e) {
    +                                    getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
    +                                            new Object[]{ffFilename, bucket, key, e.getMessage()});
    +                                    throw(e);
    +                                }
    +                            } else {
    +                                if (currentState.getFilePosition() > 0) {
    +                                    try {
    +                                        final long skipped = in.skip(currentState.getFilePosition());
    +                                        if (skipped != currentState.getFilePosition()) {
    +                                            getLogger().info("Failure skipping to resume upload flowfile={} " +
    +                                                    "bucket={} key={} position={} skipped={}",
    +                                                    new Object[]{ffFilename, bucket, key,
    +                                                            currentState.getFilePosition(), skipped});
    +                                        }
    +                                    } catch (Exception e) {
    +                                        getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
    +                                                "key={} position={} reason={}",
    +                                                new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
    +                                                        e.getMessage()});
    +                                        throw(new ProcessException(e));
    +                                    }
    +                                }
    +                            }
    +
    +                            // upload parts
    +                            //------------------------------------------------------------
    +                            long thisPartSize;
    +                            for (int part = currentState.getPartETags().size() + 1;
    +                                 currentState.getFilePosition() < currentState.getContentLength(); part++) {
    +                                if (!PutS3Object.this.isScheduled()) {
    +                                    getLogger().info("Processor unscheduled, stopping upload flowfile={} part={} " +
    +                                            "uploadId={}", new Object[]{ffFilename, part, currentState.getUploadId()});
    +                                    session.rollback();
    +                                    return;
    +                                }
    +                                thisPartSize = Math.min(currentState.getPartSize(),
    +                                        (currentState.getContentLength() - currentState.getFilePosition()));
    +                                UploadPartRequest uploadRequest = new UploadPartRequest()
    +                                        .withBucketName(bucket)
    +                                        .withKey(key)
    +                                        .withUploadId(currentState.getUploadId())
    +                                        .withInputStream(in)
    +                                        .withPartNumber(part)
    +                                        .withPartSize(thisPartSize);
    +                                try {
    +                                    UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
    +                                    currentState.addPartETag(uploadPartResult.getPartETag());
    +                                    currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
    +                                    try {
    +                                        persistState(cacheKey, currentState);
    +                                    } catch (Exception e) {
    --- End diff --
    
    I'd add a comment here explaining why swallowing exceptions at this point is okay, as it is not immediately intuitive.
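    
    Something along these lines, for example (just a sketch: the rationale and the catch body are my assumptions about the intent, not taken from the patch; persistState, cacheKey, currentState, and getLogger are the identifiers already in use above):
    
        try {
            persistState(cacheKey, currentState);
        } catch (Exception e) {
            // Assumed rationale, to be confirmed: the part upload itself already
            // succeeded and currentState still holds its ETag in memory, so this
            // upload can continue and complete normally. The persisted state is
            // only needed to resume after a crash, where the worst case is
            // re-uploading parts that were not recorded. Failing the whole
            // transfer over a cache write error would be worse, so log and
            // carry on.
            getLogger().info("Exception saving cache state while processing flow file: " +
                    e.getMessage());
        }
    
    If that reasoning doesn't match the intent, whatever the actual reason is belongs in that comment.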


