kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Amit Karyekar <akarye...@fanatics.com>
Subject Re: Producer property to set to enable async data transfer in kafka 8.2.2
Date Tue, 24 Nov 2015 23:30:14 GMT
Cool. Thanks Gwen for your response.

Regards,
Amit

On 11/24/15, 3:19 PM, "Gwen Shapira" <gwen@confluent.io> wrote:

>The new producer is async by default.
>
>You can see few examples of how to use it here:
>https://github.com/gwenshap/kafka-examples/tree/master/SimpleCounter/src/m
>ain/java/com/shapira/examples/producer/simplecounter
>
>On Tue, Nov 24, 2015 at 10:40 AM, Amit Karyekar <akaryekar@fanatics.com>
>wrote:
>
>> Hi folks,
>>
>> We are working with Kafka 8.2.2 and want to use producer.type as async
>>for
>> sending messages to broker.
>>
>> In Kakfka 8.2.2, some new producer properties have been introduced.
>> However, there is no new name for the property producer.type mentioned
>>in
>> the documentation.
>>
>> We┬╣ve the following configuration for Kafka producer:
>>
>>     bootstrap.servers: localhost:9092
>>
>>     serializer.class:
>> org.apache.kafka.common.serialization.StringSerializer
>>
>>     key.serializer:
>>org.apache.kafka.common.serialization.StringSerializer
>>
>>     value.serializer:
>> org.apache.kafka.common.serialization.StringSerializer
>>
>>     acks: 1
>>
>>     producer.type: async
>>
>> On sending messages, we are seeing following message in logs:
>>
>> 2015-11-23 18:18:50 TRACE KafkaProducer:158 - Starting the Kafka
>>producer
>> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata
>>version
>> 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
>> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
>> producer.type = null was supplied but isn't a known config.
>> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
>> serializer.class = null was supplied but isn't a known config.
>> 2015-11-23 18:18:50 DEBUG KafkaProducer:231 - Kafka producer started
>> 2015-11-23 18:18:50 DEBUG Sender:117 - Starting Kafka producer I/O
>>thread.
>> 2015-11-23 18:18:50 TRACE KafkaProducer:374 - Requesting metadata update
>> for topic ArgosHeartbeat.
>> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
>> request to node -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:397 - Init connection to node -1
>> for sending metadata request in the next iteration
>> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
>> node -1 at localhost:9092.
>> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
>> request to node -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to
>>node
>> -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
>> request to node -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:392 - Sending metadata request
>> ClientRequest(expectResponse=true, payload=null,
>>
>>request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,clie
>>nt_id=producer-9},
>> body={topics=[ArgosHeartbeat]})) to node -1
>> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata
>>version
>> 2 to Cluster(nodes = [Node(0, 192.168.99.1, 9092)], partitions =
>> [Partition(topic = ArgosHeartbeat, partition = 2, leader = 0, replicas =
>> [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition = 0,
>>leader =
>> 0, replicas = [0,], isr = [0,], Partition(topic = ArgosHeartbeat,
>>partition
>> = 1, leader = 0, replicas = [0,], isr = [0,]])
>> 2015-11-23 18:18:50 TRACE KafkaProducer:337 - Sending record
>> ProducerRecord(topic=ArgosHeartbeat, partition=0, key=S109,
>>
>>value={"logType":"heartbeat","applicationType":"Java","StreamId":"S109","
>>RequestId":"R109","ArgosTimestamp":1448331530502,"Timestamp":1447896190}
>> with callback null to topic ArgosHeartbeat partition 0
>> 2015-11-23 18:18:50 TRACE RecordAccumulator:156 - Allocating a new 16384
>> byte message buffer for topic ArgosHeartbeat partition 0
>> 2015-11-23 18:18:50 TRACE KafkaProducer:340 - Waking up the sender since
>> topic ArgosHeartbeat partition 0 is either full or getting a new batch
>> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
>> node 0 at 192.168.99.1:9092.
>> 2015-11-23 18:18:50 TRACE KafkaProducer:419 - Closing the Kafka
>>producer.
>> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to
>>node
>> 0
>> 2015-11-23 18:18:50 DEBUG Sender:128 - Beginning shutdown of Kafka
>> producer I/O thread, sending remaining records.
>> 2015-11-23 18:18:50 TRACE Sender:182 - Nodes with data ready to send:
>> [Node(0, 192.168.99.1, 9092)]
>> 2015-11-23 18:18:50 TRACE Sender:183 - Created 1 produce requests:
>> [ClientRequest(expectResponse=true,
>> payload={ArgosHeartbeat-0=RecordBatch(topicPartition=ArgosHeartbeat-0,
>> recordCount=1)},
>>
>>request=RequestSend(header={api_key=0,api_version=0,correlation_id=1,clie
>>nt_id=producer-9},
>>
>>body={acks=1,timeout=30000,topic_data=[{topic=ArgosHeartbeat,data=[{parti
>>tion=0,record_set=java.nio.HeapByteBuffer[pos=0
>> lim=169 cap=16384]}]}]}))]
>> 2015-11-23 18:18:50 TRACE Sender:223 - Received produce response from
>>node
>> 0 with correlation id 1
>> 2015-11-23 18:18:50 TRACE RecordBatch:81 - Produced messages to
>> topic-partition ArgosHeartbeat-0 with base offset offset 41 and error:
>>null.
>> 2015-11-23 18:18:50 DEBUG Sender:143 - Shutdown of Kafka producer I/O
>> thread has completed.
>> 2015-11-23 18:18:50 DEBUG KafkaProducer:429 - The Kafka producer has
>> closed.
>> 2015-11-23 18:18:50 INFO  ProducerConfig:113 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> acks = 1
>> batch.size = 16384
>> reconnect.backoff.ms = 10
>> bootstrap.servers = [localhost:9092]
>> receive.buffer.bytes = 32768
>> retry.backoff.ms = 100
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> retries = 0
>> max.request.size = 1048576
>> block.on.buffer.full = true
>> value.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> metrics.sample.window.ms = 30000
>> send.buffer.bytes = 131072
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> linger.ms = 0
>> client.id =
>>
>>
>> In regards to above log,  we wanted to know how to set producer.type
>> parameter to async in Kafka 8.2.2.
>> Also, it is unable to recognize serializer.class parameter which was a
>> parameter in old Kafka producer config.
>>
>> Regards,
>> Amit
>> Information contained in this e-mail message is confidential. This
>>e-mail
>> message is intended only for the personal use of the recipient(s) named
>> above. If you are not an intended recipient, do not read, distribute or
>> reproduce this transmission (including any attachments). If you have
>> received this email in error, please immediately notify the sender by
>>email
>> reply and delete the original message.
>>

Information contained in this e-mail message is confidential. This e-mail message is intended
only for the personal use of the recipient(s) named above. If you are not an intended recipient,
do not read, distribute or reproduce this transmission (including any attachments). If you
have received this email in error, please immediately notify the sender by email reply and
delete the original message.

Mime
View raw message