kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tony Liu <jiangtao....@zuora.com>
Subject Re: Timeout publishing message to Kafka cluster.
Date Mon, 19 Dec 2016 01:35:56 GMT
​Post the configuration here for help:

[root@2494f8e6fb37 config]# vi server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Reference :
# 1), https://kafka.apache.org/documentation#configuration
# 2), https://kafka.apache.org/documentation#prodconfig

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each
broker.
broker.id=-1

############################# Socket Server Settings
#############################

# The address the socket server listens on. It will get the value returned
from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers.
If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use
the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=8

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept
(protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/kafka/data

# Enable auto creation of topic on the server
auto.create.topics.enable=true

# Enables delete topic. Delete topic through the admin tool will have no
effect if this config is turned off
delete.topic.enable=true

# The default number of log partitions per topic. More partitions allow
greater
# parallelism for consumption, but this will also result in more files
across
# the brokers.
# by default, we create 8 partitions for each topic, if we wanna increase
the number, we need to manually enlarge it.
num.partitions=8

# default replication factors for automatically created topics
default.replication.factor=3

# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
# This value is recommended to be increased for installations with data
dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# The interval with which we add an entry to the offset index
log.index.interval.bytes=4096

# The maximum size in bytes of the offset index
log.index.size.max.bytes=10485760

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only
fsync() to sync
# the OS cache lazily. The following configurations control the flush of
data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using
replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when
the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation,
and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data
after a period of time or
# every N messages (or both). This can be done globally and overridden on a
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=20000

# The maximum amount of time a message can sit in a log before we force a
flush
log.flush.interval.ms=10000

# The frequency in ms that the log flusher checks whether any log needs to
be flushed to disk
log.flush.scheduler.interval.ms=20000

# The frequency with which we update the persistent record of the last
flush which acts as the log recovery point
log.flush.offset.checkpoint.interval.ms=60000

############################# Log Retention Policy
#############################

# The following configurations control the disposal of log segments. The
policy can
# be set to delete segments after a period of time, or after a given size
has accumulated.
# A segment will be deleted whenever *either* of these criteria are met.
Deletion always happens
# from the end of the log.

#The default cleanup policy for segments beyond the retention window. A
comma separated list of valid policies. Valid policies are: "delete" and
"compact"
log.cleanup.policy=delete

# The minimum age of a log file to be eligible for deletion
log.retention.hours=36

# A size-based retention policy for logs. Segments are pruned from the log
as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new
log segment will be created.
log.segment.bytes=136870912

# The interval at which log segments are checked to see if they can be
deleted according
# to the retention policies
log.retention.check.interval.ms=300000

#The maximum time before a new log segment is rolled out (in hours),
secondary to log.roll.ms property
log.roll.hours=12

#The amount of time to wait before deleting a file from the filesystem
log.segment.delete.delay.ms=60000

#The maximum size of message that the server can receive
message.max.bytes=10485760

############################# Replication #############################

# Number of fetcher threads used to replicate messages from a source
broker. Increasing this value can increase the degree of I/O parallelism in
the follower broker.
num.replica.fetchers=4

# The number of bytes of messages to attempt to fetch for each partition.
# This is not an absolute maximum,
# if the first message in the first non-empty partition of the fetch is
larger than this value, the message will still be returned to ensure that
progress can be made.
# The maximum message size accepted by the broker is defined via
message.max.bytes (broker config) or max.message.bytes (topic config).
replica.fetch.max.bytes=10485760

# max wait time for each fetcher request issued by follower replicas.
# This value should always be less than the replica.lag.time.max.ms at all
times to prevent frequent shrinking of ISR for low throughput topics
replica.fetch.wait.max.ms=500

# The frequency with which the high watermark is saved out to disk
replica.high.watermark.checkpoint.interval.ms=5000

# The socket receive buffer for network requests
replica.socket.timeout.ms=30000

# The socket receive buffer for network requests
replica.socket.receive.buffer.bytes=65536

# If a follower hasn't sent any fetch requests or hasn't consumed up to the
leaders log end offset for at least this time, the leader will remove the
follower from isr
replica.lag.time.max.ms=10000

# The socket timeout for controller-to-broker channels
controller.socket.timeout.ms=30000

############################# Zookeeper #############################

# #The max time that the client waits to establish a connection to
zookeeper. If not set, the value in zookeeper.session.timeout.ms is used
zookeeper.connection.timeout.ms=6000

#Zookeeper session timeout
zookeeper.session.timeout.ms=6000

#Set client to use secure ACLs
zookeeper.set.acl=false

#How far a ZK follower can be behind a ZK leader
zookeeper.sync.time.ms=2000


zookeeper.connect=10.105.212.11:2181,10.105.211.109:2181,10.105.210.103:2181
jvm.performance.opts=-javaagent:/opt/jolokia/jolokia-jvm-1.3.5-agent.jar=host=
port=9092
advertised.host.name=10.105.210.24
advertised.port=9092​

On Sun, Dec 18, 2016 at 5:35 PM, Tony Liu <jiangtao.liu@zuora.com> wrote:

> when that error happened, I need to manually restart the kafka node
> `1002`, after restart finishing, all of the partition is being healthy
> again.
>
> i.e
> *before start ​:*
> 3 *1002*
> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
> *(1002,1004,1005)* *(1002)* *true* *true*
>
> *​After start:*
> 3
> *​       ​1002*
> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
> *(1002,1004,1005)*
> *(1002​, 1004, 1005​)* *true* *true*
> ​
>
>
> On Sun, Dec 18, 2016 at 5:29 PM, Tony Liu <jiangtao.liu@zuora.com> wrote:
>
>> Hi,
>>
>> Recently, we ran into the `batch expired` error in several days, may be 3
>> or 5 days, there is not fixed frequency.
>>
>> *A,* the error is:
>> Exception Class : org.apache.kafka.common.errors.TimeoutException
>> Error Message : Batch Expired
>>
>> *B*: server.log from kafka :
>>
>> [2016-12-18 20:45:32,371] INFO  Partition [thl_raw,43] on broker 1002:
>> Shrinking ISR for partition [thl_raw,43] from 1006,1001,1002 to 1002
>> (kafka.cluster.Partition)
>> [2016-12-18 20:45:32,376] INFO  Partition [HeartBit,6] on broker 1002:
>> Shrinking ISR for partition [HeartBit,6] from 1005,1006,1002 to 1002
>> (kafka.cluster.Partition)
>> [2016-12-18 20:45:32,378] INFO  Partition [thl_raw,31] on broker 1002:
>> Shrinking ISR for partition [thl_raw,31] from 1005,1004,1002 to 1002
>> (kafka.cluster.Partition)
>> [2016-12-18 20:45:32,382] INFO  Partition [HeartBit,0] on broker 1002:
>> Shrinking ISR for partition [HeartBit,0] from 1004,1005,1002 to 1002
>> (kafka.cluster.Partition)
>> [2016-12-18 20:45:32,384] INFO  Partition [ConnectorSync,7] on broker
>> 1002: Shrinking ISR for partition [ConnectorSync,7] from 1001,1002,1003 to
>> 1002 (kafka.cluster.Partition)
>> [2016-12-18 20:45:32,386] INFO  Partition [__consumer_offsets,8] on
>> broker 1002: Shrinking ISR for partition [__consumer_offsets,8] from
>> 1005,1004,1002 to 1002 (kafka.cluster.Partition)
>> [2016-12-18 20:45:32,389] INFO  Partition [thl_raw,37] on broker 1002:
>> Shrinking ISR for partition [thl_raw,37] from 1005,1006,1002 to 1002
>> (kafka.cluster.Partition)
>> [2016-12-18 20:45:32,391] INFO  Partition [HeartBeat,3] on broker 1002:
>> Shrinking ISR for partition [HeartBeat,3] from 1005,1004,1002 to 1002
>> (kafka.cluster.Partition)
>> [2016-12-18 21:17:59,888] INFO  Rolled new log segment for
>> '__consumer_offsets-46' in 1 ms. (kafka.log.Log)
>> [2016-12-18 21:19:07,923] INFO  Deleting segment 0 from log
>> __consumer_offsets-46. (kafka.log.Log)
>> [2016-12-18 21:19:07,923] INFO  Deleting segment 101935860 from log
>> __consumer_offsets-46. (kafka.log.Log)
>> [2016-12-18 21:19:07,924] INFO  Deleting index
>> /kafka/data/__consumer_offsets-46/00000000000000000000.index.deleted
>> (kafka.log.OffsetIndex)
>> [2016-12-18 21:19:07,924] INFO  Deleting index
>> /kafka/data/__consumer_offsets-46/00000000000101935860.index.deleted
>> (kafka.log.OffsetIndex)
>> [2016-12-18 21:19:07,924] INFO  Deleting index
>> /kafka/data/__consumer_offsets-46/00000000000000000000.timeindex.deleted
>> (kafka.log.TimeIndex)
>> [2016-12-18 21:19:07,924] INFO  Deleting index
>> /kafka/data/__consumer_offsets-46/00000000000101935860.timeindex.deleted
>> (kafka.log.TimeIndex)
>> [2016-12-18 21:19:08,393] INFO  Deleting segment 102963875 from log
>> __consumer_offsets-46. (kafka.log.Log)
>> [2016-12-18 21:19:08,410] INFO  Deleting index
>> /kafka/data/__consumer_offsets-46/00000000000102963875.index.deleted
>> (kafka.log.OffsetIndex)
>> [2016-12-18 21:19:08,410] INFO  Deleting index
>> /kafka/data/__consumer_offsets-46/00000000000102963875.timeindex.deleted
>> (kafka.log.TimeIndex)
>> [2016-12-18 21:48:53,007] INFO  Rolled new log segment for 'thl_raw-24'
>> in 1 ms. (kafka.log.Log)
>> [2016-12-18 22:15:09,894] INFO  Rolled new log segment for 'thl_raw-1' in
>> 0 ms. (kafka.log.Log)
>> [2016-12-18 23:34:28,526] INFO  Rolled new log segment for 'thl_raw-9' in
>> 1 ms. (kafka.log.Log)
>> [2016-12-18 23:34:28,754] INFO  Rolled new log segment for 'thl_raw-39'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-18 23:34:28,786] INFO  Rolled new log segment for 'thl_raw-7' in
>> 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:32,816] INFO  Rolled new log segment for 'thl_raw-15'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:33,049] INFO  Rolled new log segment for 'thl_raw-44'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:33,137] INFO  Rolled new log segment for 'thl_raw-20'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:33,305] INFO  Rolled new log segment for 'thl_raw-40'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:33,380] INFO  Rolled new log segment for 'thl_raw-59'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:33,470] INFO  Rolled new log segment for 'thl_raw-50'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:33,630] INFO  Rolled new log segment for 'thl_raw-35'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:33,995] INFO  Rolled new log segment for 'thl_raw-45'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:34,007] INFO  Rolled new log segment for 'thl_raw-34'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:34,265] INFO  Rolled new log segment for 'thl_raw-48'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:34,359] INFO  Rolled new log segment for 'thl_raw-54'
>> in 1 ms. (kafka.log.Log)
>> [2016-12-19 00:04:34,367] INFO  Rolled new log segment for 'thl_raw-10'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:34,540] INFO  Rolled new log segment for 'thl_raw-2' in
>> 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:35,123] INFO  Rolled new log segment for 'thl_raw-14'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:36,822] INFO  Rolled new log segment for 'thl_raw-29'
>> in 0 ms. (kafka.log.Log)
>> [2016-12-19 00:04:36,970] INFO  Rolled new log segment for 'thl_raw-18'
>> in 0 ms. (kafka.log.Log)
>>
>> *C*, when that kind of error happened, we always see the replication
>> being in problem, like:
>>
>> Topics
>> Topic# Partitions# BrokersBrokers Spread %Brokers Skew %# ReplicasUnder
>> Replicated %Producer Message/Sec
>> __consumer_offsets
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/__consumer_offsets>
>> 50 6 100 0 3 16 0.00
>> ConnectorSync
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/ConnectorSync>
>> 8 6 100 16 3 25 0.00
>> EventInstance
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/EventInstance>
>> 8 6 100 16 3 12 0.00
>> fjord_healthy_checker
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/fjord_healthy_checker>
>> 8 6 100 16 3 12 0.00
>> HeartBeat
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/HeartBeat>
>> 8 6 100 16 3 12 0.00
>> HeartBit
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/HeartBit>
>> 8 6 100 0 3 25 0.00
>> Notification
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/Notification>
>> 8 6 100 33 3 12 0.00
>> NotificationEventInstance
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/NotificationEventInstance>
>> 8 6 100 16 3 12 0.00
>> thl_raw
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/topics/thl_raw>
>> 64 6 100 0 3 17 0.00
>> *D*, All of the replication sounds related with node '1002` (click into
>> the each of topic, all of the issued partitions having the similar like `*blue
>> highlight*` )
>> Partition Information
>> PartitionLatest OffsetLeaderReplicasIn Sync ReplicasPreferred Leader?Under
>> Replicated?
>> 0 1005
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1005>
>> (1005,1001,1002) (1005,1002,1001) true false
>> 1 1006
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1006>
>> (1006,1002,1003) (1006,1003,1002) true false
>> 2 1001
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1001>
>> (1001,1003,1004) (1004,1003,1001) true false
>> 3 *1002*
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
>> *(1002,1004,1005)* *(1002)* *true* *true*
>> 4 1003
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1003>
>> (1003,1005,1006) (1003,1006,1005) true false
>> 5 1004
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1004>
>> (1004,1006,1001) (1004,1001,1006) true false
>> 6 1005
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1005>
>> (1005,1002,1003) (1003,1005,1002) true false
>> 7 1006
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1006>
>> (1006,1003,1004) (1003,1006,1004) true false
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message