nifi-dev mailing list archives

From jskora <...@git.apache.org>
Subject [GitHub] nifi pull request: NIFI-1107 - Create new PutS3ObjectMultipart pro...
Date Mon, 16 Nov 2015 19:11:01 GMT
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/121#discussion_r44967382
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3ObjectMultipart.java
---
    @@ -0,0 +1,550 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.AmazonClientException;
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.AmazonS3Client;
    +import com.amazonaws.services.s3.model.AccessControlList;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
    +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
    +import com.amazonaws.services.s3.model.ObjectMetadata;
    +import com.amazonaws.services.s3.model.PartETag;
    +import com.amazonaws.services.s3.model.StorageClass;
    +import com.amazonaws.services.s3.model.UploadPartRequest;
    +import com.amazonaws.services.s3.model.UploadPartResult;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.BufferedInputStream;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"Amazon", "S3", "AWS", "Archive", "Put", "Multi", "Multipart", "Upload"})
    +@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket using the MultipartUpload API method.  " +
    +        "This upload consists of three steps 1) initiate upload, 2) upload the parts, and 3) complete the upload.\n" +
    +        "Since the intent for this processor involves large files, the processor saves state locally after each step " +
    +        "so that an upload can be resumed without having to restart from the beginning of the file.\n" +
    +        "The AWS libraries default to using standard AWS regions but the 'Endpoint Override URL' allows this to be " +
    +        "overridden.")
    +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
    +        value = "The value of a User-Defined Metadata field to add to the S3 Object",
    +        description = "Allows user-defined metadata to be added to the S3 object as key/value
pairs",
    +        supportsExpressionLanguage = true)
    +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
    +        @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
    +        @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
    +        @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
    +        @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
    +                "the S3 object, if one is set"),
    +        @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata " +
    +                "of the S3 object, if any was set")
    +})
    +public class PutS3ObjectMultipart extends AbstractS3Processor {
    +
    +    public static final long MIN_BYTES_INCLUSIVE = 50L * 1024L * 1024L;
    +    public static final long MAX_BYTES_INCLUSIVE = 5L * 1024L * 1024L * 1024L;
    +    public static final String PERSISTENCE_ROOT = "conf/state/";
    +
    +    public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
    +            .name("Expiration Time Rule")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
    +            .name("Storage Class")
    +            .required(true)
    +            .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
    +            .defaultValue(StorageClass.Standard.name())
    +            .build();
    +
    +    public static final PropertyDescriptor PART_SIZE = new PropertyDescriptor.Builder()
    +            .name("Part Size")
    +            .description("Specifies the Part Size to be used for the S3 Multipart Upload
API.  The flow file will be " +
    +                    "broken into Part Size chunks during upload.  Part size must be at
least 50MB and no more than " +
    +                    "5GB, but the final part can be less than 50MB.")
    +            .required(true)
    +            .defaultValue("5 GB")
    +            .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_BYTES_INCLUSIVE, MAX_BYTES_INCLUSIVE))
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KEY, BUCKET, PART_SIZE, ENDPOINT_OVERRIDE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    SSL_CONTEXT_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
    +                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
    +
    +    final static String S3_BUCKET_KEY = "s3.bucket";
    +    final static String S3_OBJECT_KEY = "s3.key";
    +    final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
    +    final static String S3_VERSION_ATTR_KEY = "s3.version";
    +    final static String S3_ETAG_ATTR_KEY = "s3.etag";
    +    final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
    +    final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    protected File getPersistenceFile() {
    +        return new File(PERSISTENCE_ROOT + getIdentifier());
    +    }
    +
    +    @Override
    +    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
    +        if ( descriptor.equals(KEY)
    +                || descriptor.equals(BUCKET)
    +                || descriptor.equals(ENDPOINT_OVERRIDE)
    +                || descriptor.equals(STORAGE_CLASS)
    +                || descriptor.equals(REGION)) {
    +            destroyState();
    +        }
    +    }
    +
    +    protected MultipartState getState(final String s3ObjectKey) throws IOException {
    +        // get local state if it exists
    +        MultipartState currState = null;
    +        final File persistenceFile = getPersistenceFile();
    +        if (persistenceFile.exists()) {
    +            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
    +                final Properties props = new Properties();
    +                props.load(fis);
    +                if (props.containsKey(s3ObjectKey)) {
    +                    final String localSerialState = props.getProperty(s3ObjectKey);
    +                    if (localSerialState != null) {
    +                        currState = new MultipartState(localSerialState);
    +                        getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
    +                                new Object[]{s3ObjectKey, currState.uploadId, currState.partETags.size()});
    +                    }
    +                }
    +            } catch (IOException ioe) {
    +                getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
    +                                "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
    +            }
    +        }
    +        return currState;
    +    }
    +
    +    protected void persistState(final String s3ObjectKey, final MultipartState currState) throws IOException {
    +        final String currStateStr = (currState == null) ? null : currState.toString();
    +        final File persistenceFile = getPersistenceFile();
    +        final File parentDir = persistenceFile.getParentFile();
    +        if (!parentDir.exists() && !parentDir.mkdirs()) {
    +            throw new IOException("Could not create persistence directory " + parentDir.getAbsolutePath() +
    --- End diff --
    
    Agreed.
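    
    For reference, a minimal sketch of the three-step MultipartUpload flow the CapabilityDescription
    describes, using the same AWS SDK v1 classes the processor imports (this is not part of the diff;
    the bucket, key, and file names are hypothetical):
    
        import com.amazonaws.services.s3.AmazonS3;
        import com.amazonaws.services.s3.AmazonS3Client;
        import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
        import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
        import com.amazonaws.services.s3.model.PartETag;
        import com.amazonaws.services.s3.model.UploadPartRequest;
        
        import java.io.File;
        import java.util.ArrayList;
        import java.util.List;
        
        public class MultipartUploadSketch {
            public static void main(String[] args) {
                final AmazonS3 s3 = new AmazonS3Client();        // credentials from the SDK's default chain
                final String bucket = "example-bucket";          // hypothetical
                final String key = "example-key";                // hypothetical
                final File file = new File("/tmp/example.bin");  // hypothetical
        
                // 1) Initiate the upload; the returned uploadId identifies it in all later calls.
                final String uploadId = s3.initiateMultipartUpload(
                        new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
        
                // 2) Upload the parts, keeping the ETag S3 returns for each part.  The processor
                //    persists the uploadId and partETags after each part so an upload can resume.
                final List<PartETag> partETags = new ArrayList<>();
                final long partSize = 50L * 1024L * 1024L;       // the 50MB minimum part size
                long offset = 0L;
                for (int partNumber = 1; offset < file.length(); partNumber++) {
                    final long currentPartSize = Math.min(partSize, file.length() - offset);
                    partETags.add(s3.uploadPart(new UploadPartRequest()
                            .withBucketName(bucket)
                            .withKey(key)
                            .withUploadId(uploadId)
                            .withPartNumber(partNumber)
                            .withFile(file)
                            .withFileOffset(offset)
                            .withPartSize(currentPartSize)).getPartETag());
                    offset += currentPartSize;
                }
        
                // 3) Complete the upload by handing back the accumulated part ETags.
                s3.completeMultipartUpload(
                        new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
            }
        }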


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
