storm-commits mailing list archives

From kabh...@apache.org
Subject storm git commit: Sync storm-kafka.md with external/storm-kafka/README.md
Date Thu, 09 Mar 2017 04:48:39 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 7874662f1 -> d97b2705c


Sync storm-kafka.md with external/storm-kafka/README.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d97b2705
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d97b2705
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d97b2705

Branch: refs/heads/1.x-branch
Commit: d97b2705c3f00ecef313e7ebb18cd6fb2c23d4e6
Parents: 7874662
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Thu Mar 9 13:48:28 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Thu Mar 9 13:48:28 2017 +0900

----------------------------------------------------------------------
 docs/storm-kafka.md | 174 +++++++++++++++++++++++++++++++++++------------
 1 file changed, 131 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d97b2705/docs/storm-kafka.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md
index 38d7cdc..37ab05a 100644
--- a/docs/storm-kafka.md
+++ b/docs/storm-kafka.md
@@ -9,16 +9,16 @@ Provides core Storm and Trident spout implementations for consuming data from Ap
 ##Spouts
 We support both Trident and core Storm spouts. For both spout implementations, we use a BrokerHost interface that
 tracks Kafka broker host to partition mapping and kafkaConfig that controls some Kafka related parameters.
- 
+
 ###BrokerHosts
-In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts. 
+In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts.
 Currently, we support the following two implementations:
 
 ####ZkHosts
-ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses 
+ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses
 Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling
 ```java
-    public ZkHosts(String brokerZkStr, String brokerZkPath) 
+    public ZkHosts(String brokerZkStr, String brokerZkPath)
     public ZkHosts(String brokerZkStr)
 ```
 Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and
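For orientation, a minimal sketch of constructing the spout's `BrokerHosts` (the ZooKeeper address is illustrative; the single-argument constructor uses the default broker path):

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.ZkHosts;

// Track the Kafka broker -> partition mapping via the cluster's ZooKeeper (address is illustrative)
BrokerHosts hosts = new ZkHosts("localhost:2181");
```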
@@ -43,7 +43,7 @@ of this class, you need to first construct an instance of GlobalPartitionInforma
 ```
 
 ###KafkaConfig
-The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig. 
+The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig.
 ```java
     public KafkaConfig(BrokerHosts hosts, String topic)
     public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
@@ -66,16 +66,20 @@ In addition to these parameters, SpoutConfig contains the following fields that
     // setting for how often to save the current Kafka offset to ZooKeeper
     public long stateUpdateIntervalMs = 2000;
 
-    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
-    // calls OutputCollector.fail().
-    // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
-    // resubmitting the message while still retrying.
+    // Retry strategy for failed messages
+    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+    // Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+    // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
+    // Initial delay between successive retries
     public long retryInitialDelayMs = 0;
     public double retryDelayMultiplier = 1.0;
+    
+    // Maximum delay between successive retries    
     public long retryDelayMaxMs = 60 * 1000;
+    // Failed message will be retried infinitely if retryLimit is less than zero. 
+    public int retryLimit = -1;     
 
-    // if set to true, spout will set Kafka topic as the emitted Stream ID
-    public boolean topicAsStreamId = false;
 ```
 Core KafkaSpout only accepts an instance of SpoutConfig.
 
@@ -98,18 +102,74 @@ The KafkaConfig class also has bunch of public variables that controls your appl
 
 Most of them are self explanatory except MultiScheme.
 ###MultiScheme
-MultiScheme is an interface that dictates how the byte[] consumed from Kafka gets transformed into a storm tuple. It
+MultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a storm tuple. It
 also controls the naming of your output field.
 
 ```java
-  public Iterable<List<Object>> deserialize(byte[] ser);
+  public Iterable<List<Object>> deserialize(ByteBuffer ser);
   public Fields getOutputFields();
 ```
 
-The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with `byte[]` as is. The name of the
-outputField is "bytes".  There are alternative implementation like `SchemeAsMultiScheme` and
-`KeyValueSchemeAsMultiScheme` which can convert the `byte[]` to `String`.
+The default `RawMultiScheme` just takes the `ByteBuffer` and returns a tuple with the ByteBuffer converted to a `byte[]`. The name of the outputField is "bytes". There are alternative implementations like `SchemeAsMultiScheme` and `KeyValueSchemeAsMultiScheme` which can convert the `ByteBuffer` to `String`.
+
+There is also an extension of `SchemeAsMultiScheme`, `MessageMetadataSchemeAsMultiScheme`,
+which has an additional deserialize method that accepts the message `ByteBuffer` in addition to the `Partition` and `offset` associated with the message.
+
+```java
+public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)
+
+```
+
+This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
+
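For example, a common setup (a sketch assuming the public `scheme` field on the spout config and the stock `StringScheme` shipped with storm-kafka) emits each message as a single UTF-8 string field:

```java
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.spout.SchemeAsMultiScheme;

// Wrap a single-tuple Scheme in a MultiScheme so the spout emits one "str" field per Kafka message
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
```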
+###Failed message retry
+FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which retries with exponential delays
+between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the full class name
+of the implementation. Here is the interface:
+
+```java
+    // Spout initialization can go here. This can be called multiple times during lifecycle of a worker. 
+    void prepare(SpoutConfig spoutConfig, Map stormConf);
+
+    // Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
+    void failed(Long offset);
+
+    // Message corresponding to offset has been acked.  
+    void acked(Long offset);
+
+    // Message corresponding to the offset, has been re-emitted and under transit.
+    void retryStarted(Long offset);
+
+    /**
+     * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+     * and resend them, except completed messages.
+     */
+    Long nextFailedMessageToRetry();
+
+    /**
+     * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+     */
+    boolean shouldReEmitMsg(Long offset);
 
+    /**
+     * Spout will clean up the state for this offset if false is returned. If retryFurther is set to true,
+     * spout will call failed(offset) in the next call, and acked(offset) otherwise.
+     */
+    boolean retryFurther(Long offset);
+
+    /**
+     * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+     */
+    Set<Long> clearOffsetsBefore(Long kafkaOffset);
+``` 
+
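A sketch of plugging in a custom strategy; the class name below is hypothetical and stands in for any implementation of FailedMsgRetryManager:

```java
// com.example.MyRetryManager is a hypothetical class implementing FailedMsgRetryManager
spoutConfig.failedMsgRetryManagerClass = "com.example.MyRetryManager";
```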
+#### Version incompatibility
+In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of `ByteBuffer`. The `MultiScheme` and the related
+Scheme apis were changed in version 1.0 to accept a ByteBuffer instead of a byte[].
+
+This means that pre 1.0 kafka spouts will not work with Storm versions 1.0 and higher. While running topologies in Storm version 1.0
+and higher, it must be ensured that the storm-kafka version is at least 1.0. Pre 1.0 shaded topology jars that bundle
+storm-kafka classes must be rebuilt with storm-kafka version 1.0 for running in clusters with storm 1.0 and higher.
 
 ### Examples
 
@@ -158,10 +218,10 @@ set the parameter `KafkaConfig.ignoreZkOffsets` to `true`.  If `true`, the spout
 offset defined by `KafkaConfig.startOffsetTime` as described above.
 
 
-## Using storm-kafka with different versions of Scala
+## Using storm-kafka with different versions of Kafka
 
 Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
-as a transitive dependency. This allows you to use a version of Kafka built against a specific Scala version.
+as a transitive dependency. This allows you to use a Kafka dependency version compatible with your Kafka cluster.
 
 When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
 use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:
@@ -186,8 +246,18 @@ use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependen
 
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
 
+You can also override the kafka dependency version while building from maven, with the parameters `storm.kafka.version` and `storm.kafka.artifact.id`,
+e.g. `mvn clean install -Dstorm.kafka.artifact.id=kafka_2.11 -Dstorm.kafka.version=0.9.0.1`
+
+When selecting a kafka dependency version, you should ensure that:
+ 1. The kafka API is compatible with storm-kafka. Currently, only the 0.9.x and 0.8.x client APIs are supported by the storm-kafka
+ module. If you want to use a higher version, the storm-kafka-client module should be used instead.
+ 2. The kafka client you select should be wire-compatible with the broker. e.g. a 0.9.x client will not work with
+ a 0.8.x broker.
+
+
 ##Writing to Kafka as part of your topology
-You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you 
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you
 are using trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
 org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
@@ -202,9 +272,9 @@ These interfaces have 2 methods defined:
 ```
 
 As the name suggests, these methods are called to map a tuple to Kafka key and Kafka message. If you just want one field
-as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java 
-implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you 
-use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility 
+as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility
 reasons. Alternatively you could also specify a different key and message field by using the non default constructor.
 In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor.
 These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper.
@@ -216,21 +286,35 @@ public interface KafkaTopicSelector {
     String getTopics(Tuple/TridentTuple tuple);
 }
 ```
-The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published 
-You can return a null and the message will be ignored. If you have one static topic name then you can use 
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return a null and the message will be ignored. If you have one static topic name then you can use
 DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to decide which topic a tuple's message should be pushed to.
+You can specify a field name or a field index in the tuple; the selector will use that field's value as the topic name to publish to.
+When the topic name is not found, `KafkaBolt` will write messages into the default topic.
+Please make sure the default topic has been created.
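A sketch of the field-name variant, assuming `FieldNameTopicSelector` takes the tuple field to read and a default topic to fall back on:

```java
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.selector.FieldNameTopicSelector;

// Route each tuple to the topic named in its "topic" field, falling back to "default-topic"
KafkaBolt bolt = new KafkaBolt()
        .withTopicSelector(new FieldNameTopicSelector("topic", "default-topic"));
```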
 
 ### Specifying Kafka producer properties
-You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs 
-section "Important configuration properties for the producer", in your Storm topology config by setting the properties
-map with key kafka.broker.properties.
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+     Config config = new Config();
+     config.put("kafka.topic.wildcard.match",true);
+
+```
+
+After this you can specify a wildcard topic for matching e.g. clickstream.*.log.  This will match all streams matching clickstream.my.log, clickstream.cart.log etc
+
 
 ###Putting it all together
 
 For the bolt :
 ```java
         TopologyBuilder builder = new TopologyBuilder();
-    
+
         Fields fields = new Fields("key", "message");
         FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                     new Values("storm", "1"),
@@ -240,19 +324,21 @@ For the bolt :
         );
         spout.setCycle(true);
         builder.setSpout("spout", spout, 5);
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
         KafkaBolt bolt = new KafkaBolt()
+                .withProducerProperties(props)
                 .withTopicSelector(new DefaultTopicSelector("test"))
                 .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
         builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-        
+
         Config conf = new Config();
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
-        
+
         StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
 
@@ -271,17 +357,19 @@ For Trident:
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
         TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withProducerProperties(props)
                 .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                 .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
         stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
 
         Config conf = new Config();
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
         StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
-```
+```
\ No newline at end of file

