kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown
Date Sat, 01 Apr 2017 02:28:32 GMT
Should this timeout be less than the max poll interval value? If yes, then
generally speaking, what should the ratio between the two be, or what range
should this timeout value fall in?

Thanks
Sachin

On 1 Apr 2017 04:57, "Matthias J. Sax" <matthias@confluent.io> wrote:

Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
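Putting the two suggestions from this thread together, a minimal sketch (the class name, the plain string keys, and the 60000 ms value are my own illustration, not from the thread; with the Kafka client on the classpath you would use `ProducerConfig.RETRIES_CONFIG` and `ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG` instead of string literals):

```java
import java.util.Properties;

// Sketch of the producer-level settings discussed in this thread, written
// with plain string keys so it compiles without the Kafka client classes.
public class StreamsTimeoutConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Retry produce requests instead of failing the task on transient
        // broker errors such as NotLeaderForPartitionException
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        // Raise the producer request timeout above its 30000 ms default
        // (the "30001 ms has passed since last append" in the log below is
        // this timeout expiring). 60000 ms is an illustrative value only.
        props.put("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("request.timeout.ms"));
    }
}
```

In 0.10.2 these keys can be put directly into the properties passed to `StreamsConfig`; unrecognized entries are forwarded to the internal producer.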


-Matthias


On 3/31/17 11:32 AM, Sachin Mittal wrote:
> Hi,
> So I have added the config ProducerConfig.RETRIES_CONFIG,
Integer.MAX_VALUE
> and the NotLeaderForPartitionException is gone.
>
> However we see a new exception especially under heavy load:
> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append
>
> So any idea why the TimeoutException is happening?
> Is this controlled by
> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
>
> If yes,
> what should the value be set to, given that our consumer
> max.poll.interval.ms is the default 5 minutes?
>
> Is there any other setting we should try, to avoid such errors that
> cause the stream thread to die?
>
> Thanks
> Sachin
>
>
> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <eno.thereska@gmail.com>
> wrote:
>
>> Hi Sachin,
>>
>> Not in this case.
>>
>> Thanks
>> Eno
>>
>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
>>>
>>> OK.
>>> I will try this out.
>>>
>>> Do I need to change anything for
>>> max.in.flight.requests.per.connection?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.thereska@gmail.com>
>>> wrote:
>>>
>>>> Hi Sachin,
>>>>
>>>> For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException:
>>>> This server is not the leader for that topic-partition.”, could you try
>>>> setting the number of retries to something large, like this:
>>>>
>>>> Properties props = new Properties();
>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>> ...
>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>>>
>>>> This will retry the produce requests and should hopefully solve your
>>>> immediate problem.
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>
>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sjmittal@gmail.com> wrote:
>>>>
>>>>    Hi,
>>>>    We have encountered another series of errors which I would need
>>>>    more help in understanding.
>>>>
>>>>    In the logs we see a message like this:
>>>>    ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]: org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
>>>>
>>>>    then, a few milliseconds later:
>>>>    ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>>>>    org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>    org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>
>>>>    finally we get this:
>>>>    ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>>>>    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>    Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>
>>>>
>>>>    Again, it is not clear why in this case we need to shut down the
>>>>    streams thread and eventually the application. Shouldn't we capture
>>>>    this error too?
>>>>
>>>>    Thanks
>>>>    Sachin
>>>>
>>>>
>>>>
>>>>
>>
>>
>
