flink-issues mailing list archives

From "Elias Levy (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
Date Tue, 03 Jul 2018 19:12:00 GMT
Elias Levy created FLINK-9731:

             Summary: Kafka source subtask begins to consume from earliest offset
                 Key: FLINK-9731
                 URL: https://issues.apache.org/jira/browse/FLINK-9731
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.4.2
            Reporter: Elias Levy

On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job instance began
consuming records from the earliest offsets available in Kafka for the partitions assigned
to it. Other subtasks did not exhibit this behavior and continued operating normally.

Prior to the event the job exhibited no Kafka lag. The job showed no failed checkpoints, and the job did not restore or restart. The Flink logs show no indication of anything amiss: there
were no errors or Kafka-related messages in the Flink logs.

The job is configured with checkpoints at 1-minute intervals. The Kafka connector consumer
is configured via `setStartFromGroupOffsets()` to start from the group offsets when it is not started from a savepoint,
and the Kafka consumer is configured to fall back to the earliest offsets if no group offsets
are committed, by setting `auto.offset.reset` to `earliest` in the Kafka consumer config.
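To make these start-position rules concrete, here is a minimal, self-contained Java sketch. It is a simplified model written for illustration, not Flink source code: `StartOffsetModel`, `consumerProperties`, and `resolveStartOffset` are hypothetical names. The precedence it encodes (restored checkpoint/savepoint state first, then the group's committed offset, then `auto.offset.reset`) follows the Flink Kafka connector documentation for `setStartFromGroupOffsets()`. In the real job these properties would be passed to the Flink Kafka consumer constructor, with checkpointing enabled via `env.enableCheckpointing(60000)`. The model covers only the start-up decision; it does not capture what the underlying Kafka client does once the job is running.

```java
import java.util.OptionalLong;
import java.util.Properties;

/**
 * Hypothetical, simplified model (not Flink source code) of how a Kafka source
 * picks a partition's starting offset under the configuration in this report.
 * All offsets here mean "next offset to read".
 */
final class StartOffsetModel {

    private StartOffsetModel() {}

    /** Consumer properties matching the report; server and group values are placeholders. */
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Consulted only when no committed group offset exists for a partition.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    /**
     * Resolves where one partition starts reading when the source starts up.
     *
     * @param restored  offset recovered from checkpoint/savepoint state, if the job was restored
     * @param committed offset committed for the consumer group in Kafka, if any
     * @param autoReset value of auto.offset.reset
     * @param earliest  earliest offset still available in the partition
     * @param latest    log-end offset of the partition
     */
    static long resolveStartOffset(OptionalLong restored, OptionalLong committed,
                                   String autoReset, long earliest, long latest) {
        if (restored.isPresent()) {
            // On restore, state wins; setStartFromGroupOffsets() is ignored.
            return restored.getAsLong();
        }
        if (committed.isPresent()) {
            // setStartFromGroupOffsets(): use the group's committed offset.
            return committed.getAsLong();
        }
        switch (autoReset) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                // e.g. "none": the real Kafka client throws when no offset is found.
                throw new IllegalArgumentException("unsupported auto.offset.reset: " + autoReset);
        }
    }
}
```

With no restored state and no committed offset, the model returns the earliest available offset, which is the fallback configured here; when a checkpointed offset is present, it is returned regardless of the group offsets.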

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of its partitions
for around 30 seconds as a result of losing its connection to ZooKeeper.

[2018-06-30 09:34:54,799] INFO Unable to read additional data from server sessionid 0x161305b7bd81a09,
likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for partition [cloud_ioc_events,32]
to broker 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)

The broker reconnected to ZK almost immediately, after a few tries:

[2018-06-30 09:34:55,462] INFO Opening socket connection to server
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to,
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09
has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, connectString=,,
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 (org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09
has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server
[2018-06-30 09:34:55,468] INFO Socket connection established to,
initiating session (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server,
sessionid = 0x163934fa09d1baa, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 with addresses:
[2018-06-30 09:34:55,476] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for new topics

By 9:35:02 partitions had returned to the broker.

It appears this is the broker the subtask was consuming from, as outgoing network traffic
from it spiked after the broker recovered leadership of its partitions, which is consistent
with the subtask starting to consume from the earliest offset.

This may have been related to [KAFKA-5600](https://issues.apache.org/jira/browse/KAFKA-5600),
which affects, the version we are running, and that was fixed in But that
seems unlikely, as the Flink Kafka connector consumer shouldn't make use of the offsets committed
in Kafka when operating with checkpoints enabled, nor when the job is not restarting or being

This message was sent by Atlassian JIRA
