storm-user mailing list archives

From pradeep s <sreekumar.prad...@gmail.com>
Subject Re: Storm kafka spout offset going back
Date Sat, 02 Sep 2017 16:58:47 GMT
Hi Stig,
Max uncommitted offsets is set at 10_000 and retry threshold for exception
handling is 10.
Thanks
Pradeep



On Sat, Sep 2, 2017 at 4:46 AM, Stig Rohde Døssing <srdo@apache.org> wrote:

> Thanks. The bolts look fine to me. I'd look at whether the tuples are
> being acked on the spout (use the debug setting on Config), and the
> OffsetManager class logs I linked earlier. I don't know if it's relevant to
> your case, but please note that there are some cases where setting a low
> maxUncommittedOffsets can cause the spout to stop polling for tuples. It's
> being fixed, but please leave maxUncommittedOffsets at the default if
> you're setting it to a custom value.
>
> What is your retry service configuration?
>
> 2017-09-02 0:11 GMT+02:00 pradeep s <sreekumar.pradeep@gmail.com>:
>
>> Yes, Stig. The code posted is for DataBaseInsertBolt. The emit from the last bolt is
>> not needed.
>>
>> Problem 2 was for a separate topic. Problem 1 was observed for topics
>> where processing failures were encountered previously.
>>
>> I have attached the error processing and bolt files.
>>
>> Thanks
>> Pradeep
>>
>>
>>
>>
>> On Fri, Sep 1, 2017 at 1:09 PM, Stig Rohde Døssing <srdo@apache.org>
>> wrote:
>>
>>> Just to make sure I understand:
>>>
>>> This is your topology
>>> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt
>>>
>>> The bolt you posted the execute method for is the DataBaseInsertBolt,
>>> right?
>>> What are these statements for if this is the last bolt in the topology?
>>> "super.getOutputCollector().emit(tuple, new Values(fullMessage));"
>>> Are the topics you mention in problem 1 and 2 the same topic?
>>> Essentially what I'm asking is whether the topic that is stuck is also the
>>> one with failures that is starting over on an old offset?
>>> Can you post your RetryService configuration?
>>> You talked about moving tuples to an error queue if they fail
>>> deserialization in the Avro bolt. Can you post that execute too?
>>>
>>> 2017-09-01 20:14 GMT+02:00 pradeep s <sreekumar.pradeep@gmail.com>:
>>>
>>>> Thanks, Stig, for the response. I can give some more detail on the
>>>> issue we are facing now.
>>>> For any database failure, we retry the tuple up to 10 times.
>>>> Database failures are mostly caused by parent-child relations, since we are
>>>> processing out of order.
>>>> Our consumer group has more than 10 topics, and each topic corresponds
>>>> to one table. For example, topics A, B and C in a group correspond
>>>> to tables A, B and C in the database.
>>>> Here, table A is the parent and tables B and C are child
>>>> tables.
>>>> Spout parallelism is set to 50 and each topic has 50 partitions. These
>>>> 50 threads go round-robin across all the topics in the group.
>>>>
>>>> Issues observed with the current setup are
>>>>
>>>> 1) One partition of one topic alone is getting stuck. The lag on all the other
>>>> partitions is cleared.
>>>>
>>>> 2) Any topic that had failures earlier goes back to an old offset.
>>>>
>>>>
>>>> DB Bolt Execute Method below
>>>> =======================
>>>> exceptionCount will have a value greater than 0 once the message is
>>>> moved to the error queue. In that case I am acknowledging the message. In other
>>>> cases I am calling tuple.fail.
>>>> There is no downstream bolt for this. This is the final bolt in the
>>>> topology.
>>>>
>>>>     @Override
>>>>     public void execute(final Tuple tuple) {
>>>>         String fullMessage = (String) tuple.getValueByField(EXTRACTED_MESSAGE);
>>>>         GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(GG_MESSAGE);
>>>>         try {
>>>>             // Call to handler for generating Sql
>>>>             Date date = new Date();
>>>>             super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
>>>>             super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>>             super.getOutputCollector().ack(tuple);
>>>>             LOGGER.info("DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
>>>>         } catch (Exception e) {
>>>>             LOGGER.error("DB bolt exception occurred from Aurora : ", e);
>>>>             int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
>>>>             if (exceptionCount != -1) {
>>>>                 // If message write is success acknowledge the message so
>>>>                 // that it will be removed from kafka queue
>>>>                 super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>>                 super.getOutputCollector().ack(tuple);
>>>>             } else {
>>>>                 super.getOutputCollector().reportError(e);
>>>>                 super.getOutputCollector().fail(tuple);
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Sep 1, 2017 at 9:59 AM, Stig Rohde Døssing <srdo@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Pradeep,
>>>>>
>>>>> When you move the message to an error queue, is this happening from
>>>>> inside the Avro bolt or are you emitting a tuple? Can you verify that the
>>>>> tuple is being acked in the Avro bolt exactly once (double acking will
>>>>> cause the tuple to fail)?
>>>>>
>>>>> Storm will ack messages on the spout as long as all edges in the tuple
>>>>> tree are acked, and the topology message timeout hasn't expired before this
>>>>> occurs.
>>>>>
>>>>> For example, if the Kafka spout emits t0 and your AvroDeserializerBolt
>>>>> is the only bolt consuming from the spout, the bolt will receive t0 and
>>>>> must ack it exactly once. If the AvroDeserializerBolt emits any tuples
>>>>> anchored to t0 (using any of the
>>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
>>>>> methods that take a Tuple anchor), the downstream bolts must ack those
>>>>> exactly once too. Let's say the Avro bolt emits t0_0 and t0_1 based on t0.
>>>>> The root tuple on the spout is only acked if t0, t0_0 and t0_1 are acked
>>>>> once each, and they all get acked before the message timeout elapses.
>>>>>
>>>>> Depending on your throughput this may be infeasible, but you might try
>>>>> enabling debug logging
>>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
>>>>> which will let you tell whether the tuple is being acked on the spout.
>>>>>
>>>>> If the tuple is being acked on the spout, you might want to look at
>>>>> some of the logs from this method
>>>>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64.
>>>>> They should show you what the spout is doing internally. Keep in mind
>>>>> that the spout can only commit e.g. offset 10 if offsets 0-9 have all been
>>>>> acked/committed, so if an earlier tuple failed and is waiting for retry
>>>>> when you restart, that could also cause this.
>>>>>
>>>>> 2017-09-01 7:04 GMT+02:00 pradeep s <sreekumar.pradeep@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>> I am using Storm 1.1.0, storm-kafka-client version 1.1.1, and the Kafka
>>>>>> server is 0.10.1.1.
>>>>>>
>>>>>> The Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.
>>>>>>
>>>>>> The message flow is as below; it's a normal topology:
>>>>>>
>>>>>> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt.
>>>>>>
>>>>>> If a message fails Avro deserialization, I move the message
>>>>>> to an error queue and acknowledge it from the Avro bolt. This message is not
>>>>>> emitted to the database bolt.
>>>>>>
>>>>>> But I observed that after I restart the topology, the offset for the topic
>>>>>> goes back to an old offset.
>>>>>>
>>>>>> Will Kafka commit the offset only if the message is acked by all
>>>>>> bolts?
>>>>>>
>>>>>> Is this why the offset is going back to a previous value?
>>>>>>
>>>>>> Thanks
>>>>>> Pradeep
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
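
The tuple-tree rule quoted above (the root tuple is acked on the spout only when t0, t0_0 and t0_1 are each acked exactly once) can be illustrated with a small standalone model. This is only a sketch, not Storm code: the class AckTreeSketch, its methods and the fixed tuple ids are hypothetical, and it assumes the XOR-based bookkeeping described in Storm's "Guaranteeing Message Processing" documentation, where every tuple id is XORed into a per-root value once on emit and once on ack. It uses only the Java standard library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-root ack tracking. Not part of Storm.
class AckTreeSketch {
    // rootId -> running XOR of every tuple id emitted or acked in that tree
    private final Map<Long, Long> ackVals = new HashMap<>();

    // Record that a tuple belonging to the given root was emitted.
    void emitted(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // Record that a tuple belonging to the given root was acked.
    void acked(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // The tree is complete when every id was XORed in exactly twice (emit + ack).
    boolean isComplete(long rootId) {
        Long value = ackVals.get(rootId);
        return value != null && value == 0L;
    }

    public static void main(String[] args) {
        long root = 1L;
        long t0 = 0x11L, t0_0 = 0x22L, t0_1 = 0x33L; // fixed ids for the demo

        AckTreeSketch tracker = new AckTreeSketch();
        tracker.emitted(root, t0);    // spout emits t0
        tracker.emitted(root, t0_0);  // Avro bolt emits t0_0 anchored to t0
        tracker.emitted(root, t0_1);  // Avro bolt emits t0_1 anchored to t0
        tracker.acked(root, t0);      // Avro bolt acks t0
        tracker.acked(root, t0_0);    // downstream bolt acks t0_0
        System.out.println(tracker.isComplete(root)); // false: t0_1 still pending

        tracker.acked(root, t0_1);
        System.out.println(tracker.isComplete(root)); // true: each tuple acked once

        tracker.acked(root, t0_0);    // accidental second ack
        System.out.println(tracker.isComplete(root)); // false: value is non-zero again
    }
}
```

In this model a second ack leaves a non-zero value behind, so the tree never looks complete. That matches the warning above that double acking causes the tuple to fail, although the real acker's handling of late or duplicate acks differs in detail.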

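The commit rule mentioned in the quoted thread (the spout can only commit offset 10 if offsets 0-9 have all been acked) is the likely reason a single failed tuple can hold a partition's committed offset back across a restart. Below is a minimal sketch of that rule, assuming the Kafka convention that the committed offset is the next offset to be consumed. The class OffsetCommitSketch and the method nextCommitOffset are hypothetical names for illustration and are not taken from OffsetManager.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of contiguous offset commits. Not the spout's code.
class OffsetCommitSketch {

    // Starting from the currently committed offset (the next offset to consume),
    // advance over acked offsets until the first gap and return that position.
    static long nextCommitOffset(long committedOffset, Set<Long> ackedOffsets) {
        long next = committedOffset;
        while (ackedOffsets.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Offset 3 failed and is waiting for retry; 4-6 were acked out of order.
        Set<Long> acked = new HashSet<>(Arrays.asList(0L, 1L, 2L, 4L, 5L, 6L));
        System.out.println(nextCommitOffset(0L, acked)); // prints 3

        // Once the retry of offset 3 is acked, the commit can jump past 6.
        acked.add(3L);
        System.out.println(nextCommitOffset(0L, acked)); // prints 7
    }
}
```

If the topology is restarted while offset 3 is still pending in this model, consumption resumes from 3 even though 4-6 were already processed, which looks like the offset "going back".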