kafka-users mailing list archives

From dev loper <spark...@gmail.com>
Subject Re: Kafka Streams application Unable to Horizontally scale and the application on other instances refusing to start.
Date Fri, 15 Sep 2017 17:02:13 GMT
Hi Damian,

One other observation: if I stop the instance that was running fine, the
other instance that was hung will come up after some time and start
processing fine.
Over the course of testing, I have also seen that if we change the
application id, the application will run for some time on all three
instances, but after a while the other instances stop processing.

Still, this model is not scalable, and I am not sure what else I need to do
in my Kafka Streams application to make it scale.
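For reference, this is a minimal sketch of how we configure the instances
for scaling (key names are the standard Kafka Streams / consumer config
strings; the helper class and thread count are illustrative, not our exact
production code). The important points are that every instance must use
the same application.id to join one consumer group, and that parallelism
is bounded by the input topic's partition count (36 for MYTOPIC05SEPT):

```java
import java.util.Properties;

public class StreamsScalingConfig {
    // Hypothetical helper: builds the properties used while testing
    // horizontal scaling. Broker host names are the ones from our logs.
    public static Properties build(String appId, int threads) {
        Properties props = new Properties();
        // All instances MUST share the same application.id to join the same
        // group; changing it effectively creates a brand-new consumer group,
        // which is why the app "works again" for a while after a rename.
        props.put("application.id", appId);
        props.put("bootstrap.servers",
                "l-mykafkainstancekafka5101:9092,l-mykafkainstancekafka5102:9092");
        // Each stream thread runs a subset of tasks; total threads across
        // all instances should not exceed the number of input partitions.
        props.put("num.stream.threads", Integer.toString(threads));
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("ProximityKafka-proxkafkalivereplicaengine05", 4);
        System.out.println(p.getProperty("application.id"));
        System.out.println(p.getProperty("num.stream.threads"));
    }
}
```

With 3 instances and 36 partitions, the group coordinator should spread
tasks across all members; the logs below show that is not happening.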


I have attached logs capturing the StreamThread pattern with this mail
(*streamlogs.zip attachment*). There are logs for three instances, named as
follows:

1) stream-thread-102.txt (1st instance)
2) stream-thread-101.txt (2nd instance)
3) stream-thread-100.txt (3rd instance)

While we were running our application:

-- "stream-thread-102" was consistently running.
-- "stream-thread-101" was processing inconsistently. There were huge
pauses; it resumed, and by the time I took the logs it was not processing
at all.
-- "stream-thread-100" was started after one hour, and the application
was not processing messages at all.

When the application is stuck, I consistently see the pattern below
(*after my signature*) in the logs on the instances that are not processing
any messages. I appreciate all the help you are providing. I am trying to
replace a production Spark application with Kafka Streams, and I am not
sure where I am going wrong, or what configuration or changes I need to
make on my side to get these messages processed.

Thanks
Dev


DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name connections-closed:
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name connections-created:
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name bytes-sent-received:
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name bytes-sent:
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name bytes-received:
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name select-time:
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name io-time:
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--1.bytes-sent
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--1.bytes-received
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--1.latency
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--2.bytes-sent
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--2.bytes-received
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--2.latency
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-1.bytes-sent
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-1.bytes-received
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-1.latency
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-0.bytes-sent
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-0.bytes-received
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-0.latency
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-2147483647.bytes-sent
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-2147483647.bytes-received
DEBUG | 11:51:33 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-2147483647.latency
DEBUG | 11:51:33 | consumer.KafkaConsumer (KafkaConsumer.java:1617) - The
Kafka consumer has closed.
DEBUG | 11:51:37 | internals.AbstractCoordinator
(AbstractCoordinator.java:737) - Sending Heartbeat request for group
ProximityKafka-proxkafkalivereplicaengine05 to coordinator
mykafkainstancekafka5101.mydomain.com.com:9092 (id: 2147483647 rack: null)
DEBUG | 11:51:37 | internals.AbstractCoordinator
(AbstractCoordinator.java:750) - Received successful Heartbeat response for
group ProximityKafka-proxkafkalivereplicaengine05
DEBUG | 11:51:37 | clients.NetworkClient (NetworkClient.java:889) - Sending
metadata request (type=MetadataRequest, topics=<ALL>) to node 1
DEBUG | 11:51:38 | clients.Metadata (Metadata.java:251) - Updated cluster
metadata version 24 to Cluster(id = NR-WaCkTRwK9Dl2sNKozPQ, nodes = [
mykafkainstancekafka5101.mydomain.com.com:9092 (id: 0 rack: null),
mykafkainstancekafka5102.mydomain.com.com:9092 (id: 1 rack: null)],
partitions = [Partition(topic = MYTOPIC05SEPT, partition = 28, leader = 1,
replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
26, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 24, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 22, leader = 1, replicas =
[1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 20, leader =
1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
18, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 16, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 14, leader = 1, replicas =
[1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 11, leader =
0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
9, leader = 0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT,
partition = 7, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 5, leader = 0, replicas = [0], isr = [0]),
Partition(topic = MYTOPIC05SEPT, partition = 3, leader = 0, replicas = [0],
isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 1, leader = 0,
replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
34, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 32, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 30, leader = 1, replicas =
[1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 27, leader =
0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
25, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 23, leader = 0, replicas = [0], isr = [0]),
Partition(topic = MYTOPIC05SEPT, partition = 21, leader = 0, replicas =
[0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 19, leader =
0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
17, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 15, leader = 0, replicas = [0], isr = [0]),
Partition(topic = MYTOPIC05SEPT, partition = 13, leader = 0, replicas =
[0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 12, leader =
1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
10, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 8, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 6, leader = 1, replicas = [1],
isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 35, leader = 0,
replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 4,
leader = 1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT,
partition = 33, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 2, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 31, leader = 0, replicas =
[0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 0, leader =
1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
29, leader = 0, replicas = [0], isr = [0])])
INFO  | 11:51:43 | consumer.ConsumerConfig (AbstractConfig.java:223) -
ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [l-mykafkainstancekafka5101:9092,
l-mykafkainstancekafka5102:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = ProximityKafka-proxkafkalivereplicaengine05
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class
mx.july.jmx.proximity.util.ProtoBufLocationUpdateCodec

DEBUG | 11:51:43 | consumer.KafkaConsumer (KafkaConsumer.java:643) -
Starting the Kafka consumer
DEBUG | 11:51:43 | clients.Metadata (Metadata.java:251) - Updated cluster
metadata version 1 to Cluster(id = null, nodes =
[l-mykafkainstancekafka5101:9092 (id: -1 rack: null),
l-mykafkainstancekafka5102:9092 (id: -2 rack: null)], partitions = [])
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name fetch-throttle-time
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name connections-closed:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name connections-created:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name bytes-sent-received:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name bytes-sent:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name bytes-received:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name select-time:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name io-time:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name heartbeat-latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name join-latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name sync-latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name commit-latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name bytes-fetched
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name records-fetched
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name fetch-latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name records-lag
WARN  | 11:51:43 | consumer.ConsumerConfig (AbstractConfig.java:231) - The
configuration 'key.serializer.class' was supplied but isn't a known config.
WARN  | 11:51:43 | consumer.ConsumerConfig (AbstractConfig.java:231) - The
configuration 'value.serializer' was supplied but isn't a known config.
WARN  | 11:51:43 | consumer.ConsumerConfig (AbstractConfig.java:231) - The
configuration 'consumer.timeout.ms' was supplied but isn't a known config.
WARN  | 11:51:43 | consumer.ConsumerConfig (AbstractConfig.java:231) - The
configuration 'serializer.class' was supplied but isn't a known config.
WARN  | 11:51:43 | consumer.ConsumerConfig (AbstractConfig.java:231) - The
configuration 'key.serializer' was supplied but isn't a known config.
INFO  | 11:51:43 | utils.AppInfoParser (AppInfoParser.java:83) - Kafka
version : 0.11.0.0
INFO  | 11:51:43 | utils.AppInfoParser (AppInfoParser.java:84) - Kafka
commitId : cb8625948210849f
DEBUG | 11:51:43 | consumer.KafkaConsumer (KafkaConsumer.java:759) - Kafka
consumer created
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:760) -
Initiating connection to node -1 at l-mykafkainstancekafka5101:9092.
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node--1.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node--1.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node--1.latency
DEBUG | 11:51:43 | network.Selector (Selector.java:363) - Created socket
with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:722) -
Completed connection to node -1.  Fetching API versions.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:736) -
Initiating API versions fetch from node -1.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:905) -
Initialize connection to node -2 for sending metadata request
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:760) -
Initiating connection to node -2 at l-mykafkainstancekafka5102:9092.
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node--2.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node--2.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node--2.latency
DEBUG | 11:51:43 | network.Selector (Selector.java:363) - Created socket
with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:722) -
Completed connection to node -2.  Fetching API versions.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:736) -
Initiating API versions fetch from node -2.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:689) -
Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3],
Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3):
0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0
[usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7):
1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3
[usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2
[usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1
[usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1
[usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0
[usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2
[usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0
[usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0
[usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0
[usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0],
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0],
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:889) - Sending
metadata request (type=MetadataRequest, topics=) to node -1
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:689) -
Recorded API versions for node -2: (Produce(0): 0 to 3 [usable: 3],
Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3):
0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0
[usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7):
1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3
[usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2
[usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1
[usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1
[usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0
[usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2
[usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0
[usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0
[usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0
[usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0],
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0],
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
DEBUG | 11:51:43 | clients.Metadata (Metadata.java:251) - Updated cluster
metadata version 2 to Cluster(id = NR-WaCkTRwK9Dl2sNKozPQ, nodes = [
mykafkainstancekafka5101.mydomain.com.com:9092 (id: 0 rack: null),
mykafkainstancekafka5102.mydomain.com.com:9092 (id: 1 rack: null)],
partitions = [])
DEBUG | 11:51:43 | consumer.KafkaConsumer (KafkaConsumer.java:992) -
Subscribed to partition(s): MYTOPIC05SEPT-23, MYTOPIC05SEPT-32,
MYTOPIC05SEPT-8, MYTOPIC05SEPT-17, MYTOPIC05SEPT-35, MYTOPIC05SEPT-26,
MYTOPIC05SEPT-11, MYTOPIC05SEPT-29, MYTOPIC05SEPT-2, MYTOPIC05SEPT-20,
MYTOPIC05SEPT-5, MYTOPIC05SEPT-14, MYTOPIC05SEPT-13, MYTOPIC05SEPT-4,
MYTOPIC05SEPT-22, MYTOPIC05SEPT-31, MYTOPIC05SEPT-7, MYTOPIC05SEPT-16,
MYTOPIC05SEPT-25, MYTOPIC05SEPT-34, MYTOPIC05SEPT-10, MYTOPIC05SEPT-1,
MYTOPIC05SEPT-19, MYTOPIC05SEPT-28, MYTOPIC05SEPT-27, MYTOPIC05SEPT-9,
MYTOPIC05SEPT-18, MYTOPIC05SEPT-21, MYTOPIC05SEPT-12, MYTOPIC05SEPT-3,
MYTOPIC05SEPT-30, MYTOPIC05SEPT-15, MYTOPIC05SEPT-24, MYTOPIC05SEPT-6,
MYTOPIC05SEPT-33, MYTOPIC05SEPT-0
DEBUG | 11:51:43 | internals.AbstractCoordinator
(AbstractCoordinator.java:572) - Sending GroupCoordinator request for group
ProximityKafka-proxkafkalivereplicaengine05 to broker
mykafkainstancekafka5102.mydomain.com.com:9092 (id: 1 rack: null)
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:760) -
Initiating connection to node 1 at
mykafkainstancekafka5102.mydomain.com.com:9092.
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-1.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-1.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-1.latency
DEBUG | 11:51:43 | network.Selector (Selector.java:363) - Created socket
with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:722) -
Completed connection to node 1.  Fetching API versions.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:736) -
Initiating API versions fetch from node 1.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:905) -
Initialize connection to node 0 for sending metadata request
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:760) -
Initiating connection to node 0 at
mykafkainstancekafka5101.mydomain.com.com:9092.
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-0.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-0.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-0.latency
DEBUG | 11:51:43 | network.Selector (Selector.java:363) - Created socket
with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 0
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:722) -
Completed connection to node 0.  Fetching API versions.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:736) -
Initiating API versions fetch from node 0.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:689) -
Recorded API versions for node 1: (Produce(0): 0 to 3 [usable: 3],
Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3):
0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0
[usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7):
1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3
[usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2
[usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1
[usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1
[usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0
[usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2
[usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0
[usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0
[usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0
[usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0],
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0],
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:889) - Sending
metadata request (type=MetadataRequest, topics=MYTOPIC05SEPT) to node 1
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:689) -
Recorded API versions for node 0: (Produce(0): 0 to 3 [usable: 3],
Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3):
0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0
[usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7):
1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3
[usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2
[usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1
[usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1
[usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0
[usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2
[usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0
[usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0
[usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0
[usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0],
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0],
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
DEBUG | 11:51:43 | clients.Metadata (Metadata.java:251) - Updated cluster
metadata version 3 to Cluster(id = NR-WaCkTRwK9Dl2sNKozPQ, nodes = [
mykafkainstancekafka5101.mydomain.com.com:9092 (id: 0 rack: null),
mykafkainstancekafka5102.mydomain.com.com:9092 (id: 1 rack: null)],
partitions = [Partition(topic = MYTOPIC05SEPT, partition = 28, leader = 1,
replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
26, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 24, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 22, leader = 1, replicas =
[1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 20, leader =
1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
18, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 16, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 14, leader = 1, replicas =
[1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 11, leader =
0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
9, leader = 0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT,
partition = 7, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 5, leader = 0, replicas = [0], isr = [0]),
Partition(topic = MYTOPIC05SEPT, partition = 3, leader = 0, replicas = [0],
isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 1, leader = 0,
replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
34, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 32, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 30, leader = 1, replicas =
[1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 27, leader =
0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
25, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 23, leader = 0, replicas = [0], isr = [0]),
Partition(topic = MYTOPIC05SEPT, partition = 21, leader = 0, replicas =
[0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 19, leader =
0, replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition =
17, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 15, leader = 0, replicas = [0], isr = [0]),
Partition(topic = MYTOPIC05SEPT, partition = 13, leader = 0, replicas =
[0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 12, leader =
1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
10, leader = 1, replicas = [1], isr = [1]), Partition(topic =
MYTOPIC05SEPT, partition = 8, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 6, leader = 1, replicas = [1],
isr = [1]), Partition(topic = MYTOPIC05SEPT, partition = 35, leader = 0,
replicas = [0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 4,
leader = 1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT,
partition = 33, leader = 0, replicas = [0], isr = [0]), Partition(topic =
MYTOPIC05SEPT, partition = 2, leader = 1, replicas = [1], isr = [1]),
Partition(topic = MYTOPIC05SEPT, partition = 31, leader = 0, replicas =
[0], isr = [0]), Partition(topic = MYTOPIC05SEPT, partition = 0, leader =
1, replicas = [1], isr = [1]), Partition(topic = MYTOPIC05SEPT, partition =
29, leader = 0, replicas = [0], isr = [0])])
DEBUG | 11:51:43 | internals.AbstractCoordinator
(AbstractCoordinator.java:583) - Received GroupCoordinator response
ClientResponse(receivedTimeMs=1505494303698, latencyMs=6,
disconnected=false,
requestHeader={api_key=10,api_version=1,correlation_id=4,client_id=consumer-612},
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null',
error=NONE, node=mykafkainstancekafka5101.mydomain.com.com:9092 (id: 0
rack: null))) for group ProximityKafka-proxkafkalivereplicaengine05
INFO  | 11:51:43 | internals.AbstractCoordinator
(AbstractCoordinator.java:597) - Discovered coordinator
mykafkainstancekafka5101.mydomain.com.com:9092 (id: 2147483647 rack: null)
for group ProximityKafka-proxkafkalivereplicaengine05.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:760) -
Initiating connection to node 2147483647 at
mykafkainstancekafka5101.mydomain.com.com:9092.
DEBUG | 11:51:43 | internals.ConsumerCoordinator
(ConsumerCoordinator.java:826) - Group
ProximityKafka-proxkafkalivereplicaengine05 fetching committed offsets for
partitions: [MYTOPIC05SEPT-28, MYTOPIC05SEPT-26, MYTOPIC05SEPT-24,
MYTOPIC05SEPT-22, MYTOPIC05SEPT-20, MYTOPIC05SEPT-18, MYTOPIC05SEPT-16,
MYTOPIC05SEPT-14, MYTOPIC05SEPT-11, MYTOPIC05SEPT-9, MYTOPIC05SEPT-7,
MYTOPIC05SEPT-5, MYTOPIC05SEPT-3, MYTOPIC05SEPT-1, MYTOPIC05SEPT-34,
MYTOPIC05SEPT-32, MYTOPIC05SEPT-30, MYTOPIC05SEPT-27, MYTOPIC05SEPT-25,
MYTOPIC05SEPT-23, MYTOPIC05SEPT-21, MYTOPIC05SEPT-19, MYTOPIC05SEPT-17,
MYTOPIC05SEPT-15, MYTOPIC05SEPT-13, MYTOPIC05SEPT-12, MYTOPIC05SEPT-10,
MYTOPIC05SEPT-8, MYTOPIC05SEPT-6, MYTOPIC05SEPT-35, MYTOPIC05SEPT-4,
MYTOPIC05SEPT-33, MYTOPIC05SEPT-2, MYTOPIC05SEPT-31, MYTOPIC05SEPT-0,
MYTOPIC05SEPT-29]
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-2147483647.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-2147483647.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:403) - Added sensor with
name node-2147483647.latency
DEBUG | 11:51:43 | network.Selector (Selector.java:363) - Created socket
with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node
2147483647
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:722) -
Completed connection to node 2147483647.  Fetching API versions.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:736) -
Initiating API versions fetch from node 2147483647.
DEBUG | 11:51:43 | clients.NetworkClient (NetworkClient.java:689) -
Recorded API versions for node 2147483647: (Produce(0): 0 to 3 [usable: 3],
Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3):
0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0
[usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7):
1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3
[usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2
[usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1
[usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1
[usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0
[usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2
[usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0
[usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0
[usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0
[usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0],
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0],
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-28 to the committed offset 47131
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-26 to the committed offset 49269
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-24 to the committed offset 50887
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-22 to the committed offset 49094
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-20 to the committed offset 49342
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-18 to the committed offset 50114
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-16 to the committed offset 79780
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-14 to the committed offset 82801
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-11 to the committed offset 57608
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-9 to the committed offset 60101
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-7 to the committed offset 66478
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-5 to the committed offset 57114
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-3 to the committed offset 64520
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-1 to the committed offset 59346
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-34 to the committed offset 80520
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-32 to the committed offset 76699
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-30 to the committed offset 75570
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-27 to the committed offset 45030
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-25 to the committed offset 47271
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-23 to the committed offset 50501
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-21 to the committed offset 50300
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-19 to the committed offset 49069
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-17 to the committed offset 75918
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-15 to the committed offset 75643
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-13 to the committed offset 75893
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-12 to the committed offset 77475
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-10 to the committed offset 58403
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-8 to the committed offset 57245
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-6 to the committed offset 59636
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-35 to the committed offset 78042
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-4 to the committed offset 61534
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-33 to the committed offset 76625
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-2 to the committed offset 69812
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-31 to the committed offset 75832
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-0 to the committed offset 60643
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting offset
for partition MYTOPIC05SEPT-29 to the committed offset 49308
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-28. Fetched offset 141029,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-26. Fetched offset 146689,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-24. Fetched offset 155074,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-22. Fetched offset 146722,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-20. Fetched offset 149056,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-18. Fetched offset 148654,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-16. Fetched offset 148522,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-14. Fetched offset 154624,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-12. Fetched offset 146217,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-10. Fetched offset 143537,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-8. Fetched offset 139250,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-6. Fetched offset 144801,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-4. Fetched offset 150601,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-34. Fetched offset 152845,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-2. Fetched offset 154292,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-32. Fetched offset 143918,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-0. Fetched offset 146847,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-30. Fetched offset 139260,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-27. Fetched offset 133983,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-25. Fetched offset 142668,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-23. Fetched offset 153004,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-21. Fetched offset 149196,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-19. Fetched offset 148422,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-17. Fetched offset 142753,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-15. Fetched offset 140157,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-13. Fetched offset 140710,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-11. Fetched offset 140156,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-9. Fetched offset 144958,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-7. Fetched offset 141056,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-5. Fetched offset 131643,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-3. Fetched offset 160416,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-35. Fetched offset 143510,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-1. Fetched offset 139207,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-33. Fetched offset 141712,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-31. Fetched offset 141294,
timestamp -1
DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:719) - Handling
ListOffsetResponse response for MYTOPIC05SEPT-29. Fetched offset 151172,
timestamp -1
INFO  | 11:51:43 | kafka.KafkaQueueMonitor (KafkaQueueMonitor.java:73) -
Kafka Topic Performance Metrics     KafkaBorker
::l-mykafkainstancekafka5101:9092,l-mykafkainstancekafka5102:9092    Topic
::MYTOPIC05SEPT    GroupId ::ProximityKafka-proxkafkalivereplicaengine05
Kafka Topic Parition :: 28 EndOffset :: 141029 CurrentOffset :: 47131 Lag
:: 93898
Kafka Topic Parition :: 26 EndOffset :: 146689 CurrentOffset :: 49269 Lag
:: 97420
Kafka Topic Parition :: 24 EndOffset :: 155074 CurrentOffset :: 50887 Lag
:: 104187
Kafka Topic Parition :: 22 EndOffset :: 146722 CurrentOffset :: 49094 Lag
:: 97628
Kafka Topic Parition :: 20 EndOffset :: 149056 CurrentOffset :: 49342 Lag
:: 99714
Kafka Topic Parition :: 18 EndOffset :: 148654 CurrentOffset :: 50114 Lag
:: 98540
Kafka Topic Parition :: 16 EndOffset :: 148522 CurrentOffset :: 79780 Lag
:: 68742
Kafka Topic Parition :: 14 EndOffset :: 154624 CurrentOffset :: 82801 Lag
:: 71823
Kafka Topic Parition :: 11 EndOffset :: 140156 CurrentOffset :: 57608 Lag
:: 82548
Kafka Topic Parition :: 9 EndOffset :: 144958 CurrentOffset :: 60101 Lag ::
84857
Kafka Topic Parition :: 7 EndOffset :: 141056 CurrentOffset :: 66478 Lag ::
74578
Kafka Topic Parition :: 5 EndOffset :: 131643 CurrentOffset :: 57114 Lag ::
74529
Kafka Topic Parition :: 3 EndOffset :: 160416 CurrentOffset :: 64520 Lag ::
95896
Kafka Topic Parition :: 1 EndOffset :: 139207 CurrentOffset :: 59346 Lag ::
79861
Kafka Topic Parition :: 34 EndOffset :: 152845 CurrentOffset :: 80520 Lag
:: 72325
Kafka Topic Parition :: 32 EndOffset :: 143918 CurrentOffset :: 76699 Lag
:: 67219
Kafka Topic Parition :: 30 EndOffset :: 139260 CurrentOffset :: 75570 Lag
:: 63690
Kafka Topic Parition :: 27 EndOffset :: 133983 CurrentOffset :: 45030 Lag
:: 88953
Kafka Topic Parition :: 25 EndOffset :: 142668 CurrentOffset :: 47271 Lag
:: 95397
Kafka Topic Parition :: 23 EndOffset :: 153004 CurrentOffset :: 50501 Lag
:: 102503
Kafka Topic Parition :: 21 EndOffset :: 149196 CurrentOffset :: 50300 Lag
:: 98896
Kafka Topic Parition :: 19 EndOffset :: 148422 CurrentOffset :: 49069 Lag
:: 99353
Kafka Topic Parition :: 17 EndOffset :: 142753 CurrentOffset :: 75918 Lag
:: 66835
Kafka Topic Parition :: 15 EndOffset :: 140157 CurrentOffset :: 75643 Lag
:: 64514
Kafka Topic Parition :: 13 EndOffset :: 140710 CurrentOffset :: 75893 Lag
:: 64817
Kafka Topic Parition :: 12 EndOffset :: 146217 CurrentOffset :: 77475 Lag
:: 68742
Kafka Topic Parition :: 10 EndOffset :: 143537 CurrentOffset :: 58403 Lag
:: 85134
Kafka Topic Parition :: 8 EndOffset :: 139250 CurrentOffset :: 57245 Lag ::
82005
Kafka Topic Parition :: 6 EndOffset :: 144801 CurrentOffset :: 59636 Lag ::
85165
Kafka Topic Parition :: 35 EndOffset :: 143510 CurrentOffset :: 78042 Lag
:: 65468
Kafka Topic Parition :: 4 EndOffset :: 150601 CurrentOffset :: 61534 Lag ::
89067
Kafka Topic Parition :: 33 EndOffset :: 141712 CurrentOffset :: 76625 Lag
:: 65087
Kafka Topic Parition :: 2 EndOffset :: 154292 CurrentOffset :: 69812 Lag ::
84480
Kafka Topic Parition :: 31 EndOffset :: 141294 CurrentOffset :: 75832 Lag
:: 65462
Kafka Topic Parition :: 0 EndOffset :: 146847 CurrentOffset :: 60643 Lag ::
86204
Kafka Topic Parition :: 29 EndOffset :: 151172 CurrentOffset :: 49308 Lag
:: 101864
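For reference, the Lag column in the monitor report above is simply EndOffset minus CurrentOffset per partition. A minimal sketch of that calculation (the class name is mine, not from the monitor code; the offsets used below come from partition 28 in the report):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LagCalculator {

    // Lag per partition = end offset - committed ("current") offset,
    // the same arithmetic behind the KafkaQueueMonitor report above.
    public static Map<Integer, Long> lag(Map<Integer, Long> endOffsets,
                                         Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), e.getValue() - committed);
        }
        return result;
    }
}
```

For partition 28 this gives 141029 - 47131 = 93898, matching the report.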

DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name connections-closed:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name connections-created:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name bytes-sent-received:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name bytes-sent:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name bytes-received:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name select-time:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name io-time:
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--1.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--1.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--1.latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--2.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--2.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node--2.latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-1.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-1.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-1.latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-0.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-0.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-0.latency
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-2147483647.bytes-sent
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-2147483647.bytes-received
DEBUG | 11:51:43 | metrics.Metrics (Metrics.java:436) - Removed sensor with
name node-2147483647.latency
DEBUG | 11:51:43 | consumer.KafkaConsumer (KafkaConsumer.java:1617) - The
Kafka consumer has closed.
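When sifting these logs, the repeating "Resetting offset for partition ..." entries tell you which partitions the stuck consumer still claims. A small scanner sketch (class name and regex are my own; in the raw log file each entry is a single line, unlike the wrapped copies shown above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ResetLogScanner {

    // Matches Fetcher entries such as:
    // "DEBUG | 11:51:43 | internals.Fetcher (Fetcher.java:278) - Resetting
    //  offset for partition MYTOPIC05SEPT-28 to the committed offset 47131"
    private static final Pattern RESET = Pattern.compile(
            "Resetting offset for partition (\\S+) to the committed offset (\\d+)");

    // Returns the partitions mentioned in reset entries, in log order.
    public static List<String> resetPartitions(Iterable<String> logLines) {
        List<String> partitions = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = RESET.matcher(line);
            if (m.find()) {
                partitions.add(m.group(1));
            }
        }
        return partitions;
    }
}
```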




On Fri, Sep 15, 2017 at 4:21 PM, dev loper <sparkemr@gmail.com> wrote:

> Hi Damian,
>
> I haven't explicitly configured anything related to threads. All the
> instances are running with the same configuration.
> I will share the logs you asked for from both running instance and the
> other instance shortly.
>
> Thanks
> Dev
>
> On Fri, Sep 15, 2017 at 3:42 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
>> Grepping for StreamThread would be useful, though some logs will be over
>> multiple lines.
>> We need to see which partitions have been assigned to the other
>> instances/threads, it will look something like:
>>
>> 2017-09-14 10:04:22 INFO  StreamThread:160 - stream-thread
>> [myKafka-kafkareplica101Sept08-cb392e38-1e78-4ab6-9143-eb6bc6ec8219-StreamThread-1]
>> at state PARTITIONS_REVOKED: new partitions [MYTOPIC05SEPT-24,
>> MYTOPIC05SEPT-0] assigned at the end of consumer rebalance.
>>         assigned active tasks: [0_0, 0_24]
>>         assigned standby tasks: []
>>         current suspended active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 0_5,
>> 0_6, 0_7, 0_8, 0_9, 0_10, 0_11, 0_12, 0_13, 0_14, 0_15, 0_16, 0_17,
>> 0_18, 0_19, 0_20, 0_21, 0_22, 0_23, 0_24, 0_25, 0_26, 0_27, 0_28,
>> 0_29, 0_30, 0_31, 0_32, 0_33, 0_34, 0_35]
>>         current suspended standby tasks: []
>>         previous active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 0_5, 0_6,
>> 0_7, 0_8, 0_9, 0_10, 0_11, 0_12, 0_13, 0_14, 0_15, 0_16, 0_17, 0_18,
>> 0_19, 0_20, 0_21, 0_22, 0_23, 0_24, 0_25, 0_26, 0_27, 0_28, 0_29,
>> 0_30, 0_31, 0_32, 0_33, 0_34, 0_35]
>>
>>
>> I also noticed that one of your instances looks like it is configured with
>> 12 threads, while the others have 4 - is that correct?
>>
>> On Fri, 15 Sep 2017 at 10:27 dev loper <sparkemr@gmail.com> wrote:
>>
>> > Hi Damian,
>> >
>> > I do have the logs for the other application, but they are quite large
>> > since it is continuously processing. Do you want me to grep for anything
>> > specific and share it with you?
>> >
>> > Thanks
>> > Dev
>> >
>> > On Fri, Sep 15, 2017 at 2:31 PM, Damian Guy <damian.guy@gmail.com>
>> wrote:
>> >
>> > > Hi,
>> > >
>> > > Do you have the logs for the other instance?
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Fri, 15 Sep 2017 at 07:19 dev loper <sparkemr@gmail.com> wrote:
>> > >
>> > > > Dear Kafka Users,
>> > > >
>> > > > I am fairly new to Kafka Streams. I have deployed two instances of
>> > > > Kafka 0.11 brokers on AWS M3.xlarge instances. I have created a topic
>> > > > with 36 partitions, and a separate application writes to this topic,
>> > > > producing records at a rate of 10000 messages per second. I have three
>> > > > AWS M4.xlarge instances where my Kafka Streams application runs,
>> > > > consuming the messages produced by the other application. The
>> > > > application starts up and processes messages fine on the first
>> > > > instance, but when I start the same application on the other
>> > > > instances it does not start: even though the process is alive, it is
>> > > > not processing messages. The other instances also take a long time to
>> > > > start.
>> > > >
>> > > > Apart from the first instance, on the other instances I could see the
>> > > > consumer getting added and removed repeatedly, with no message
>> > > > processing at all. I have attached the detailed logs where this
>> > > > behavior is observed.
>> > > >
>> > > > On these instances the consumer starts with the log below and is then
>> > > > stopped (*detailed logs attached*):
>> > > >
>> > > > INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
>> > > > ConsumerConfig values:
>> > > >     auto.commit.interval.ms = 5000
>> > > >     auto.offset.reset = latest
>> > > >     bootstrap.servers = [l-mykafkainstancekafka5101:9092,
>> > > > l-mykafkainstancekafka5102:9092]
>> > > >     check.crcs = true
>> > > >     client.id =
>> > > >     connections.max.idle.ms = 540000
>> > > >     enable.auto.commit = false
>> > > >     exclude.internal.topics = true
>> > > >     fetch.max.bytes = 52428800
>> > > >     fetch.max.wait.ms = 500
>> > > >     fetch.min.bytes = 1
>> > > >     group.id = myKafka-kafkareplica101Sept08
>> > > >     heartbeat.interval.ms = 3000
>> > > >     interceptor.classes = null
>> > > >     internal.leave.group.on.close = true
>> > > >     isolation.level = read_uncommitted
>> > > >     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
>> > > >     max.partition.fetch.bytes = 1048576
>> > > >     max.poll.interval.ms = 300000
>> > > >     max.poll.records = 500
>> > > >     metadata.max.age.ms = 300000
>> > > >     metric.reporters = []
>> > > >     metrics.num.samples = 2
>> > > >     metrics.recording.level = INFO
>> > > >     metrics.sample.window.ms = 30000
>> > > >     partition.assignment.strategy = [class
>> > > > org.apache.kafka.clients.consumer.RangeAssignor]
>> > > >     receive.buffer.bytes = 65536
>> > > >     reconnect.backoff.max.ms = 1000
>> > > >     reconnect.backoff.ms = 50
>> > > >     request.timeout.ms = 305000
>> > > >     retry.backoff.ms = 100
>> > > >     sasl.jaas.config = null
>> > > >     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> > > >     sasl.kerberos.min.time.before.relogin = 60000
>> > > >     sasl.kerberos.service.name = null
>> > > >     sasl.kerberos.ticket.renew.jitter = 0.05
>> > > >     sasl.kerberos.ticket.renew.window.factor = 0.8
>> > > >     sasl.mechanism = GSSAPI
>> > > >     security.protocol = PLAINTEXT
>> > > >     send.buffer.bytes = 131072
>> > > >     session.timeout.ms = 10000
>> > > >     ssl.cipher.suites = null
>> > > >     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> > > >     ssl.endpoint.identification.algorithm = null
>> > > >     ssl.key.password = null
>> > > >     ssl.keymanager.algorithm = SunX509
>> > > >     ssl.keystore.location = null
>> > > >     ssl.keystore.password = null
>> > > >     ssl.keystore.type = JKS
>> > > >     ssl.protocol = TLS
>> > > >     ssl.provider = null
>> > > >     ssl.secure.random.implementation = null
>> > > >     ssl.trustmanager.algorithm = PKIX
>> > > >     ssl.truststore.location = null
>> > > >     ssl.truststore.password = null
>> > > >     ssl.truststore.type = JKS
>> > > >     value.deserializer = class my.dev.MessageUpdateCodec
>> > > >
>> > > >
>> > > > DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
>> > > > The Kafka consumer has closed.
>> > > >
>> > > > ...and then the whole process repeats.
>> > > >
>> > > >
>> > > >
>> > > > Below you can find my startup code for Kafka Streams and the
>> > > > parameters I have configured for the application.
>> > > >
>> > > >         private static Properties settings = new Properties();
>> > > >         settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> > > >                 "mykafkastreamsapplication");
>> > > >         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>> > > >         settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
>> > > >         settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>> > > >         settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
>> > > >         settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
>> > > >         settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
>> > > >
>> > > >         KStreamBuilder builder = new KStreamBuilder();
>> > > >         builder.addSource(.....
>> > > >          .addProcessor  .............
>> > > >          .addProcessor  ........
>> > > >          .addStateStore(...................).persistent().build(), "my processor")
>> > > >          .addSink ..............
>> > > >          . addSink ..............
>> > > >
>> > > >         KafkaStreams streams = new KafkaStreams(builder, settings);
>> > > >         streams.start();
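One thing worth checking, given Damian's question above about one instance apparently running 12 threads while the others run 4: pin num.stream.threads explicitly so every instance uses the same thread count. A hedged sketch using the literal property key (the count of 4 is an example, not taken from the original configuration):

```java
import java.util.Properties;

public class StreamsSettings {

    // "num.stream.threads" is the property behind
    // StreamsConfig.NUM_STREAM_THREADS_CONFIG; setting it explicitly keeps
    // every instance of the application on the same thread count.
    public static Properties withThreadCount(int threads) {
        Properties settings = new Properties();
        settings.put("application.id", "mykafkastreamsapplication");
        settings.put("num.stream.threads", Integer.toString(threads));
        return settings;
    }
}
```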
>> > > >
>> > > > and I am using a simple Processor to implement my logic:
>> > > >
>> > > > public class InfoProcessor extends AbstractProcessor<Key, Update> {
>> > > >     private static Logger logger = Logger.getLogger(InfoProcessor.class);
>> > > >     private ProcessorContext context;
>> > > >     private KeyValueStore<Key, Info> infoStore;
>> > > >
>> > > >     @Override
>> > > >     @SuppressWarnings("unchecked")
>> > > >     public void init(ProcessorContext context) {
>> > > >         this.context = context;
>> > > >         this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
>> > > >         infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
>> > > >     }
>> > > >
>> > > >     @Override
>> > > >     public void process(Key key, Update update) {
>> > > >         try {
>> > > >             if (key != null && update != null) {
>> > > >                 Info info = infoStore.get(key);
>> > > >                 // merge logic
>> > > >                 infoStore.put(key, info);
>> > > >             }
>> > > >         } catch (Exception e) {
>> > > >             logger.error(e.getMessage(), e);
>> > > >         }
>> > > >         context.commit();
>> > > >     }
>> > > >
>> > > >     @Override
>> > > >     public void punctuate(long timestamp) {
>> > > >         try {
>> > > >             KeyValueIterator<Key, Info> iter = this.infoStore.all();
>> > > >             while (iter.hasNext()) {
>> > > >                 // processing logic
>> > > >             }
>> > > >             iter.close();
>> > > >             context.commit();
>> > > >         } catch (Exception e) {
>> > > >             logger.error(e.getMessage(), e);
>> > > >         }
>> > > >     }
>> > > > }
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
>
