storm-user mailing list archives

From pradeep s <sreekumar.prad...@gmail.com>
Subject Re: Storm Kafka offset monitoring
Date Thu, 23 Feb 2017 18:52:26 GMT
Hi Priyank,
The confusion is about what the proper implementation of the spout is.

*Method 1*
*=========*
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, spoutConfigId);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

This is working fine. The only disadvantage is that offsets are maintained in
ZooKeeper. As per the latest Kafka docs, offsets can instead be maintained in
Kafka's __consumer_offsets topic.
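For anyone checking lag by hand with method 1: the spout writes a small JSON state per partition under the configured ZooKeeper root (e.g. /&lt;topicName&gt;/&lt;spoutConfigId&gt;/partition_&lt;N&gt;). A minimal sketch of pulling the committed offset out of such a payload and computing lag — the sample payload, field values, and class name are illustrative only, not taken from my cluster:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZkOffsetCheck {
    // Example payload in the shape storm-kafka writes to ZooKeeper per
    // partition (illustrative values; real nodes also carry a "broker" section).
    static final String SAMPLE_STATE =
        "{\"topology\":{\"id\":\"t-1\",\"name\":\"demo\"},"
        + "\"offset\":12345,\"partition\":0,\"topic\":\"orders\"}";

    // Pull the committed offset out of the JSON payload with a simple regex,
    // avoiding a JSON library dependency for this sketch.
    static long committedOffset(String state) {
        Matcher m = Pattern.compile("\"offset\"\\s*:\\s*(\\d+)").matcher(state);
        if (!m.find()) {
            throw new IllegalArgumentException("no offset field in state: " + state);
        }
        return Long.parseLong(m.group(1));
    }

    // Lag is the broker's latest offset for the partition minus the committed one.
    static long lag(long latestOffset, long committed) {
        return latestOffset - committed;
    }

    public static void main(String[] args) {
        long committed = committedOffset(SAMPLE_STATE);
        System.out.println("committed=" + committed + " lag=" + lag(12400, committed));
    }
}
```

The payload itself would be fetched with any ZooKeeper client (e.g. zkCli's get on the partition path) before being parsed like this.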

I tried method 2 to maintain offsets in Kafka.
*Method 2*
*=======*
KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();

KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);

private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {

    Map<String, Object> props = new HashMap<>();
    props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "false");

    String[] topics = new String[] { topicName };

    KafkaSpoutStreams kafkaSpoutStreams =
            new KafkaSpoutStreamsNamedTopics.Builder(
                    new Fields("message"), topics).build();

    KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
            new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
                    new TuplesBuilder(topicName)).build();

    return new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();
}


But this method was producing spout errors, as shown in the attached logs.
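For what it's worth, my understanding is that the commit failed exception happens when a rebalance kicks the consumer out of the group before it commits. The extra consumer properties below are the knobs that control that window (the property names are from the Kafka 0.10.1 consumer config; the values are illustrative guesses, not tested recommendations):

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerTuning {
    // Extra consumer properties that widen the window before the group
    // coordinator considers this consumer dead and triggers a rebalance.
    static Map<String, Object> rebalanceTuning() {
        Map<String, Object> props = new HashMap<>();
        // Fewer records per poll() means less processing time between polls.
        props.put("max.poll.records", "200");
        // Maximum time allowed between polls before the coordinator evicts
        // this consumer from the group (added in Kafka 0.10.1).
        props.put("max.poll.interval.ms", "300000");
        // Session timeout for heartbeat-based liveness detection.
        props.put("session.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(rebalanceTuning());
    }
}
```

These entries could be merged into the props map built in newKafkaSpoutConfig() above.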

*Is it OK to maintain offsets in ZooKeeper?* Are there any disadvantages to
this approach? Please suggest.

Logs attached.

Regards
Pradeep S



On Wed, Feb 22, 2017 at 5:25 PM, Priyank Shah <pshah@hortonworks.com> wrote:

> Hi Pradeep,
>
>
>
> Both spouts are correct and good to use. However, we should use the
> newer spout, the one that uses Kafka bootstrap servers, because it uses the
> latest Kafka consumer API. When do you get the commit failed exception? Can
> you send the worker logs from where the spout is running? If you don’t see
> anything in the logs, try changing the log level from the UI.
>
>
>
> *From: *pradeep s <sreekumar.pradeep@gmail.com>
> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
> *Date: *Tuesday, February 21, 2017 at 5:14 PM
>
> *To: *"user@storm.apache.org" <user@storm.apache.org>
> *Subject: *Re: Storm Kafka offset monitoring
>
>
>
> Hi Priyank
>
> Currently I have tested the spout using ZooKeeper broker hosts. This is
> processing fine. But I saw another way of initialising the spout, using
> Kafka bootstrap servers via the KafkaSpoutConfig class.
>
>
>
> https://storm.apache.org/releases/1.0.2/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
>
>
>
>
>
> But implementing it this way, I was getting a commit failed exception due
> to a rebalance.
>
> Can you point out the proper way to implement the Kafka spout?
>
> In the Storm 1.0.3 docs I have seen the approach using ZooKeeper broker hosts.
>
> Regards
>
> Pradeep S
>
> On Tue, Feb 21, 2017 at 2:35 PM Priyank Shah <pshah@hortonworks.com>
> wrote:
>
> Hi Pradeep,
>
>
>
> A release vote for RC1 of 1.1.0 is in progress. You can track that and
> once it gets released you can upgrade.
>
>
>
> Regarding upgrading your spout, you don’t need to make any code changes.
> You just need to use the latest released spout code. Usually this involves
> updating the pom file that has a dependency on the storm-kafka module to the
> latest version.
>
>
>
> *From: *pradeep s <sreekumar.pradeep@gmail.com>
> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
> *Date: *Tuesday, February 21, 2017 at 12:49 PM
> *To: *"user@storm.apache.org" <user@storm.apache.org>
> *Subject: *Re: Storm Kafka offset monitoring
>
>
>
> Hi Priyank
>
> Thanks for your reply. I was not able to find the 1.1.0 version of Storm.
>
> Can you please point me to that? Also, can you please confirm what specific
> spout changes to make?
>
> Regards
>
> Pradeep S
>
> On Tue, Feb 21, 2017 at 10:54 AM Priyank Shah <pshah@hortonworks.com>
> wrote:
>
> Hi Pradeep,
>
>
>
> If you upgrade your spout in the topology and the Storm code to a later
> version (I checked v1.1.0 and it has the tool), you will get a table in the
> Storm UI which shows you the offsets. If you cannot upgrade, then I think
> you will have to do it manually.
>
>
>
> *From: *pradeep s <sreekumar.pradeep@gmail.com>
> *Reply-To: *"user@storm.apache.org" <user@storm.apache.org>
> *Date: *Tuesday, February 21, 2017 at 9:44 AM
> *To: *"user@storm.apache.org" <user@storm.apache.org>
> *Subject: *Storm Kafka offset monitoring
>
>
>
> Hi ,
>
> I am using Storm 1.0.2 and Kafka 0.10.1.1. The Storm spout is
> configured using ZooKeeper broker hosts. Is there a monitoring UI which I
> can use to track the consumer offsets and lag?
>
> I was using Yahoo Kafka Manager, but it is showing the Storm spout as a
> consumer. Any help?
>
> Regards
>
> Pradeep S
>
>
