storm-user mailing list archives

From pradeep s <sreekumar.prad...@gmail.com>
Subject Re: Storm kafka spout offset going back
Date Fri, 01 Sep 2017 22:11:57 GMT
Yes, Stig. The code posted is for the DataBaseInsertBolt; the emit from the
last bolt is not needed.

Problem 2 was for a separate topic. Problem 1 was observed for topics where
processing failures were encountered previously.

I have attached the error-processing and bolt files.

Thanks
Pradeep




On Fri, Sep 1, 2017 at 1:09 PM, Stig Rohde Døssing <srdo@apache.org> wrote:

> Just to make sure I understand:
>
> This is your topology
> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt
>
> The bolt you posted the execute method for is the DataBaseInsertBolt,
> right?
> What is this statement for, if this is the last bolt in the topology?
> "super.getOutputCollector().emit(tuple, new Values(fullMessage));"
> Are the topics you mention in problems 1 and 2 the same topic? Essentially,
> what I'm asking is whether the topic that is stuck is also the one with
> failures that is starting over from an old offset.
> Can you post your RetryService configuration?
> You talked about moving tuples to an error queue if they fail
> deserialization in the Avro bolt. Can you post that execute too?
>
> 2017-09-01 20:14 GMT+02:00 pradeep s <sreekumar.pradeep@gmail.com>:
>
>> Thanks, Stig, for the response. I can give some more detail on the issue
>> we are facing now.
>> For any database failure, we retry the tuple up to 10 times. Database
>> failures are mostly because of parent-child relations, since we are
>> processing out of order.
>> Our consumer group has more than 10 topics, and each topic corresponds to
>> one table. For example, topics A, B and C in a group correspond to tables
>> A, B and C in the database.
>> Here, table A is the parent, and tables B and C are child tables.
>> Spout parallelism is set to 50, and each topic has 50 partitions. These 50
>> threads go round robin across all the topics in the group.
>>
>> The issues observed with the current setup are:
>>
>> 1) One partition of one topic gets stuck, while the lag on all other
>> partitions is cleared.
>>
>> 2) Whichever topic had failures earlier goes back to an old offset.
>>
>>
>> DB bolt execute method below
>> =======================
>> exceptionCount will have a value greater than 0 once the message is moved
>> to the error queue. In that case I acknowledge the message; in other cases
>> I call fail on the tuple.
>> There is no downstream bolt for this. This is the final bolt in the
>> topology.
>>
>>     @Override
>>     public void execute(final Tuple tuple) {
>>         String fullMessage = (String) tuple.getValueByField(EXTRACTED_MESSAGE);
>>         GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(GG_MESSAGE);
>>         try {
>>             // Call to handler for generating SQL
>>             Date date = new Date();
>>             super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
>>             super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>             super.getOutputCollector().ack(tuple);
>>             LOGGER.info("DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
>>         } catch (Exception e) {
>>             LOGGER.error("DB bolt exception occurred from Aurora: ", e);
>>             int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
>>             if (exceptionCount != -1) {
>>                 // If the message write succeeded, acknowledge the message so
>>                 // that it will be removed from the Kafka queue
>>                 super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>                 super.getOutputCollector().ack(tuple);
>>             } else {
>>                 super.getOutputCollector().reportError(e);
>>                 super.getOutputCollector().fail(tuple);
>>             }
>>         }
>>     }
>>
>>
>>
>>
>>
>> On Fri, Sep 1, 2017 at 9:59 AM, Stig Rohde Døssing <srdo@apache.org>
>> wrote:
>>
>>> Hi Pradeep,
>>>
>>> When you move the message to an error queue, is this happening from
>>> inside the Avro bolt or are you emitting a tuple? Can you verify that the
>>> tuple is being acked in the Avro bolt exactly once (double acking will
>>> cause the tuple to fail)?
>>>
>>> Storm will ack messages on the spout as long as all edges in the tuple
>>> tree are acked, and the topology message timeout hasn't expired before this
>>> occurs.
>>>
>>> For example, if the Kafka spout emits t0 and your AvroDeserializerBolt is
>>> the only bolt consuming from the spout, the bolt will receive t0 and must
>>> ack it exactly once. If the AvroDeserializerBolt emits any tuples anchored
>>> to t0 (using any of the
>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
>>> methods that take a Tuple anchor), the downstream bolts must ack those
>>> exactly once too. Let's say the Avro bolt emits t0_0 and t0_1 based on t0.
>>> The root tuple on the spout is only acked if t0, t0_0 and t0_1 are each
>>> acked once, and they all get acked before the message timeout elapses.
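>>>
>>> The rule above can be modeled with a few lines of plain Java. Note this is
>>> only a toy model of the observable behaviour, not Storm's implementation
>>> (the real acker XORs random edge ids rather than counting), and the class
>>> below is my own illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: a root tuple completes on the spout once every edge of its
// tuple tree has been acked exactly once.
public class TupleTreeModel {
    private final Map<String, Integer> pending = new HashMap<>();   // open edges per root
    private final Map<String, Boolean> completed = new HashMap<>();

    public void emitRoot(String rootId) {
        pending.put(rootId, 1);        // the spout emits one edge: t0
        completed.put(rootId, false);
    }

    // A bolt emits a new tuple anchored to the root: one more edge to ack.
    public void emitAnchored(String rootId) {
        pending.merge(rootId, 1, Integer::sum);
    }

    public void ack(String rootId) {
        int left = pending.merge(rootId, -1, Integer::sum);
        if (left == 0) completed.put(rootId, true);
        if (left < 0) throw new IllegalStateException("double ack on " + rootId);
    }

    public boolean isCompleted(String rootId) {
        return completed.get(rootId);
    }

    public static void main(String[] args) {
        TupleTreeModel tree = new TupleTreeModel();
        tree.emitRoot("t0");
        tree.emitAnchored("t0"); // Avro bolt emits t0_0
        tree.emitAnchored("t0"); // Avro bolt emits t0_1
        tree.ack("t0");          // Avro bolt acks t0
        tree.ack("t0");          // DB bolt acks t0_0
        System.out.println(tree.isCompleted("t0")); // false: one edge still open
        tree.ack("t0");          // DB bolt acks t0_1
        System.out.println(tree.isCompleted("t0")); // true: root acked on spout
    }
}
```

>>> Acking t0_1 a second time in this model throws, which mirrors how a
>>> double ack corrupts the tree's accounting in Storm.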
>>>
>>> Depending on your throughput this may be infeasible, but you might try
>>> enabling debug logging
>>> (https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-),
>>> which will let you tell whether the tuple is being acked on the spout.
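>>>
>>> As a config fragment (the flag is topology-wide and very verbose, so it
>>> is best enabled only briefly):

```java
import org.apache.storm.Config;

// Configuration fragment: every emit/ack/fail will be logged by the workers.
Config conf = new Config();
conf.setDebug(true);
// then submit the topology with this conf as usual
```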
>>>
>>> If the tuple is being acked on the spout, you might want to look at some
>>> of the logs from this method:
>>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64
>>> They should show you what the spout is doing internally. Keep in mind
>>> that the spout can only commit e.g. offset 10 if offsets 0-9 have all
>>> been acked/committed, so if an earlier tuple failed and was waiting for
>>> retry when you restarted, that could also cause this.
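>>>
>>> The commit rule can be sketched in a few lines of plain Java (my own
>>> illustration of the rule, not the OffsetManager code linked above):

```java
import java.util.Set;
import java.util.TreeSet;

public class CommitRule {
    /**
     * Given the current committed position and the set of acked offsets,
     * the spout may only advance the commit over a contiguous run of acks.
     */
    public static long nextCommitOffset(long committed, Set<Long> acked) {
        long next = committed;
        while (acked.contains(next)) {
            next++; // stop at the first un-acked offset (the gap)
        }
        return next;
    }

    public static void main(String[] args) {
        Set<Long> acked = new TreeSet<>(Set.of(0L, 1L, 2L, 4L, 5L));
        // offset 3 failed and is awaiting retry: the commit is stuck at 3,
        // even though 4 and 5 are already acked
        System.out.println(nextCommitOffset(0, acked)); // 3
    }
}
```

>>> On restart, anything at or beyond the stuck commit position is re-read,
>>> which looks like the offset "going back".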
>>>
>>> 2017-09-01 7:04 GMT+02:00 pradeep s <sreekumar.pradeep@gmail.com>:
>>>
>>>> Hi,
>>>> I am using Storm 1.1.0, storm-kafka-client version 1.1.1, and the Kafka
>>>> server is 0.10.1.1.
>>>>
>>>> The Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.
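>>>>
>>>> The spout is configured roughly like this (a simplified sketch: the
>>>> builder method names are from the storm-kafka-client 1.x API and may
>>>> differ slightly by version, and the broker/topic names are placeholders):

```java
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

// UNCOMMITTED_EARLIEST: start from the earliest offset only when no offset
// has been committed for the partition; otherwise resume from the last
// committed offset.
KafkaSpoutConfig<String, String> spoutConfig =
    KafkaSpoutConfig.<String, String>builder("kafka:9092", "topic-a")
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .build();
KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
```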
>>>>
>>>> The message flow is as below, and it is a normal topology:
>>>>
>>>> KafkaSpout --> AvroDeserializerBolt --> DataBaseInsertBolt
>>>>
>>>> If a message fails Avro deserialization, I move it to an error queue and
>>>> acknowledge it from the Avro bolt. The message is not emitted to the
>>>> database bolt.
>>>>
>>>> But I have observed that after I restart the topology, the offset for
>>>> the topic goes back to an old offset.
>>>>
>>>> Will Kafka commit the offset only if the message is acked from all
>>>> bolts?
>>>>
>>>> Is the offset going back to a previous value because of this?
>>>>
>>>> Thanks
>>>> Pradeep
>>>>
>>>
>>>
>>
>
