flink-issues mailing list archives

From "Melmoth (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-4618) Last kafka message gets consumed twice when restarting job
Date Wed, 14 Sep 2016 09:02:20 GMT
Melmoth created FLINK-4618:
------------------------------

             Summary: Last kafka message gets consumed twice when restarting job
                 Key: FLINK-4618
                 URL: https://issues.apache.org/jira/browse/FLINK-4618
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.1.2
         Environment: Flink 1.1.2
Kafka Broker 0.10.0
Hadoop 2.7.0
            Reporter: Melmoth


There seems to be an issue with the offset management in Flink. When a job is stopped and started
again, the last message consumed before the stop is read again.
I enabled checkpointing (EXACTLY_ONCE) and the FsStateBackend. Here is my log output. You can clearly
see that the consumer waits for a new record at offset 4848911, which is correct. After restarting,
it consumes the record at offset 4848910 again, so that record is consumed more than once.
I started with a new consumer group and emitted one record.

I checked the offset with the Kafka command-line tools; the committed offset in ZooKeeper is 4848910.
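The report turns on Kafka's offset convention: the committed offset is the position of the next record to consume, i.e. the last processed offset plus one (the `KafkaConsumer.commitSync` Javadoc states this). The Python sketch below models only that arithmetic under the assumption that a restarted job without restored checkpoint state resumes at the group's committed offset; `resume_position` and `replayed_offsets` are hypothetical helpers, not Flink or Kafka code.

```python
# Minimal model of Kafka's committed-offset convention (a sketch, not Flink or
# Kafka code). Assumption: a restarted job that does not restore checkpointed
# state resumes at the consumer group's committed offset.


def resume_position(committed_offset):
    """Kafka treats the committed offset as the position of the NEXT record."""
    return committed_offset


def replayed_offsets(last_processed, committed_offset):
    """Offsets a restarted consumer would deliver a second time."""
    start = resume_position(committed_offset)
    return list(range(start, last_processed + 1))


LAST_PROCESSED = 4848910  # offset of the single record emitted in the report

if __name__ == "__main__":
    # Committing the last processed offset itself, as reported:
    print(replayed_offsets(LAST_PROCESSED, committed_offset=LAST_PROCESSED))  # [4848910]
    # Committing last processed + 1, the convention in the KafkaConsumer docs:
    print(replayed_offsets(LAST_PROCESSED, committed_offset=LAST_PROCESSED + 1))  # []
```

With a committed offset of 4848911, a fresh start from the group offset would begin at the record the consumer was already waiting for, which is the behaviour the reporter expected.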

{code}
10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No committed offset for partition myTopic-0
10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to latest offset.
10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetched offset 4848910 for partition myTopic-0
10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910

-- Inserting a new event here

10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1473841823887
10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 (in 96 ms)
10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed offset 4848910 for partition myTopic-0
10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:2946

-- Restarting job

10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
// See below! Shouldn't the offset be 4848911?
10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to the committed offset 4848910
10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 1001 at hdp1:6667.
10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 1001
// Here record 4848910 gets consumed again!
10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
10:32:04,894 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
10:32:04,903 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:3079
{code}
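The trace shows two symptoms: a commit (4848910) below the position the consumer had already reached (4848911), and the same offset fetched again after the restart. A minimal Python sketch for checking longer traces for both, assuming each log entry has been rejoined onto one line; `check_trace` and its regexes are hypothetical helpers keyed to the message wording in this trace, not part of Flink or Kafka.

```python
import re

# Patterns keyed to the Kafka client messages in this trace.
# Assumption: each log entry sits on a single line.
FETCHED_RE = re.compile(r"Adding fetched record for partition (\S+) with offset (\d+)")
POSITION_RE = re.compile(r"for assigned partition (\S+) and update position to (\d+)")
COMMIT_RE = re.compile(r"Committed offset (\d+) for partition (\S+)")


def check_trace(lines):
    """Return (re-fetched records, commits lagging the consumer position).

    A commit 'lags' when the committed offset is below the position the
    consumer had already advanced to for that partition.
    """
    fetch_counts = {}
    duplicates = []
    positions = {}
    lagging_commits = []
    for line in lines:
        m = FETCHED_RE.search(line)
        if m:
            key = (m.group(1), int(m.group(2)))
            fetch_counts[key] = fetch_counts.get(key, 0) + 1
            if fetch_counts[key] == 2:  # report each duplicate once
                duplicates.append(key)
        m = POSITION_RE.search(line)
        if m:
            positions[m.group(1)] = int(m.group(2))
        m = COMMIT_RE.search(line)
        if m:
            partition, committed = m.group(2), int(m.group(1))
            position = positions.get(partition)
            if position is not None and committed < position:
                lagging_commits.append((partition, committed, position))
    return duplicates, lagging_commits


# Abridged from the trace (class names and padding dropped).
SAMPLE_TRACE = [
    ("10:30:22,447 TRACE Adding fetched record for partition myTopic-0 "
     "with offset 4848910 to buffered record list"),
    ("10:30:22,448 TRACE Returning fetched records at offset 4848910 "
     "for assigned partition myTopic-0 and update position to 4848911"),
    "10:30:24,204 DEBUG Committed offset 4848910 for partition myTopic-0",
    ("10:32:01,707 TRACE Adding fetched record for partition myTopic-0 "
     "with offset 4848910 to buffered record list"),
]

if __name__ == "__main__":
    duplicates, lagging = check_trace(SAMPLE_TRACE)
    print("re-fetched:", duplicates)    # [('myTopic-0', 4848910)]
    print("lagging commits:", lagging)  # [('myTopic-0', 4848910, 4848911)]
```

Positions are tracked per partition so traces covering several partitions are not mixed up. A record re-fetched within a single run (for example after a seek) would also be flagged, so the output is a hint to inspect, not proof of a bug.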



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
