kafka-users mailing list archives

From Apurva Mehta <apu...@confluent.io>
Subject Re: implementing kafka transactions : performance issue
Date Mon, 18 Sep 2017 17:59:21 GMT
Hi Hugues.

How 'big' are your transactions? In particular, how many produce records
are in a single transaction? Can you share your actual producer code?
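
To illustrate why transaction size matters, here is a minimal sketch. It
assumes a hypothetical `TxProducer` interface that only mirrors the
transactional method names of `KafkaProducer`, plus an in-memory
`CountingProducer` fake, so it runs without a broker. Neither type is part of
the Kafka client, and the helper `sendInTransactions` is likewise made up for
illustration. If each commit carries a roughly fixed cost, grouping many sends
into one transaction should amortize that cost across the records.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionBatching {

    /** Hypothetical stand-in that mirrors KafkaProducer's transactional method names. */
    interface TxProducer<T> {
        void initTransactions();
        void beginTransaction();
        void send(T record);
        void commitTransaction();
        void abortTransaction();
    }

    /**
     * Sends the records in groups of recordsPerTransaction, one commit per group.
     * Returns the number of commits issued.
     */
    static <T> int sendInTransactions(TxProducer<T> producer, List<T> records,
                                      int recordsPerTransaction) {
        if (recordsPerTransaction <= 0) {
            throw new IllegalArgumentException("recordsPerTransaction must be positive");
        }
        int commits = 0;
        for (int start = 0; start < records.size(); start += recordsPerTransaction) {
            int end = Math.min(start + recordsPerTransaction, records.size());
            producer.beginTransaction();
            try {
                for (T record : records.subList(start, end)) {
                    producer.send(record);
                }
                producer.commitTransaction();
                commits++;
            } catch (RuntimeException e) {
                // Simplification: with the real client, fatal errors such as
                // ProducerFencedException call for closing the producer instead.
                producer.abortTransaction();
                throw e;
            }
        }
        return commits;
    }

    /** In-memory fake that only counts calls; it performs no I/O. */
    static final class CountingProducer implements TxProducer<String> {
        int sends;
        int commits;
        int aborts;

        @Override public void initTransactions() { }
        @Override public void beginTransaction() { }
        @Override public void send(String record) { sends++; }
        @Override public void commitTransaction() { commits++; }
        @Override public void abortTransaction() { aborts++; }
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            records.add("msg-" + i);
        }

        CountingProducer perRecord = new CountingProducer();
        perRecord.initTransactions();
        // One transaction per send: 100 commits for 100 records.
        System.out.println(sendInTransactions(perRecord, records, 1));

        CountingProducer batched = new CountingProducer();
        batched.initTransactions();
        // All sends in one transaction: a single commit for 100 records.
        System.out.println(sendInTransactions(batched, records, 100));
    }
}
```

With the figures quoted below (up to 150 ms per transaction), one transaction
per send would put 100 sends at roughly 15 s, while a single transaction
around all 100 sends would pay that commit cost once.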

Also, did you try the `kafka-producer-perf-test.sh` tool with a
transactional id and see what the latency is for transactions with that
tool?

Thanks,
Apurva

On Mon, Sep 18, 2017 at 9:02 AM, <Hugues.Deslandes@hardis.fr> wrote:

> Hi,
> I am testing an app with transactions on the producer side of Kafka
> (0.11.0.1). I defined the producer config (see below) and added the
> necessary calls in the app (#initTransactions, #beginTransaction and
> #commitTransaction) around the existing #send.
> The problem I am facing is that each transaction takes up to 150 ms to
> complete, which doesn't make sense, even for a laptop!
> I have tried several batch size settings without any success (messages are
> around 100 bytes).
> I have certainly made a mistake in the setup but can't figure out which one,
> or how to investigate. I checked by removing the transaction lines and the
> app works fine (in my case less than 200 ms for 100 "send"s to Kafka).
>
> My setup is: 3 VMs on my laptop for the Kafka cluster. My main topic
> has 3 partitions with 3 replicas, and min.insync.replicas is set to 2.
>
>
> The producer is defined as follows (remaining configs left at their defaults):
>     final Properties props = new Properties();
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Servers);
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>             org.apache.kafka.common.serialization.StringSerializer.class);
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>             io.confluent.kafka.serializers.KafkaAvroSerializer.class);
>     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>             schema_Registry_URL);
>
>     props.put(ProducerConfig.ACKS_CONFIG, "all");
>     props.put(ProducerConfig.RETRIES_CONFIG, 5);
>
>     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionnalId);
>     props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
>
>     confluentProducer = new KafkaProducer<>(props);
>
> Any idea what could be wrong? Have I forgotten something?
> Thanks
> Hugues DESLANDES
