nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From JPercivall <...@git.apache.org>
Subject [GitHub] nifi pull request #349: NIFI-1767 AWS IoT processors
Date Wed, 01 Jun 2016 19:07:39 GMT
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65423265
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java
---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.services.iotdata.AWSIotDataClient;
    +import com.amazonaws.services.iotdata.model.GetThingShadowRequest;
    +import com.amazonaws.services.iotdata.model.GetThingShadowResult;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +
    +@Tags({"Amazon", "AWS", "IOT", "Shadow", "Get"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("Gets last persisted state of a thing in AWS IoT by reading out
the shadow. " +
    +        "A shadow might change more often than you get triggered. In order to get every
message send " +
    +        "out by a thing you better use GetAWSIoT processor. You can dynamically set a
thing-name " +
    +        "when overriding the processor-configuration with a message-attribute \"aws.iot.thing.override\".")
    +@SeeAlso({ GetAWSIoT.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.thing.override", description = "Overrides
the processor configuration for topic."),
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.thing", description = "Thing name in AWS
IoT"),
    +})
    +public class GetAWSIoTShadow extends AbstractAWSIoTShadowProcessor {
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_THING,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    PROXY_HOST,
    +                    PROXY_HOST_PORT,
    +                    REGION));
    +
    +    private final static String ATTR_NAME_THING = PROP_NAME_THING + ".override";
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
throws ProcessException {
    +        final AWSIotDataClient iotClient = this.getClient();
    +
    +        if (iotClient == null) {
    +            getLogger().error("AWS-Client was not initialized. See logs to find reasons.");
    +            return;
    +        }
    +        // get flowfile
    +        FlowFile flowFile = session.get();
    +        // if provided override configured thing name with name from corresponding message
attribute
    +        String thingName = flowFile != null && flowFile.getAttributes().containsKey(ATTR_NAME_THING)
    +                        ? flowFile.getAttribute(ATTR_NAME_THING)
    +                        : context.getProperty(PROP_NAME_THING).getValue();
    +
    +        // ask shadow of the thing for last reported state by requesting the API of AWS
    +        final GetThingShadowRequest iotRequest = new GetThingShadowRequest().withThingName(thingName);
    +        final GetThingShadowResult iotResponse = iotClient.getThingShadow(iotRequest);
    +
    +        FlowFile flowFileOut = session.create();
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put(PROP_NAME_THING, thingName);
    +        flowFileOut = session.putAllAttributes(flowFileOut, attributes);
    +
    +        flowFileOut = session.write(flowFileOut, new OutputStreamCallback() {
    +            @Override
    +            public void process(final OutputStream out) throws IOException {
    +                out.write(iotResponse.getPayload().array());
    +            }
    +        });
    +        session.transfer(flowFileOut, REL_SUCCESS);
    +        session.commit();
    --- End diff --
    
    Should emit a "FETCH" provenance event when created using an incoming flowfile and "RECEIVE"
when the processor is acting as a source.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message