kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tony Liu <jiangtao....@zuora.com>
Subject WARN [ReplicaFetcherThread-1-1011], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5dad549c
Date Tue, 20 Dec 2016 00:33:28 GMT
​Hi Experts,

is there anyone run into this connection error ?

[2016-12-17 20:13:32,728] WARN  [ReplicaFetcherThread-1-1011], Error
in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5dad549c
(kafka.server.ReplicaFetcherThread)java.io.IOException: Connection to (id: 1011 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:83)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:93)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:248)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

by the way, I check the source code, which said there are lots of reasons
causing that error, so I am not clear about what the 'lots of reasons` are,
so that I put effort to do further addressing.

 * Invokes `client.send` followed by 1 or more `client.poll`
invocations until a response is received or a
 * disconnection happens (which can happen for a number of reasons
including a request timeout).
 * In case of a disconnection, an `IOException` is thrown.
 * This method is useful for implementing blocking behaviour on top of
the non-blocking `NetworkClient`, use it with
 * care.
def blockingSendAndReceive(request: ClientRequest)(implicit time:
JTime): ClientResponse = {
  client.send(request, time.milliseconds())

  pollContinuously { responses =>
    val response = responses.find { response =>
      response.request.request.header.correlationId ==
    response.foreach { r =>
      if (r.wasDisconnected) {
        val destination = request.request.destination
        throw new IOException(s"Connection to $destination was
disconnected before the response was read")


  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message