kafka-users mailing list archives

From Gwen Shapira <g...@confluent.io>
Subject Re: Producer property to set to enable async data transfer in kafka 8.2.2
Date Tue, 24 Nov 2015 23:19:22 GMT
The new producer is async by default.

You can see a few examples of how to use it here:
https://github.com/gwenshap/kafka-examples/tree/master/SimpleCounter/src/main/java/com/shapira/examples/producer/simplecounter
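As a rough illustration of the point above, here is a minimal sketch of the configuration from the quoted message with the two old-producer keys (producer.type and serializer.class) left out, since the new producer warns that they are not known configs. The class name NewProducerConfigSketch and the method newProducerConfig are hypothetical names chosen for this sketch; only java.util.Properties is used, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;
import java.util.TreeSet;

public class NewProducerConfigSketch {

    // Builds the new-producer configuration from the quoted message,
    // minus the legacy keys that the new producer does not recognize.
    static Properties newProducerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        // Intentionally omitted: "producer.type" and "serializer.class".
        // They belong to the old producer; the new one sends asynchronously
        // by default and takes key.serializer / value.serializer instead.
        return props;
    }

    public static void main(String[] args) {
        Properties props = newProducerConfig();
        // Print the keys in sorted order for a stable listing.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

These properties would then be passed to new KafkaProducer<String, String>(props). With the new producer, send(record) returns a Future<RecordMetadata> without waiting for the broker, so sending is already asynchronous; calling get() on that future is what makes a send block, and send(record, callback) is the usual way to learn the result without blocking.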

On Tue, Nov 24, 2015 at 10:40 AM, Amit Karyekar <akaryekar@fanatics.com>
wrote:

> Hi folks,
>
> We are working with Kafka 8.2.2 and want to use producer.type as async for
> sending messages to broker.
>
> In Kafka 8.2.2, some new producer properties have been introduced.
> However, there is no new name for the property producer.type mentioned in
> the documentation.
>
> We have the following configuration for the Kafka producer:
>
>     bootstrap.servers: localhost:9092
>
>     serializer.class: org.apache.kafka.common.serialization.StringSerializer
>
>     key.serializer: org.apache.kafka.common.serialization.StringSerializer
>
>     value.serializer: org.apache.kafka.common.serialization.StringSerializer
>
>     acks: 1
>
>     producer.type: async
>
> On sending messages, we see the following messages in the logs:
>
> 2015-11-23 18:18:50 TRACE KafkaProducer:158 - Starting the Kafka producer
> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version
> 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
> producer.type = null was supplied but isn't a known config.
> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
> serializer.class = null was supplied but isn't a known config.
> 2015-11-23 18:18:50 DEBUG KafkaProducer:231 - Kafka producer started
> 2015-11-23 18:18:50 DEBUG Sender:117 - Starting Kafka producer I/O thread.
> 2015-11-23 18:18:50 TRACE KafkaProducer:374 - Requesting metadata update
> for topic ArgosHeartbeat.
> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
> request to node -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:397 - Init connection to node -1
> for sending metadata request in the next iteration
> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
> node -1 at localhost:9092.
> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
> request to node -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node
> -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
> request to node -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:392 - Sending metadata request
> ClientRequest(expectResponse=true, payload=null,
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-9},
> body={topics=[ArgosHeartbeat]})) to node -1
> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version
> 2 to Cluster(nodes = [Node(0, 192.168.99.1, 9092)], partitions =
> [Partition(topic = ArgosHeartbeat, partition = 2, leader = 0, replicas =
> [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition = 0, leader =
> 0, replicas = [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition
> = 1, leader = 0, replicas = [0,], isr = [0,]])
> 2015-11-23 18:18:50 TRACE KafkaProducer:337 - Sending record
> ProducerRecord(topic=ArgosHeartbeat, partition=0, key=S109,
> value={"logType":"heartbeat","applicationType":"Java","StreamId":"S109","RequestId":"R109","ArgosTimestamp":1448331530502,"Timestamp":1447896190}
> with callback null to topic ArgosHeartbeat partition 0
> 2015-11-23 18:18:50 TRACE RecordAccumulator:156 - Allocating a new 16384
> byte message buffer for topic ArgosHeartbeat partition 0
> 2015-11-23 18:18:50 TRACE KafkaProducer:340 - Waking up the sender since
> topic ArgosHeartbeat partition 0 is either full or getting a new batch
> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
> node 0 at 192.168.99.1:9092.
> 2015-11-23 18:18:50 TRACE KafkaProducer:419 - Closing the Kafka producer.
> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node
> 0
> 2015-11-23 18:18:50 DEBUG Sender:128 - Beginning shutdown of Kafka
> producer I/O thread, sending remaining records.
> 2015-11-23 18:18:50 TRACE Sender:182 - Nodes with data ready to send:
> [Node(0, 192.168.99.1, 9092)]
> 2015-11-23 18:18:50 TRACE Sender:183 - Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={ArgosHeartbeat-0=RecordBatch(topicPartition=ArgosHeartbeat-0,
> recordCount=1)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1,client_id=producer-9},
> body={acks=1,timeout=30000,topic_data=[{topic=ArgosHeartbeat,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=169 cap=16384]}]}]}))]
> 2015-11-23 18:18:50 TRACE Sender:223 - Received produce response from node
> 0 with correlation id 1
> 2015-11-23 18:18:50 TRACE RecordBatch:81 - Produced messages to
> topic-partition ArgosHeartbeat-0 with base offset offset 41 and error: null.
> 2015-11-23 18:18:50 DEBUG Sender:143 - Shutdown of Kafka producer I/O
> thread has completed.
> 2015-11-23 18:18:50 DEBUG KafkaProducer:429 - The Kafka producer has
> closed.
> 2015-11-23 18:18:50 INFO  ProducerConfig:113 - ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> acks = 1
> batch.size = 16384
> reconnect.backoff.ms = 10
> bootstrap.servers = [localhost:9092]
> receive.buffer.bytes = 32768
> retry.backoff.ms = 100
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> retries = 0
> max.request.size = 1048576
> block.on.buffer.full = true
> value.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> metrics.sample.window.ms = 30000
> send.buffer.bytes = 131072
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> linger.ms = 0
> client.id =
>
>
> Regarding the above log, we would like to know how to set the producer.type
> parameter to async in Kafka 8.2.2.
> Also, the producer does not recognize the serializer.class parameter, which
> was a parameter in the old Kafka producer config.
>
> Regards,
> Amit
