kafka-users mailing list archives

From Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
Subject Re: Offset commit consistently timing out in transaction
Date Fri, 08 Jan 2021 10:38:41 GMT
The timeout in the offset commit request was fixed recently. This issue looks
more like KAFKA-8334 <https://issues.apache.org/jira/browse/KAFKA-8334>.

On Mon, Jan 4, 2021 at 3:53 PM Kindernay Oliver
<oliver.kindernay@eurowag.com.invalid> wrote:

> Hello,
>
> we are experiencing problems with offset commits timing out on the brokers.
> This started when we began using transactions.
> send_offsets_to_transaction periodically (every few minutes) times out. We
> have a cluster of three brokers, and topics with 15 partitions. We use one
> transactional producer per partition and commit the transaction at least
> once per second. The timeout seemingly happens at random, each time for
> a different producer instance and a different broker.
>
>
>   1.  We used to get REQUEST_TIMED_OUT after 5 seconds. I understand that
> error came from a broker.
>   2.  We tried raising offsets.commit.timeout.ms on the brokers to 60 seconds.
>   3.  After the change, transactional operations time out
> after 60s, with the same periodicity. This is now a client error, since the
> Kafka broker takes the full minute, after which we would probably see the
> same error message from the broker as previously.
>      *   <class
> 'cimpl.KafkaException'>/KafkaError{code=_TIMED_OUT,val=-185,str="Transactional
> operation timed out"}
>      *   %5|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out
> TxnOffsetCommitRequest in flight (after 59943ms, timeout #0)
> %4|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out 1
> in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
> %7|1609747761.212|FAIL|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed
> out: disconnect (after 1544886ms in state UP) (_TIMED_OUT)
> %3|1609747761.212|FAIL|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed
> out: disconnect (after 1544886ms in state UP)
> %7|1609747761.212|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> UP -> DOWN
> %7|1609747761.212|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> DOWN -> INIT
> %7|1609747761.212|REQERR|rdkafka#producer-10| [thrd:main]:
> 10.48.111.102:9092/2: MetadataRequest failed: Local: Timed out: actions
> Retry
> %7|1609747761.312|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> INIT -> TRY_CONNECT
> %7|1609747761.312|RETRY|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Moved 1 retry
> buffer(s) to output queue
>
>
>
>   1.  Broker system metrics (disk, network, CPU, memory) did not indicate
> any bottleneck. Nevertheless, we tried upgrading the broker VMs to a larger
> size, with no change in behaviour.
>   2.  This happens on our test cluster and on our prod cluster. It seems
> to happen less frequently on the test cluster (it has
> lower traffic and fewer resources).
>   3.  We use Python's confluent_kafka 1.5.0, based on librdkafka 1.5.0.
>   4.  Broker package version is confluent-kafka-2.11-2.0.1.
>   5.  I enabled TRACE log level for everything on the test cluster.
>
> This is a trace log from the broker. Client logs indicate that the
> timed-out operation started at 08:08:21:
>
> [2021-01-04 08:08:15,413] DEBUG TransactionalId mu-data-generator-7
> complete transition from PrepareCommit to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(),
> txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:08:15,413] DEBUG [Transaction State Manager 2]: Updating
> mu-data-generator-7's transaction state to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(),
> txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751) with
> coordinator epoch 375 for mu-data-generator-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:08:15,413] TRACE [Transaction Marker Channel Manager 2]:
> Completed transaction for mu-data-generator-7 with coordinator epoch 375,
> final state after commit: CompleteCommit
> (kafka.coordinator.transaction.TransactionMarkerChannelManager)
> [2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7
> prepare transition from CompleteCommit to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=Ongoing,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending
> new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=Ongoing,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) for
> transaction id mu-data-generator-7 with coordinator epoch 375 to the local
> transaction log (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:08:21,267] DEBUG TransactionalId mu-data-generator-7
> complete transition from CompleteCommit to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=Ongoing,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:08:21,267] DEBUG [Transaction State Manager 2]: Updating
> mu-data-generator-7's transaction state to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=Ongoing,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) with
> coordinator epoch 375 for mu-data-generator-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:09:26,828] DEBUG TransactionalId mu-data-generator-7
> prepare transition from Ongoing to TxnTransitMetadata(producerId=10450009,
> producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareEpochFence,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:09:26,829] DEBUG TransactionalId mu-data-generator-7
> prepare transition from Ongoing to TxnTransitMetadata(producerId=10450009,
> producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:09:26,829] TRACE [Transaction State Manager 2]: Appending
> new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6502,
> txnTimeoutMs=60000, txnState=PrepareAbort,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) for
> transaction id mu-data-generator-7 with coordinator epoch 375 to the local
> transaction log (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7
> complete transition from Ongoing to TxnTransitMetadata(producerId=10450009,
> producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:09:26,978] DEBUG [Transaction State Manager 2]: Updating
> mu-data-generator-7's transaction state to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6502,
> txnTimeoutMs=60000, txnState=PrepareAbort,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) with
> coordinator epoch 375 for mu-data-generator-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7
> prepare transition from PrepareAbort to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6502,
> txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:09:26,979] INFO [TransactionCoordinator id=2] Completed
> rollback ongoing transaction of transactionalId: mu-data-generator-7 due to
> timeout (kafka.coordinator.transaction.TransactionCoordinator)
> [2021-01-04 08:09:26,979] TRACE [Transaction Marker Channel Manager 2]:
> Added marker TxnMarkerEntry{producerId=10450009, producerEpoch=6502,
> coordinatorEpoch=375, result=ABORT, partitions=[__consumer_offsets-45]} for
> transactional id mu-data-generator-7 to destination broker 2
> (kafka.coordinator.transaction.TransactionMarkerChannelManager)
> [2021-01-04 08:09:27,229] TRACE [Transaction Marker Channel Manager 2]:
> Completed sending transaction markers for mu-data-generator-7 as ABORT
> (kafka.coordinator.transaction.TransactionMarkerChannelManager)
> [2021-01-04 08:09:27,229] DEBUG [Transaction Marker Channel Manager 2]:
> Sending mu-data-generator-7's transaction markers for
> TransactionMetadata(transactionalId=mu-data-generator-7,
> producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000,
> state=PrepareAbort, pendingState=Some(CompleteAbort),
> topicPartitions=Set(), txnStartTimestamp=1609747701097,
> txnLastUpdateTimestamp=1609747766829) with coordinator epoch 375 succeeded,
> trying to append complete transaction log now
> (kafka.coordinator.transaction.TransactionMarkerChannelManager)
> [2021-01-04 08:09:27,230] TRACE [Transaction State Manager 2]: Appending
> new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6502,
> txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) for
> transaction id mu-data-generator-7 with coordinator epoch 375 to the local
> transaction log (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:09:27,453] DEBUG TransactionalId mu-data-generator-7
> complete transition from PrepareAbort to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6502,
> txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:09:27,453] DEBUG [Transaction State Manager 2]: Updating
> mu-data-generator-7's transaction state to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6502,
> txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) with
> coordinator epoch 375 for mu-data-generator-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:09:27,453] TRACE [Transaction Marker Channel Manager 2]:
> Completed transaction for mu-data-generator-7 with coordinator epoch 375,
> final state after commit: CompleteAbort
> (kafka.coordinator.transaction.TransactionMarkerChannelManager)
> [2021-01-04 08:09:38,059] DEBUG TransactionalId mu-data-generator-7
> prepare transition from CompleteAbort to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6503,
> txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:09:38,060] TRACE [Transaction State Manager 2]: Appending
> new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6503,
> txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) for transaction
> id mu-data-generator-7 with coordinator epoch 375 to the local transaction
> log (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:09:38,852] DEBUG TransactionalId mu-data-generator-7
> complete transition from CompleteAbort to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6503,
> txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:09:38,852] DEBUG [Transaction State Manager 2]: Updating
> mu-data-generator-7's transaction state to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6503,
> txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) with
> coordinator epoch 375 for mu-data-generator-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:09:38,852] INFO [TransactionCoordinator id=2] Initialized
> transactionalId mu-data-generator-7 with producerId 10450009 and producer
> epoch 6503 on partition __transaction_state-29
> (kafka.coordinator.transaction.TransactionCoo
>
> Also this might be interesting:
>
> [2021-01-04 08:08:21,097] TRACE [Kafka Request Handler 9 on Broker 2],
> Kafka request handler 9 on broker 2 handling request Request(processor=5,
> connectionId=10.48.111.102:9092-10.48.110.74:45394-99,
> session=Session(User:ANONYMOUS,/10.48.1
> [2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7
> prepare transition from CompleteCommit to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_
> [2021-01-04 08:08:21,097] TRACE [ReplicaManager broker=2] Append
> [Map(__transaction_state-29 -> [(record=DefaultRecord(offset=0,
> timestamp=1609747701097, key=23 bytes, value=65 bytes))])] to local log
> (kafka.server.ReplicaManager)
> [2021-01-04 08:08:21,097] TRACE Inserting 158 bytes at end offset 14721706
> at position 23266940 with largest timestamp 1609747701097 at shallow offset
> 14721706 (kafka.log.LogSegment)
> [2021-01-04 08:08:21,097] TRACE Appended 158 to
> /mnt/data/kafka/__transaction_state-29/00000000000014565986.log at end
> offset 14721706 (kafka.log.LogSegment)
> [2021-01-04 08:08:21,097] TRACE [Log partition=__transaction_state-29,
> dir=/mnt/data/kafka] Appended message set with last offset: 14721706, first
> offset: Some(14721706), next offset: 14721707, and messages:
> [(record=DefaultRecord(offset=1
> [2021-01-04 08:08:21,097] DEBUG [ReplicaManager broker=2] Request key
> __transaction_state-29 unblocked 0 fetch requests.
> (kafka.server.ReplicaManager)
> [2021-01-04 08:08:21,098] DEBUG [Partition __transaction_state-29
> broker=2] Skipping update high watermark since new hw (offset=14721706
> segment=[14565986:23266940]) is not larger than old hw (offset=14721706
> segment=[14565986:23266940]).
> [2021-01-04 08:08:21,098] TRACE [ReplicaManager broker=2] 158 written to
> log __transaction_state-29 beginning at offset 14721706 and ending at
> offset 14721706 (kafka.server.ReplicaManager)
> [2021-01-04 08:08:21,098] DEBUG [ReplicaManager broker=2] Produce to local
> log in 1 ms (kafka.server.ReplicaManager)
> [2021-01-04 08:08:21,098] TRACE Initial partition status for
> __transaction_state-29 is [acksPending: true, error: 7, startOffset:
> 14721706, requiredOffset: 14721707] (kafka.server.DelayedProduce)
> [2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for
> __transaction_state-29, current status [acksPending: true, error: 7,
> startOffset: 14721706, requiredOffset: 14721707]
> (kafka.server.DelayedProduce)
> [2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29
> broker=2] Progress awaiting ISR acks for offset 14721707: acked: Set(broker
> 2: 14721707), awaiting Set(broker 3: 14721706) (kafka.cluster.Partition)
> [2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for
> __transaction_state-29, current status [acksPending: true, error: 7,
> startOffset: 14721706, requiredOffset: 14721707]
> (kafka.server.DelayedProduce)
> [2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29
> broker=2] Progress awaiting ISR acks for offset 14721707: acked: Set(broker
> 2: 14721707), awaiting Set(broker 3: 14721706) (kafka.cluster.Partition)
> [2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending
> new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=60000, txnState=Ongoing,
> topicPartitions=Set(__consumer_offsets-45), txnStartTim
>
>
>
> Apart from upgrading the Kafka version, or upgrading to a newer librdkafka
> when a new confluent_kafka for Python comes out, I am out of ideas for
> what to try.
>
> I would be grateful for any suggestion about where to look to debug this
> kind of issue.
>
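
One way to line up the client and broker logs quoted above is to convert the librdkafka epoch timestamp into the broker log's wall-clock format and to compare the transaction's age with txnTimeoutMs. The sketch below does only that arithmetic. The constant and function names are illustrative, the values are copied from the quoted logs, and it assumes that the client and broker clocks are roughly in sync and that "after 59943ms" is measured from when the request was sent. The broker log appears to be in UTC, since txnStartTimestamp=1609747701097 is logged at 08:08:21,097.

```python
from datetime import datetime, timezone

# Values copied from the logs quoted above (names are illustrative).
CLIENT_TIMEOUT_MS = 1609747761211  # librdkafka REQTMOUT line: 1609747761.211 s
CLIENT_IN_FLIGHT_MS = 59943        # "Timed out TxnOffsetCommitRequest ... after 59943ms"
TXN_START_MS = 1609747701097       # txnStartTimestamp (state Ongoing)
TXN_ABORT_MS = 1609747766829       # txnLastUpdateTimestamp (state PrepareAbort)
TXN_TIMEOUT_MS = 60000             # txnTimeoutMs


def to_utc(epoch_ms: int) -> str:
    """Format epoch milliseconds as HH:MM:SS,mmm in UTC, like the broker log."""
    dt = datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)
    return dt.strftime("%H:%M:%S,") + f"{epoch_ms % 1000:03d}"


# Approximate moment the client sent the request that later timed out.
request_sent_ms = CLIENT_TIMEOUT_MS - CLIENT_IN_FLIGHT_MS

# How long the transaction had been open when the coordinator aborted it.
txn_age_ms = TXN_ABORT_MS - TXN_START_MS
overshoot_ms = txn_age_ms - TXN_TIMEOUT_MS

print("TxnOffsetCommitRequest sent at about", to_utc(request_sent_ms))
print("Coordinator started the abort at", to_utc(TXN_ABORT_MS))
print("Transaction age at abort:", txn_age_ms, "ms")
print("Over txnTimeoutMs by:", overshoot_ms, "ms")
```

Under those assumptions the request left the client at about 08:08:21,268, one millisecond after the broker logged that the transition to Ongoing had completed (08:08:21,267). The coordinator then rolled the transaction back 65732 ms after it started, 5732 ms past the 60000 ms transaction timeout. The broker log shows no offset-commit activity for this transactional id between those two points.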
