metron-dev mailing list archives

From cestella <...@git.apache.org>
Subject [GitHub] incubator-metron pull request #486: METRON-793: Migrate to storm-kafka-clien...
Date Thu, 23 Mar 2017 14:34:42 GMT
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/486#discussion_r107683860
  
    --- Diff: metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.metron.storm.kafka.flux;
    +
    +import com.google.common.base.Joiner;
    +import org.apache.kafka.clients.consumer.Consumer;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    +import org.apache.metron.common.utils.KafkaUtils;
    +import org.apache.storm.kafka.spout.*;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.OutputFieldsGetter;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Function;
    +
    +/**
    + * This is a convenience layer on top of the KafkaSpoutConfig.Builder available in storm-kafka-client.
    + * The justification for this class is three-fold.  First, there are a lot of moving parts and a simplified
    + * approach to constructing spouts is useful.  Second, and perhaps more importantly, the Builder pattern
    + * is decidedly unfriendly to use inside of Flux.  Finally, we can make things a bit more friendly by only requiring
    + * zookeeper and automatically figuring out the brokers for the bootstrap server.
    + *
    + * @param <K> The kafka key type
    + * @param <V> The kafka value type
    + */
    +public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> {
    +  final static String STREAM = "default";
    +
    +  /**
    +   * The fields exposed by the kafka consumer.  These will show up in the Storm tuple.
    +   */
    +  public enum FieldsConfiguration {
    +    KEY("key", record -> record.key()),
    +    VALUE("value", record -> record.value()),
    +    PARTITION("partition", record -> record.partition()),
    +    TOPIC("topic", record -> record.topic())
    +    ;
    +    String fieldName;
    +    Function<ConsumerRecord,Object> recordExtractor;
    +
    +    FieldsConfiguration(String fieldName, Function<ConsumerRecord,Object> recordExtractor) {
    +      this.recordExtractor = recordExtractor;
    +      this.fieldName = fieldName;
    +    }
    +
    +    /**
    +     * Return a list of the enums from their string representations.
    +     * @param configs The case-insensitive string representations of the fields
    +     * @return The corresponding FieldsConfiguration enums
    +     */
    +    public static List<FieldsConfiguration> toList(String... configs) {
    +      List<FieldsConfiguration> ret = new ArrayList<>();
    +      for(String config : configs) {
    +        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
    +      }
    +      return ret;
    +    }
    +
    +    /**
    +     * Return a list of the enums from their string representations.
    +     * @param configs The case-insensitive string representations of the fields
    +     * @return The corresponding FieldsConfiguration enums
    +     */
    +    public static List<FieldsConfiguration> toList(List<String> configs) {
    +      List<FieldsConfiguration> ret = new ArrayList<>();
    +      for(String config : configs) {
    +        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
    +      }
    +      return ret;
    +    }
    +
    +    /**
    +     * Construct a Fields object from an iterable of enums.  These fields are the fields
    +     * exposed in the Storm tuple emitted from the spout.
    +     * @param configs The fields to expose
    +     * @return The Storm Fields object declaring those field names
    +     */
    +    public static Fields getFields(Iterable<FieldsConfiguration> configs) {
    +      List<String> fields = new ArrayList<>();
    +      for(FieldsConfiguration config : configs) {
    +        fields.add(config.fieldName);
    +      }
    +      return new Fields(fields);
    +    }
    +  }
    +
    +  /**
    +   * Build a tuple given the fields and the topic.  We want to use our FieldsConfiguration enum
    +   * to define what this tuple looks like.
    +   * @param <K> The key type in kafka
    +   * @param <V> The value type in kafka
    +   */
    +  public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
    +    private List<FieldsConfiguration> configurations;
    +    private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
    +      super(topic);
    +      this.configurations = configurations;
    +    }
    +
    +    /**
    +     * Builds a tuple (one value per configured field) from the ConsumerRecord specified as parameter.
    +     *
    +     * @param consumerRecord whose contents are used to build the tuple
    +     * @return the list of tuple values
    +     */
    +    @Override
    +    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
    +      Values ret = new Values();
    +      for(FieldsConfiguration config : configurations) {
    +        ret.add(config.recordExtractor.apply(consumerRecord));
    +      }
    +      return ret;
    +    }
    +  }
    +
    +  private String topic;
    +
    +  /**
    +   * Create an object with the specified properties.  This will expose fields "key" and "value."
    +   * @param kafkaProps The special kafka properties
    +   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
    +   * @param zkQuorum The zookeeper quorum.  We will use this to pull the broker list.
    +   */
    +  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
    +                                , String topic
    +                                , String zkQuorum
    +                                )
    +  {
    +    this(kafkaProps, topic, zkQuorum, Arrays.asList("key", "value"));
    +  }
    +
    +  /**
    +   * Create an object with the specified properties and exposing the specified fields.
    +   * @param kafkaProps The special kafka properties
    +   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
    +   * @param zkQuorum The zookeeper quorum.  We will use this to pull the broker list.
    +   * @param fieldsConfiguration The fields to expose in the storm tuple emitted.
    +   */
    +  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
    +                                , String topic
    +                                , String zkQuorum
    +                                , List<String> fieldsConfiguration
    +                                )
    +  {
    +    super( modifyKafkaProps(kafkaProps, zkQuorum)
    +         , createStreams(fieldsConfiguration, topic)
    +         , createTuplesBuilder(fieldsConfiguration, topic)
    +         );
    +    this.topic = topic;
    +  }
    +
    +  /**
    +   * Get the kafka topic.  TODO: In the future, support multiple topics and regex patterns.
    +   * @return The kafka topic
    +   */
    +  public String getTopic() {
    +    return topic;
    +  }
    +
    +  /**
    +   * Create a StormKafkaSpout from a given topic, zookeeper quorum and fields.  Also, configure the spout
    +   * using a Map that configures both kafka as well as the spout (see the properties in SpoutConfiguration).
    +   * @param topic The kafka topic
    +   * @param zkQuorum The zookeeper quorum
    +   * @param fieldsConfiguration The fields to expose in the emitted storm tuple
    +   * @param kafkaProps  The aforementioned map configuring both kafka and the spout
    +   * @return The constructed spout
    +   */
    +  public static StormKafkaSpout create( String topic
    --- End diff --
    
    yep, agreed
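
    For readers skimming the diff above, the class-level javadoc is the key design note: the
    builder wraps storm-kafka-client's KafkaSpoutConfig.Builder so that a spout can be declared
    from Flux with plain constructor arguments, exposes a configurable subset of the
    key/value/partition/topic fields in the emitted tuple, and needs only the zookeeper quorum
    because the bootstrap brokers are resolved from it.  As a rough illustration only (the topic
    name, quorum string, group id, and the final KafkaSpout construction below are assumptions
    for the sketch, not taken from this PR), wiring the builder up directly in Java might look
    like this:

        import java.util.Arrays;
        import java.util.HashMap;
        import java.util.Map;

        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
        import org.apache.storm.kafka.spout.KafkaSpout;

        public class SpoutWiringSketch {
          public static void main(String[] args) {
            // Plain kafka consumer properties; bootstrap.servers is expected to be filled in
            // by the builder from the zookeeper quorum (see modifyKafkaProps in the diff).
            Map<String, Object> kafkaProps = new HashMap<>();
            kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "metron-enrichments");  // illustrative group id

            // Expose all four fields from FieldsConfiguration; the spout will then emit
            // 4-element tuples named "key", "value", "partition" and "topic" on the
            // "default" stream.  The 3-arg constructor would expose just "key" and "value".
            SimpleStormKafkaBuilder<byte[], byte[]> builder =
                new SimpleStormKafkaBuilder<>( kafkaProps
                                             , "enrichments"   // illustrative topic
                                             , "node1:2181"    // illustrative zookeeper quorum
                                             , Arrays.asList("key", "value", "partition", "topic")
                                             );

            // KafkaSpoutConfig.Builder.build() and new KafkaSpout<>(config) come from
            // storm-kafka-client; the static create(...) helper truncated at the end of the
            // diff presumably wraps a similar step.
            KafkaSpout<byte[], byte[]> spout = new KafkaSpout<>(builder.build());
          }
        }

    The same constructor-only style is what makes the class Flux-friendly: a Flux component can
    pass these arguments literally instead of chaining Builder methods.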
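
    The javadoc's point about "only requiring zookeeper" refers to deriving the kafka
    bootstrap.servers list from the broker registry that every kafka broker publishes under
    /brokers/ids in zookeeper.  The diff does not show modifyKafkaProps or Metron's KafkaUtils,
    so the following is only a generic sketch of that technique using Curator and json-simple;
    the class and method names are illustrative, not Metron's actual implementation:

        import java.util.ArrayList;
        import java.util.List;

        import org.apache.curator.framework.CuratorFramework;
        import org.apache.curator.framework.CuratorFrameworkFactory;
        import org.apache.curator.retry.ExponentialBackoffRetry;
        import org.json.simple.JSONObject;
        import org.json.simple.parser.JSONParser;

        public class BrokerLookupSketch {
          /**
           * Resolve "host:port" endpoints from the /brokers/ids registry that each kafka
           * broker writes into zookeeper on startup.
           */
          public static List<String> brokersFromZookeeper(String zkQuorum) throws Exception {
            List<String> endpoints = new ArrayList<>();
            try (CuratorFramework client = CuratorFrameworkFactory.newClient(
                    zkQuorum, new ExponentialBackoffRetry(1000, 3))) {
              client.start();
              JSONParser parser = new JSONParser();
              for (String id : client.getChildren().forPath("/brokers/ids")) {
                byte[] data = client.getData().forPath("/brokers/ids/" + id);
                JSONObject broker = (JSONObject) parser.parse(new String(data, "UTF-8"));
                endpoints.add(broker.get("host") + ":" + broker.get("port"));
              }
            }
            return endpoints;
          }
        }

    Joining that list with commas yields a value suitable for the consumer's bootstrap.servers
    property, which is presumably what the Joiner import at the top of the diff is for.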


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
