kafka-users mailing list archives

From Abhishek Verma <abhishekverma3...@gmail.com>
Subject Reg. Kafka transactional producer and consumer
Date Thu, 02 Nov 2017 04:25:42 GMT

Hi All,

I am trying to build a hello-world example with a transactional producer and a consumer that reads from it.
I am doing all of this in plain Java.

The producer can produce, but the consumer is not consuming any messages.

I searched elsewhere and found that other people have the same problem.

Right now I am using a single broker. I also tried with 3 brokers, and it did not work then either.

I don't know what I am missing, or whether the problem is in the consumer or in the producer. :p

I have attached the producer and consumer code and console logs, along with my broker logs.

Thanks,
Abhishek


My Broker logs after producing messages
<LOGS broker>
[2017-11-01 18:45:55,000] INFO Updated PartitionLeaderEpoch. New: {epoch:4, offset:3}, Current: {epoch:3, offset0} for Partition: __transaction_state-2. Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-11-01 18:46:03,482] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.5031925219291776-156417066 with producerId 4001 and producer epoch 0 on partition __transaction_state-2 (kafka.coordinator.transaction.TransactionCoordinator)
</LOGS broker>

My producer code is
<CODE producer>
import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonSerializer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.*;

public class SampleProducer {

    public static String topic = "topic-4";

    public static void main(String[] args) {

        Properties configProperties = new Properties();

        //configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "some-client-id");
        configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TXN_ID:" + new Random().nextDouble() + new Random().nextInt());
        configProperties.put("acks", "all");
        configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProperties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        configProperties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configProperties.put("value.serializer", JsonSerializer.class);
        configProperties.put("bootstrap.servers", "192.168.41.132:9090");


        KafkaProducer<Integer, DataObject> producer = new KafkaProducer<>(configProperties);

        System.out.println("Init Transaction");
        producer.initTransactions();
        try {

            System.out.println("transaction initialised going to begin transaction");
            producer.beginTransaction();
            System.out.println("Transaction started");

            // Typed record, so send(...).get() returns RecordMetadata without a cast.
            ProducerRecord<Integer, DataObject> rec = new ProducerRecord<>(topic, 5, new DataObject(5, "Hello, World!"));

            RecordMetadata metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            producer.commitTransaction();
            System.out.println("Transaction Committed");

        }catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            System.out.println("Connection closed but commit failed. We can't recover");
            producer.close();
        }catch(KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            System.out.println("Abort Transaction");
            producer.abortTransaction();
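        }catch (Exception x){
            // send(...).get() can throw InterruptedException or ExecutionException; report them instead of swallowing them.
            x.printStackTrace();
        }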
        producer.close();
        System.out.println("Closed");
    }
}


</CODE producer>

These are my producer console logs

<LOGS producer>
0    [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [192.168.41.132:9090]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = TXN_ID:0.5031925219291776-156417066
value.serializer = class com.example.transaction.producer.utils.serde.JsonSerializer

261  [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Instantiated a transactional producer.
265  [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
274  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bufferpool-wait-time
281  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name buffer-exhausted-records
284  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.41.132:9090 (id: -1 rack: null)], partitions = [])
297  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name produce-throttle-time
564  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-closed:
564  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-created:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent-received:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-received:
566  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name select-time:
567  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name io-time:
573  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name batch-size
574  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name compression-rate
574  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name queue-time
575  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name request-time
575  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-per-request
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name record-retries
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name errors
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name record-size-max
579  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name batch-split-rate
582  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Starting Kafka producer I/O thread.
585  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.0
585  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : cb8625948210849f
586  [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - Kafka producer started
Init Transaction
588  [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state UNINITIALIZED to INITIALIZING
588  [main] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to -1 with epoch -1
594  [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000)
598  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=TXN_ID:0.5031925219291776-156417066, coordinatorType=TRANSACTION)
598  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000)
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node -1 at 192.168.41.132:9090.
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-sent
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-received
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.latency
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node -1.  Fetching API versions.
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node -1.
897  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
898  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=TXN_ID:0.5031925219291776-156417066, coordinatorType=TRANSACTION) to node 192.168.41.132:9090 (id: -1 rack: null)
901  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node 1001 at 192.168.41.132:9090.
901  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-sent
902  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-received
902  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.latency
903  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1001
903  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node 1001.  Fetching API versions.
903  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node 1001.
905  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node 1001: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
1009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000) to node 192.168.41.132:9090 (id: 1001 rack: null)
9491 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to 4001 with epoch 0
9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state INITIALIZING to READY
transaction initialised going to begin transaction
9491 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state READY to IN_TRANSACTION
Transaction started
9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic-4) to node -1
9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [192.168.41.132:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
9523 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Begin adding new partition topic-4-0 to transaction
9523 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, partitions=[topic-4-0])
9523 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, partitions=[topic-4-0]) to node 192.168.41.132:9090 (id: 1001 rack: null)
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Successfully added partitions [topic-4-0] to transaction
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 0 from producer (producerId=4001, epoch=0) to dequeued batch from partition topic-4-0 bound for 192.168.41.132:9090 (id: 1001 rack: null).
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.records-per-batch
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.bytes
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.compression-rate
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.record-retries
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.record-errors
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-4-0 to 1
The offset of the record we just sent is: 5
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 1 from producer (producerId=4001, epoch=0) to dequeued batch from partition topic-4-0 bound for 192.168.41.132:9090 (id: 1001 rack: null).
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-4-0 to 2
The offset of the record we just sent is: 6
9554 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
9554 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, result=COMMIT)
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, result=COMMIT) to node 192.168.41.132:9090 (id: 1001 rack: null)
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state COMMITTING_TRANSACTION to READY
Transaction Committed
9554 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-closed:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-created:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent-received:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-received:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name select-time:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name io-time:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-sent
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-received
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.latency
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-1001.bytes-sent
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-1001.bytes-received
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-1001.latency
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Shutdown of Kafka producer I/O thread has completed.
9554 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - The Kafka producer has closed.
Closed

</LOGS producer>
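
The producer log above is long, so here is a minimal sketch for pulling out just the TransactionManager state transitions from it. `TxnStateTrace` and its methods are hypothetical names made up for illustration (they are not part of the Kafka client), and the sample lines in `main` are shortened copies of the log lines above. Seeing COMMITTING_TRANSACTION go back to READY only shows that the producer finished its side of the commit; it does not by itself say what a read_committed consumer will see.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the Kafka client): collects the
// TransactionManager state transitions found in producer DEBUG log lines.
final class TxnStateTrace {

    // Matches the "Transition from state X to Y" text seen in the producer log.
    private static final Pattern TRANSITION =
            Pattern.compile("Transition from state (\\w+) to (\\w+)");

    // Returns the transitions in log order, each formatted as "FROM->TO".
    static List<String> transitions(List<String> logLines) {
        List<String> result = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = TRANSITION.matcher(line);
            if (m.find()) {
                result.add(m.group(1) + "->" + m.group(2));
            }
        }
        return result;
    }

    // True when the log shows the producer leaving COMMITTING_TRANSACTION for READY.
    static boolean commitCompleted(List<String> logLines) {
        return transitions(logLines).contains("COMMITTING_TRANSACTION->READY");
    }

    public static void main(String[] args) {
        // Shortened sample lines taken from the producer log in this message.
        List<String> sample = Arrays.asList(
                "9491 [main] DEBUG TransactionManager  - Transition from state READY to IN_TRANSACTION",
                "9554 [main] DEBUG TransactionManager  - Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION",
                "9554 [kafka-producer-network-thread | producer-1] DEBUG TransactionManager  - Transition from state COMMITTING_TRANSACTION to READY");
        for (String t : transitions(sample)) {
            System.out.println(t);
        }
        System.out.println("commit completed on producer side: " + commitCompleted(sample));
    }
}
```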



My consumer side code.

<CODE consumer>
package com.example.transaction.producer.consumer;

import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;


public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    public static String topic = "topic-4";

    public static void main(String[] args) {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.41.132:9090");
        configProperties.put("group.id","some-different-group-3");
        configProperties.put("enable.auto.commit", "true");
        configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");


        KafkaConsumer<Integer, DataObject> consumer = new KafkaConsumer(configProperties,new IntegerDeserializer(),new JsonDeserializer(DataObject.class));

        consumer.subscribe(Arrays.asList(topic));

        LOGGER.info("*************** Starting Consumer *************");

        while (true) {
            ConsumerRecords<Integer, DataObject> records = consumer.poll(1000);
            records.forEach(record -> {
                System.out.printf("offset = %d\n", record.offset());
                System.out.println("Key = " + record.key().toString() + "\nMessage = " + record.value().toString());
            });
        }

    }
}

</CODE consumer>
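
One way to narrow down whether the producer or the consumer side is at fault (a sketch only, not something that was tried above) is to run the same consumer twice, with two configurations that differ only in isolation.level. If read_uncommitted returns the records and read_committed does not, the records are in the topic but are not visible as committed. The class name, the method name, and the two group ids below are hypothetical, and enable.auto.commit is switched off here on the assumption that a diagnostic run should not move committed offsets.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer configurations that differ only in
// isolation.level, so the same topic can be read both ways for comparison.
final class IsolationLevelConfigs {

    static Properties consumerProps(String bootstrapServers, String groupId, String isolationLevel) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumption: a diagnostic run should not commit offsets.
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        // "read_committed" or "read_uncommitted"
        props.put("isolation.level", isolationLevel);
        return props;
    }

    public static void main(String[] args) {
        String servers = "192.168.41.132:9090";
        // Group ids are made up; separate groups keep the two runs independent.
        Properties committed = consumerProps(servers, "diag-read-committed", "read_committed");
        Properties uncommitted = consumerProps(servers, "diag-read-uncommitted", "read_uncommitted");
        System.out.println("first run:  isolation.level=" + committed.getProperty("isolation.level"));
        System.out.println("second run: isolation.level=" + uncommitted.getProperty("isolation.level"));
    }
}
```

Each Properties object can then be passed to the KafkaConsumer constructor in place of configProperties in the consumer code above.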


My Consumer Console logs.

<LOG consumer>
0    [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.41.132:9090]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = some-different-group-3
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class com.example.transaction.producer.utils.serde.JsonDeserializer

2    [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Starting the Kafka consumer
139  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.41.132:9090 (id: -1 rack: null)], partitions = [])
164  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-throttle-time
444  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-closed:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-created:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent-received:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent:
447  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-received:
447  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name select-time:
448  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name io-time:
478  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name heartbeat-latency
479  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name join-latency
479  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name sync-latency
482  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name commit-latency
488  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-fetched
488  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-fetched
490  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-latency
490  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-lag
495  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.0
495  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : cb8625948210849f
497  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Kafka consumer created
497  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Subscribed to topic(s): topic-4
498  [main] INFO  com.example.transaction.producer.consumer.Consumer  - *************** Starting Consumer *************
498  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group some-different-group-3 to broker 192.168.41.132:9090 (id: -1 rack: null)
509  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node -1 at 192.168.41.132:9090.
523  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-sent
525  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-received
526  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.latency
527  [main] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
531  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node -1.  Fetching API versions.
531  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node -1.
661  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
662  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic-4) to node -1
672  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [192.168.41.132:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
675  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1509541833582, latencyMs=170, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=192.168.41.132:9090 (id: 1001 rack: null))) for group some-different-group-3
676  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator 192.168.41.132:9090 (id: 2147482646 rack: null) for group some-different-group-3.
676  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node 2147482646 at 192.168.41.132:9090.
679  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending synchronous auto-commit of offsets {} for group some-different-group-3
679  [main] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Revoking previously assigned partitions [] for group some-different-group-3
679  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - (Re-)joining group some-different-group-3
682  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending JoinGroup ((type: JoinGroupRequest, groupId=some-different-group-3, sessionTimeout=10000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@133e16fd)) to coordinator 192.168.41.132:9090 (id: 2147482646 rack: null)
683  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147482646.bytes-sent
684  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147482646.bytes-received
684  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147482646.latency
684  [main] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147482646
685  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node 2147482646.  Fetching API versions.
685  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node 2147482646.
692  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node 2147482646: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
695  [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Heartbeat thread for group some-different-group-3 started
717  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received successful JoinGroup response for group some-different-group-3: org.apache.kafka.common.requests.JoinGroupResponse@140e5a13
718  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Performing assignment for group some-different-group-3 using strategy range with subscriptions {consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Subscription(topics=[topic-4])}
719  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Finished assignment for group some-different-group-3: {consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Assignment(partitions=[topic-4-0])}
721  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending leader SyncGroup for group some-different-group-3 to coordinator 192.168.41.132:9090 (id: 2147482646 rack: null): (type=SyncGroupRequest, groupId=some-different-group-3, generationId=6, memberId=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806, groupAssignment=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806)
787  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully joined group some-different-group-3 with generation 6
787  [main] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting newly assigned partitions [topic-4-0] for group some-different-group-3
803  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group some-different-group-3 fetching committed offsets for partitions: [topic-4-0]
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Resetting offset for partition topic-4-0 to the committed offset 0
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node 1001 at 192.168.41.132:9090.
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-sent
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-received
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.latency
819  [main] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1001
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node 1001.  Fetching API versions.
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node 1001.
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node 1001: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.bytes-fetched
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.records-fetched
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic-4-0.records-lag
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
2906 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
2907 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
2907 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
3792 [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending Heartbeat request for group some-different-group-3 to coordinator 192.168.41.132:9090 (id: 2147482646 rack: null)
3792 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received successful Heartbeat response for group some-different-group-3
3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
5797 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending asynchronous auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, metadata=''}} for group some-different-group-3
5819 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group some-different-group-3 committed offset 0 for partition topic-4-0
5819 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Completed auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, metadata=''}} for group some-different-group-3
6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
6800 [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending Heartbeat request for group some-different-group-3 to coordinator 192.168.41.132:9090 (id: 2147482646 rack: null)
6800 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received successful Heartbeat response for group some-different-group-3
7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
</LOGS consumer>



