kafka-users mailing list archives

From Kindernay Oliver <oliver.kinder...@eurowag.com.INVALID>
Subject Offset commit consistently timing out in transaction
Date Mon, 04 Jan 2021 10:22:54 GMT
Hello,

We are experiencing problems with offset commits timing out on the brokers. This started to
happen when we started using transactions: send_offsets_to_transaction periodically (every few
minutes) times out. We have a cluster of three brokers and topics with 15 partitions. We use
one transactional producer per partition and commit each transaction at least once per second.
The timeout seemingly happens randomly, each time for a different producer instance and a
different broker.
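
To illustrate the setup, this is roughly what each per-partition worker does (a simplified
sketch, not our exact code; the topic names, group id and broker list are placeholders):

from confluent_kafka import Consumer, Producer

# Simplified sketch of one per-partition worker. 'INPUT_TOPIC', 'OUTPUT_TOPIC',
# 'GROUP_ID' and the broker list below are placeholders, not our real settings.
consumer = Consumer({
    'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
    'group.id': 'GROUP_ID',
    'enable.auto.commit': False,
    'isolation.level': 'read_committed',
})
producer = Producer({
    'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
    'transactional.id': 'mu-data-generator-7',   # one transactional.id per partition
})

consumer.subscribe(['INPUT_TOPIC'])
producer.init_transactions()

while True:
    producer.begin_transaction()
    # consume, process and produce for up to roughly one second
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        producer.produce('OUTPUT_TOPIC', msg.value())
    # this is the call that periodically times out
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata())
    producer.commit_transaction()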


  1.  We used to get REQUEST_TIMED_OUT after 5 seconds. I understand that error came from
the broker.
  2.  We tried raising offsets.commit.timeout.ms on the brokers to 60 seconds.
  3.  After the change, we are getting transactional operation timeouts after 60 s, with the
same periodicity. This is now a client-side error, since the broker now takes the full minute,
after which we would presumably see the same error message from the broker as previously
(the timeout settings we believe are involved are sketched after the log excerpt below):
     *   <class 'cimpl.KafkaException'>/KafkaError{code=_TIMED_OUT,val=-185,str="Transactional
operation timed out"}
     *   %5|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]:
10.48.111.102:9092/2: Timed out TxnOffsetCommitRequest in flight (after 59943ms, timeout #0)
%4|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2:
Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%7|1609747761.212|FAIL|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2:
1 request(s) timed out: disconnect (after 1544886ms in state UP) (_TIMED_OUT)
%3|1609747761.212|FAIL|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2:
1 request(s) timed out: disconnect (after 1544886ms in state UP)
%7|1609747761.212|STATE|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2:
Broker changed state UP -> DOWN
%7|1609747761.212|STATE|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2:
Broker changed state DOWN -> INIT
%7|1609747761.212|REQERR|rdkafka#producer-10| [thrd:main]: 10.48.111.102:9092/2: MetadataRequest
failed: Local: Timed out: actions Retry
%7|1609747761.312|STATE|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2:
Broker changed state INIT -> TRY_CONNECT
%7|1609747761.312|RETRY|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2:
Moved 1 retry buffer(s) to output queue
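
For reference, this is how we understand the timeout settings involved (a hedged sketch; the
values below are the librdkafka defaults as far as we can tell, we have not tuned them):

producer_conf = {
    'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',  # placeholder
    'transactional.id': 'mu-data-generator-7',
    # librdkafka fails a single in-flight request (such as the TxnOffsetCommitRequest
    # above) with _TIMED_OUT after this long; the 60000 ms default would match the
    # "after 59943ms" line in the log.
    'socket.timeout.ms': 60000,
    # maximum age of a transaction before the coordinator aborts it; this appears
    # as txnTimeoutMs=60000 in the broker logs below
    'transaction.timeout.ms': 60000,
}
# On the broker side, the only change we made was offsets.commit.timeout.ms: its
# default of 5000 ms matches the original 5 s REQUEST_TIMED_OUT, and we raised it
# to 60000 ms as described in point 2.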



  1.  Broker system metrics (disk, network, CPU, memory) did not indicate any bottleneck.
Anyway, we tried upgrading the broker VMs to a larger size, with no change in the behaviour.
  2.  This happens both on our test cluster and on our prod cluster. The frequency seems to
be lower on the test cluster (it has lower traffic and fewer resources).
  3.  We use Python's confluent_kafka 1.5.0, based on librdkafka 1.5.0.
  4.  The broker package version is confluent-kafka-2.11-2.0.1.
  5.  I enabled TRACE log level for everything on the test cluster (a client-side debug
sketch follows this list).
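
If it helps, we can also capture client-side debug output the next time this happens; a
minimal sketch of the producer config we would use (the choice of debug contexts is our guess
at the relevant ones):

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',  # placeholder
    'transactional.id': 'mu-data-generator-7',
    # librdkafka debug contexts; 'eos' covers the idempotent/transactional producer.
    # Debug output goes to stderr unless a logger is supplied.
    'debug': 'broker,protocol,eos',
})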

This is a trace log from the broker. Client logs indicate that the timed-out operation started
at 08:08:21:

[2021-01-04 08:08:15,413] DEBUG TransactionalId mu-data-generator-7 complete transition from
PrepareCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000,
txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751)
(kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:08:15,413] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's
transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000,
txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751)
with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:08:15,413] TRACE [Transaction Marker Channel Manager 2]: Completed transaction
for mu-data-generator-7 with coordinator epoch 375, final state after commit: CompleteCommit
(kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7 prepare transition from
CompleteCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000,
txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747701097) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009,
producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45),
txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) for transaction id
mu-data-generator-7 with coordinator epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:08:21,267] DEBUG TransactionalId mu-data-generator-7 complete transition from
CompleteCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000,
txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747701097) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:08:21,267] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's
transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000,
txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747701097) with coordinator epoch 375 for mu-data-generator-7 succeeded
(kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:26,828] DEBUG TransactionalId mu-data-generator-7 prepare transition from
Ongoing to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
txnState=PrepareEpochFence, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747701097) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,829] DEBUG TransactionalId mu-data-generator-7 prepare transition from
Ongoing to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747766829) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,829] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009,
producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45),
txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) for transaction id
mu-data-generator-7 with coordinator epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7 complete transition from
Ongoing to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747766829) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,978] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's
transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747766829) with coordinator epoch 375 for mu-data-generator-7 succeeded
(kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7 prepare transition from
PrepareAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978)
(kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,979] INFO [TransactionCoordinator id=2] Completed rollback ongoing transaction
of transactionalId: mu-data-generator-7 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
[2021-01-04 08:09:26,979] TRACE [Transaction Marker Channel Manager 2]: Added marker TxnMarkerEntry{producerId=10450009,
producerEpoch=6502, coordinatorEpoch=375, result=ABORT, partitions=[__consumer_offsets-45]}
for transactional id mu-data-generator-7 to destination broker 2 (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:27,229] TRACE [Transaction Marker Channel Manager 2]: Completed sending
transaction markers for mu-data-generator-7 as ABORT (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:27,229] DEBUG [Transaction Marker Channel Manager 2]: Sending mu-data-generator-7's
transaction markers for TransactionMetadata(transactionalId=mu-data-generator-7, producerId=10450009,
producerEpoch=6502, txnTimeoutMs=60000, state=PrepareAbort, pendingState=Some(CompleteAbort),
topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829)
with coordinator epoch 375 succeeded, trying to append complete transaction log now (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:27,230] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009,
producerEpoch=6502, txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097,
txnLastUpdateTimestamp=1609747766978) for transaction id mu-data-generator-7 with coordinator
epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:27,453] DEBUG TransactionalId mu-data-generator-7 complete transition from
PrepareAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978)
(kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:27,453] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's
transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978)
with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:27,453] TRACE [Transaction Marker Channel Manager 2]: Completed transaction
for mu-data-generator-7 with coordinator epoch 375, final state after commit: CompleteAbort
(kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:38,059] DEBUG TransactionalId mu-data-generator-7 prepare transition from
CompleteAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6503, txnTimeoutMs=60000,
txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059)
(kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:38,060] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009,
producerEpoch=6503, txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
txnLastUpdateTimestamp=1609747778059) for transaction id mu-data-generator-7 with coordinator
epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:38,852] DEBUG TransactionalId mu-data-generator-7 complete transition from
CompleteAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6503, txnTimeoutMs=60000,
txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059)
(kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:38,852] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's
transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6503, txnTimeoutMs=60000,
txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059)
with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:38,852] INFO [TransactionCoordinator id=2] Initialized transactionalId mu-data-generator-7
with producerId 10450009 and producer epoch 6503 on partition __transaction_state-29 (kafka.coordinator.transaction.TransactionCoo

Also this might be interesting:

[2021-01-04 08:08:21,097] TRACE [Kafka Request Handler 9 on Broker 2], Kafka request handler
9 on broker 2 handling request Request(processor=5, connectionId=10.48.111.102:9092-10.48.110.74:45394-99,
session=Session(User:ANONYMOUS,/10.48.1
[2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7 prepare transition from
CompleteCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000,
txnState=Ongoing, topicPartitions=Set(__consumer_
[2021-01-04 08:08:21,097] TRACE [ReplicaManager broker=2] Append [Map(__transaction_state-29
-> [(record=DefaultRecord(offset=0, timestamp=1609747701097, key=23 bytes, value=65 bytes))])]
to local log (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,097] TRACE Inserting 158 bytes at end offset 14721706 at position 23266940
with largest timestamp 1609747701097 at shallow offset 14721706 (kafka.log.LogSegment)
[2021-01-04 08:08:21,097] TRACE Appended 158 to /mnt/data/kafka/__transaction_state-29/00000000000014565986.log
at end offset 14721706 (kafka.log.LogSegment)
[2021-01-04 08:08:21,097] TRACE [Log partition=__transaction_state-29, dir=/mnt/data/kafka]
Appended message set with last offset: 14721706, first offset: Some(14721706), next offset:
14721707, and messages: [(record=DefaultRecord(offset=1
[2021-01-04 08:08:21,097] DEBUG [ReplicaManager broker=2] Request key __transaction_state-29
unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,098] DEBUG [Partition __transaction_state-29 broker=2] Skipping update
high watermark since new hw (offset=14721706 segment=[14565986:23266940]) is not larger than
old hw (offset=14721706 segment=[14565986:23266940]).
[2021-01-04 08:08:21,098] TRACE [ReplicaManager broker=2] 158 written to log __transaction_state-29
beginning at offset 14721706 and ending at offset 14721706 (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,098] DEBUG [ReplicaManager broker=2] Produce to local log in 1 ms (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,098] TRACE Initial partition status for __transaction_state-29 is [acksPending:
true, error: 7, startOffset: 14721706, requiredOffset: 14721707] (kafka.server.DelayedProduce)
[2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for __transaction_state-29,
current status [acksPending: true, error: 7, startOffset: 14721706, requiredOffset: 14721707]
(kafka.server.DelayedProduce)
[2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29 broker=2] Progress awaiting
ISR acks for offset 14721707: acked: Set(broker 2: 14721707), awaiting Set(broker 3: 14721706)
(kafka.cluster.Partition)
[2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for __transaction_state-29,
current status [acksPending: true, error: 7, startOffset: 14721706, requiredOffset: 14721707]
(kafka.server.DelayedProduce)
[2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29 broker=2] Progress awaiting
ISR acks for offset 14721707: acked: Set(broker 2: 14721707), awaiting Set(broker 3: 14721706)
(kafka.cluster.Partition)
[2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009,
producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45),
txnStartTim



Apart from upgrading the Kafka version, or moving to a newer librdkafka once a new
confluent_kafka release for Python comes out, I am out of ideas of what to try.

I would be grateful for any suggestion about where to look to debug this kind of issue.
