kafka-users mailing list archives

From Tony Liu <jiangtao....@zuora.com>
Subject Re: Timeout publishing message to Kafka cluster.
Date Mon, 19 Dec 2016 01:45:48 GMT
If any of you have any idea about this issue, please let me know.

I am still confused about a couple of points:

1) Why does restarting node `1002` automatically resolve the replication
issue?
2) Is it possible this issue is caused by the log retention settings,
i.e. `log.segment.bytes=136870912`? Is that too small (~137 MB for each log
segment)? See the quick check sketched below.
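
For reference on 2): 136870912 bytes is roughly 130 MiB, while the broker
default for log.segment.bytes is 1073741824 (1 GiB). A quick way to see how
fast segments actually roll on disk (just a sketch, assuming
log.dirs=/kafka/data as in the config below; thl_raw-0 is an example
partition directory):

# List segment files for one partition; the timestamps show roll cadence.
ls -lh /kafka/data/thl_raw-0/*.log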

I still suspect this issue is caused by something in my `server.properties`,
but I have no idea which part is wrong.

Thanks.

On Sun, Dec 18, 2016 at 5:35 PM, Tony Liu <jiangtao.liu@zuora.com> wrote:

> Posting the configuration here in case it helps:
>
> [root@2494f8e6fb37 config]# vi server.properties
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.server.KafkaConfig for additional details and defaults
>
> # Reference :
> # 1), https://kafka.apache.org/documentation#configuration
> # 2), https://kafka.apache.org/documentation#prodconfig
>
> ############################# Server Basics #############################
>
> # The id of the broker. This must be set to a unique integer for each
> broker.
> broker.id=-1
>
> ############################# Socket Server Settings
> #############################
>
> # The address the socket server listens on. It will get the value returned
> from
> # java.net.InetAddress.getCanonicalHostName() if not configured.
> #   FORMAT:
> #     listeners = security_protocol://host_name:port
> #   EXAMPLE:
> #     listeners = PLAINTEXT://your.host.name:9092
> #listeners=PLAINTEXT://:9092
>
> # Hostname and port the broker will advertise to producers and consumers.
> If not set,
> # it uses the value for "listeners" if configured.  Otherwise, it will use
> the value
> # returned from java.net.InetAddress.getCanonicalHostName().
> #advertised.listeners=PLAINTEXT://your.host.name:9092
>
> # The number of threads handling network requests
> num.network.threads=8
>
> # The number of threads doing disk I/O
> num.io.threads=8
>
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=1048576
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=1048576
>
> # The maximum size of a request that the socket server will accept
> (protection against OOM)
> socket.request.max.bytes=104857600
>
>
> ############################# Log Basics #############################
>
> # A comma separated list of directories under which to store log files
> log.dirs=/kafka/data
>
> # Enable auto creation of topic on the server
> auto.create.topics.enable=true
>
> # Enables delete topic. Delete topic through the admin tool will have no
> effect if this config is turned off
> delete.topic.enable=true
>
> # The default number of log partitions per topic. More partitions allow
> greater
> # parallelism for consumption, but this will also result in more files
> across
> # the brokers.
> # By default we create 8 partitions for each topic; if we want a higher
> # count, we have to increase this setting manually.
> num.partitions=8
>
> # default replication factors for automatically created topics
> default.replication.factor=3
>
> # The number of threads per data directory to be used for log recovery at
> startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data
> dirs located in RAID array.
> num.recovery.threads.per.data.dir=1
>
> # The interval with which we add an entry to the offset index
> log.index.interval.bytes=4096
>
> # The maximum size in bytes of the offset index
> log.index.size.max.bytes=10485760
>
> ############################# Log Flush Policy
> #############################
>
> # Messages are immediately written to the filesystem but by default we
> only fsync() to sync
> # the OS cache lazily. The following configurations control the flush of
> data to disk.
> # There are a few important trade-offs here:
> #    1. Durability: Unflushed data may be lost if you are not using
> replication.
> #    2. Latency: Very large flush intervals may lead to latency spikes
> when the flush does occur as there will be a lot of data to flush.
> #    3. Throughput: The flush is generally the most expensive operation,
> and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data
> after a period of time or
> # every N messages (or both). This can be done globally and overridden on
> a per-topic basis.
>
> # The number of messages to accept before forcing a flush of data to disk
> log.flush.interval.messages=20000
>
> # The maximum amount of time a message can sit in a log before we force a
> flush
> log.flush.interval.ms=10000
>
> # The frequency in ms that the log flusher checks whether any log needs to
> be flushed to disk
> log.flush.scheduler.interval.ms=20000
>
> # The frequency with which we update the persistent record of the last
> flush which acts as the log recovery point
> log.flush.offset.checkpoint.interval.ms=60000
>
> ############################# Log Retention Policy
> #############################
>
> # The following configurations control the disposal of log segments. The
> policy can
> # be set to delete segments after a period of time, or after a given size
> has accumulated.
> # A segment will be deleted whenever *either* of these criteria are met.
> Deletion always happens
> # from the end of the log.
>
> #The default cleanup policy for segments beyond the retention window. A
> comma separated list of valid policies. Valid policies are: "delete" and
> "compact"
> log.cleanup.policy=delete
>
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=36
>
> # A size-based retention policy for logs. Segments are pruned from the log
> as long as the remaining
> # segments don't drop below log.retention.bytes.
> #log.retention.bytes=1073741824
>
> # The maximum size of a log segment file. When this size is reached a new
> log segment will be created.
> log.segment.bytes=136870912
>
> # The interval at which log segments are checked to see if they can be
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=300000
>
> #The maximum time before a new log segment is rolled out (in hours),
> secondary to log.roll.ms property
> log.roll.hours=12
>
> #The amount of time to wait before deleting a file from the filesystem
> log.segment.delete.delay.ms=60000
>
> #The maximum size of message that the server can receive
> message.max.bytes=10485760
>
> ############################# Replication #############################
>
> # Number of fetcher threads used to replicate messages from a source
> broker. Increasing this value can increase the degree of I/O parallelism in
> the follower broker.
> num.replica.fetchers=4
>
> # The number of bytes of messages to attempt to fetch for each partition.
> # This is not an absolute maximum,
> # if the first message in the first non-empty partition of the fetch is
> larger than this value, the message will still be returned to ensure that
> progress can be made.
> # The maximum message size accepted by the broker is defined via
> message.max.bytes (broker config) or max.message.bytes (topic config).
> replica.fetch.max.bytes=10485760
>
> # max wait time for each fetcher request issued by follower replicas.
> # This value should always be less than the replica.lag.time.max.ms at
> all times to prevent frequent shrinking of ISR for low throughput topics
> replica.fetch.wait.max.ms=500
>
> # The frequency with which the high watermark is saved out to disk
> replica.high.watermark.checkpoint.interval.ms=5000
>
> # The socket timeout for network requests
> replica.socket.timeout.ms=30000
>
> # The socket receive buffer for network requests
> replica.socket.receive.buffer.bytes=65536
>
> # If a follower hasn't sent any fetch requests or hasn't consumed up to
> the leaders log end offset for at least this time, the leader will remove
> the follower from isr
> replica.lag.time.max.ms=10000
>
> # The socket timeout for controller-to-broker channels
> controller.socket.timeout.ms=30000
>
> ############################# Zookeeper #############################
>
> # The max time that the client waits to establish a connection to
> zookeeper. If not set, the value in zookeeper.session.timeout.ms is used
> zookeeper.connection.timeout.ms=6000
>
> #Zookeeper session timeout
> zookeeper.session.timeout.ms=6000
>
> #Set client to use secure ACLs
> zookeeper.set.acl=false
>
> #How far a ZK follower can be behind a ZK leader
> zookeeper.sync.time.ms=2000
>
>
> zookeeper.connect=10.105.212.11:2181,10.105.211.109:2181,10.105.210.103:2181
> jvm.performance.opts=-javaagent:/opt/jolokia/jolokia-jvm-1.3.5-agent.jar=host=
> port=9092
> advertised.host.name=10.105.210.24
> advertised.port=9092
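>
> Side note: on 0.10 brokers the last two settings above are the legacy
> form; my understanding is the same thing can be written with the newer
> listener syntax (same host/port as above, shown commented out since we
> have not switched yet):
>
> # advertised.listeners=PLAINTEXT://10.105.210.24:9092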
>
> On Sun, Dec 18, 2016 at 5:35 PM, Tony Liu <jiangtao.liu@zuora.com> wrote:
>
>> When that error happens, I have to manually restart Kafka node `1002`;
>> after the restart finishes, all of the partitions become healthy again
>> (a quick way to list the affected partitions is sketched after the
>> example below).
>>
>> i.e., for the affected partition:
>>
>> Before restart:
>>   Partition 3, leader 1002, replicas (1002,1004,1005),
>>   in-sync replicas (1002), preferred leader: true, under replicated: true
>>
>> After restart:
>>   Partition 3, leader 1002, replicas (1002,1004,1005),
>>   in-sync replicas (1002,1004,1005), preferred leader: true,
>>   under replicated: true
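>>
>> A quick way to list every affected partition without clicking through
>> kafka-manager (a sketch, assuming the stock CLI shipped with the broker
>> and one of our ZooKeeper hosts):
>>
>> # Prints every partition whose ISR is smaller than its replica set
>> bin/kafka-topics.sh --zookeeper 10.105.212.11:2181 \
>>   --describe --under-replicated-partitions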
>>
>>
>> On Sun, Dec 18, 2016 at 5:29 PM, Tony Liu <jiangtao.liu@zuora.com> wrote:
>>
>>> Hi,
>>>
>>> Recently, we have been running into a `Batch Expired` error every few
>>> days (maybe every 3 to 5 days; there is no fixed frequency).
>>>
>>> *A*, the error is:
>>> Exception Class : org.apache.kafka.common.errors.TimeoutException
>>> Error Message : Batch Expired
>>>
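>>> For context on where this comes from: my understanding is that the
>>> 0.9/0.10 Java producer expires any batch that has waited in the record
>>> accumulator longer than request.timeout.ms, and that surfaces as exactly
>>> this TimeoutException. A minimal reproduction sketch (host, topic, and
>>> values here are illustrative, not our production settings):
>>>
>>> # producer settings that govern batch expiry (illustrative values)
>>> cat > /tmp/producer.properties <<'EOF'
>>> bootstrap.servers=10.105.210.24:9092
>>> # a batch waiting longer than this in the accumulator is expired
>>> # with "Batch Expired"
>>> request.timeout.ms=30000
>>> EOF
>>> bin/kafka-console-producer.sh --broker-list 10.105.210.24:9092 \
>>>   --topic thl_raw --producer.config /tmp/producer.properties
>>>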
>>> *B*: server.log from kafka :
>>>
>>> [2016-12-18 20:45:32,371] INFO  Partition [thl_raw,43] on broker 1002:
>>> Shrinking ISR for partition [thl_raw,43] from 1006,1001,1002 to 1002
>>> (kafka.cluster.Partition)
>>> [2016-12-18 20:45:32,376] INFO  Partition [HeartBit,6] on broker 1002:
>>> Shrinking ISR for partition [HeartBit,6] from 1005,1006,1002 to 1002
>>> (kafka.cluster.Partition)
>>> [2016-12-18 20:45:32,378] INFO  Partition [thl_raw,31] on broker 1002:
>>> Shrinking ISR for partition [thl_raw,31] from 1005,1004,1002 to 1002
>>> (kafka.cluster.Partition)
>>> [2016-12-18 20:45:32,382] INFO  Partition [HeartBit,0] on broker 1002:
>>> Shrinking ISR for partition [HeartBit,0] from 1004,1005,1002 to 1002
>>> (kafka.cluster.Partition)
>>> [2016-12-18 20:45:32,384] INFO  Partition [ConnectorSync,7] on broker
>>> 1002: Shrinking ISR for partition [ConnectorSync,7] from 1001,1002,1003 to
>>> 1002 (kafka.cluster.Partition)
>>> [2016-12-18 20:45:32,386] INFO  Partition [__consumer_offsets,8] on
>>> broker 1002: Shrinking ISR for partition [__consumer_offsets,8] from
>>> 1005,1004,1002 to 1002 (kafka.cluster.Partition)
>>> [2016-12-18 20:45:32,389] INFO  Partition [thl_raw,37] on broker 1002:
>>> Shrinking ISR for partition [thl_raw,37] from 1005,1006,1002 to 1002
>>> (kafka.cluster.Partition)
>>> [2016-12-18 20:45:32,391] INFO  Partition [HeartBeat,3] on broker 1002:
>>> Shrinking ISR for partition [HeartBeat,3] from 1005,1004,1002 to 1002
>>> (kafka.cluster.Partition)
>>> [2016-12-18 21:17:59,888] INFO  Rolled new log segment for
>>> '__consumer_offsets-46' in 1 ms. (kafka.log.Log)
>>> [2016-12-18 21:19:07,923] INFO  Deleting segment 0 from log
>>> __consumer_offsets-46. (kafka.log.Log)
>>> [2016-12-18 21:19:07,923] INFO  Deleting segment 101935860 from log
>>> __consumer_offsets-46. (kafka.log.Log)
>>> [2016-12-18 21:19:07,924] INFO  Deleting index
>>> /kafka/data/__consumer_offsets-46/00000000000000000000.index.deleted
>>> (kafka.log.OffsetIndex)
>>> [2016-12-18 21:19:07,924] INFO  Deleting index
>>> /kafka/data/__consumer_offsets-46/00000000000101935860.index.deleted
>>> (kafka.log.OffsetIndex)
>>> [2016-12-18 21:19:07,924] INFO  Deleting index
>>> /kafka/data/__consumer_offsets-46/00000000000000000000.timeindex.deleted
>>> (kafka.log.TimeIndex)
>>> [2016-12-18 21:19:07,924] INFO  Deleting index
>>> /kafka/data/__consumer_offsets-46/00000000000101935860.timeindex.deleted
>>> (kafka.log.TimeIndex)
>>> [2016-12-18 21:19:08,393] INFO  Deleting segment 102963875 from log
>>> __consumer_offsets-46. (kafka.log.Log)
>>> [2016-12-18 21:19:08,410] INFO  Deleting index
>>> /kafka/data/__consumer_offsets-46/00000000000102963875.index.deleted
>>> (kafka.log.OffsetIndex)
>>> [2016-12-18 21:19:08,410] INFO  Deleting index
>>> /kafka/data/__consumer_offsets-46/00000000000102963875.timeindex.deleted
>>> (kafka.log.TimeIndex)
>>> [2016-12-18 21:48:53,007] INFO  Rolled new log segment for 'thl_raw-24'
>>> in 1 ms. (kafka.log.Log)
>>> [2016-12-18 22:15:09,894] INFO  Rolled new log segment for 'thl_raw-1'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-18 23:34:28,526] INFO  Rolled new log segment for 'thl_raw-9'
>>> in 1 ms. (kafka.log.Log)
>>> [2016-12-18 23:34:28,754] INFO  Rolled new log segment for 'thl_raw-39'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-18 23:34:28,786] INFO  Rolled new log segment for 'thl_raw-7'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:32,816] INFO  Rolled new log segment for 'thl_raw-15'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:33,049] INFO  Rolled new log segment for 'thl_raw-44'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:33,137] INFO  Rolled new log segment for 'thl_raw-20'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:33,305] INFO  Rolled new log segment for 'thl_raw-40'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:33,380] INFO  Rolled new log segment for 'thl_raw-59'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:33,470] INFO  Rolled new log segment for 'thl_raw-50'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:33,630] INFO  Rolled new log segment for 'thl_raw-35'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:33,995] INFO  Rolled new log segment for 'thl_raw-45'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:34,007] INFO  Rolled new log segment for 'thl_raw-34'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:34,265] INFO  Rolled new log segment for 'thl_raw-48'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:34,359] INFO  Rolled new log segment for 'thl_raw-54'
>>> in 1 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:34,367] INFO  Rolled new log segment for 'thl_raw-10'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:34,540] INFO  Rolled new log segment for 'thl_raw-2'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:35,123] INFO  Rolled new log segment for 'thl_raw-14'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:36,822] INFO  Rolled new log segment for 'thl_raw-29'
>>> in 0 ms. (kafka.log.Log)
>>> [2016-12-19 00:04:36,970] INFO  Rolled new log segment for 'thl_raw-18'
>>> in 0 ms. (kafka.log.Log)
>>>
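>>> Next time this happens I also want to watch the ISR-shrink rate
>>> directly, via the Jolokia agent we already load in jvm.performance.opts
>>> (a sketch; I am assuming Jolokia's default agent port 8778, adjust to
>>> wherever the agent actually binds, and query the affected broker's
>>> host; the IP below is from our config):
>>>
>>> # ISR shrink rate and under-replicated partition count over JMX
>>> curl -s http://10.105.210.24:8778/jolokia/read/kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
>>> curl -s http://10.105.210.24:8778/jolokia/read/kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
>>>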
>>> *C*, when that kind of error happens, we always see replication
>>> problems, like:
>>>
>>> Topics:
>>>
>>> Topic                       #Partitions  #Brokers  Spread %  Skew %  #Replicas  Under Replicated %  Producer Msg/Sec
>>> __consumer_offsets          50           6         100       0       3          16                  0.00
>>> ConnectorSync               8            6         100       16      3          25                  0.00
>>> EventInstance               8            6         100       16      3          12                  0.00
>>> fjord_healthy_checker       8            6         100       16      3          12                  0.00
>>> HeartBeat                   8            6         100       16      3          12                  0.00
>>> HeartBit                    8            6         100       0       3          25                  0.00
>>> Notification                8            6         100       33      3          12                  0.00
>>> NotificationEventInstance   8            6         100       16      3          12                  0.00
>>> thl_raw                     64           6         100       0       3          17                  0.00
>>> *D*, all of the replication problems seem to be related to node `1002`
>>> (clicking into each topic, every affected partition looks like the
>>> marked row below):
>>> Partition Information:
>>>
>>> Partition  Leader  Replicas          In Sync Replicas  Preferred Leader?  Under Replicated?
>>> 0          1005    (1005,1001,1002)  (1005,1002,1001)  true               false
>>> 1          1006    (1006,1002,1003)  (1006,1003,1002)  true               false
>>> 2          1001    (1001,1003,1004)  (1004,1003,1001)  true               false
>>> 3          1002    (1002,1004,1005)  (1002)            true               true    <== problem
>>> 4          1003    (1003,1005,1006)  (1003,1006,1005)  true               false
>>> 5          1004    (1004,1006,1001)  (1004,1001,1006)  true               false
>>> 6          1005    (1005,1002,1003)  (1003,1005,1002)  true               false
>>> 7          1006    (1006,1003,1004)  (1003,1006,1004)  true               false
>>>
>>
>>
>
