kafka-users mailing list archives

From Jiamei Xie <Jiamei....@arm.com>
Subject Producer MESSAGE_TOO_LARGE warning when compression is zstd
Date Tue, 03 Mar 2020 08:40:34 GMT
Hi,
Issue:
The MESSAGE_TOO_LARGE warning occurs when I run producer-perf-test.sh against a broker on an arm server. Below are the commands I used:
bin/kafka-topics.sh --create --bootstrap-server arm-server:9092 --replication-factor 1 --partitions 6 --topic test
bin/kafka-producer-perf-test.sh --topic test --num-records 50000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=arm-server:9092 acks=1 buffer.memory=67108864 batch.size=65536 compression.type=zstd
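
(For reference, the same producer settings written out as client properties; this is just my sketch of what the perf test passes to the producer. max.request.size is not overridden, so I assume the default of 1048576, which, if I read Sender correctly, is also the maxSize value that shows up later in RecordAccumulator.drain().)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class PerfTestProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "arm-server:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        // max.request.size is left at its default (1048576 bytes).
        try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            // The perf test then sends 1000-byte records, as in --record-size 1000 above.
        }
    }
}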

[2020-03-03 06:11:02,485] WARN [Producer clientId=producer-1] Got error produce response in
correlation id 236 on topic-partition test-2, splitting and retrying (2147483647 attempts
left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender)
514288 records sent, 7931.4 records/sec (7.56 MB/sec), 32267.0 ms avg latency, 60002.0 ms
max latency.
[2020-03-03 06:16:15,294] ERROR Uncaught exception in thread 'kafka-producer-network-thread
| producer-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space: failed reallocation of scalar replaced objects
After MESSAGE_TOO_LARGE occurred, the memory used on the client increased dramatically and performance got worse. Below is the dstat data from the client.
dstat -cdmn
--total-cpu-usage-- -dsk/total- ------memory-usage----- -net/total-
usr sys idl wai stl| read  writ| used  free  buff  cach| recv  send
  0   0  99   0   0|1762k   11k|1180M 29.9G 13.3M  162M|   0     0
  5   3  92   0   0|   0     0 |1180M 29.9G 13.3M  162M|1966B 3924B
11   2  87   0   0|   0    16k|1238M 29.9G 13.3M  162M|5066B  379k
  9   3  87   0   0|   0     0 |1277M 29.8G 13.3M  162M|3580B  281k
  5   2  93   0   0|   0     0 |1278M 29.8G 13.3M  162M|3712B  253k
16   2  82   0   0|   0     0 |1334M 29.8G 13.3M  162M|  12k 1568k
21   4  76   0   0|   0    20k|2047M 29.1G 13.3M  162M|  11k   51k
  9   4  87   0   0|   0     0 |3199M 27.9G 13.3M  162M|  26k   97k
  7   5  88   0   0|   0     0 |4341M 26.8G 13.3M  162M|  25k   94k
  5   5  90   0   0|   0     0 |5543M 25.7G 13.3M  162M|  25k  100k
  5   5  90   0   0|   0     0 |6903M 24.3G 13.3M  162M|  29k  120k
12   6  82   0   0|   0     0 |8320M 22.9G 13.3M  162M|  29k  116k
  6   5  89   0   0|   0    28k|9568M 21.7G 13.3M  162M|  30k  124k
  7   4  89   0   0|   0     0 |10.2G 20.8G 13.3M  162M|  35k  389k
11   5  83   0   0|   0     0 |10.3G 20.7G 13.3M  162M|  50k  768k
  6   2  92   0   0|   0     0 |10.3G 20.7G 13.3M  162M|  20k  530k
  8   3  89   0   0|   0     0 |10.4G 20.7G 13.3M  162M|9183B  471k
  8   2  90   0   0|   0    96k|10.4G 20.7G 13.3M  162M|5552B  156k
  6   2  92   0   0|   0     0 |10.4G 20.7G 13.3M  162M|9672B  486k
13   3  84   0   0|   0    16k|10.5G 20.6G 13.3M  162M|5771B  310k
10   3  87   0   0|   0     0 |10.5G 20.6G 13.3M  162M|7868B  295k
14   2  85   0   0|   0     0 |10.5G 20.5G 13.3M  162M|  16k 1524k
20   3  76   0   0|   0    32k|11.2G 19.9G 13.3M  162M|  13k   54k
12   5  84   0   0|   0     0 |12.4G 18.7G 13.3M  162M|  27k  113k
  7   5  88   0   0|   0     0 |13.7G 17.4G 13.3M  162M|  29k  115k
11   5  84   0   0|   0     0 |14.9G 16.1G 13.3M  162M|  30k  124k
  6   5  89   0   0|   0     0 |16.3G 14.8G 13.3M  162M|  28k  118k
  6   4  89   1   0|   0    60k|17.5G 13.6G 13.3M  162M|  27k  107k
  7   6  87   0   0|   0     0 |18.7G 12.4G 13.3M  162M|  29k  164k
  5   1  93   0   0|   0     0 |18.7G 12.4G 13.3M  162M|  45k  713k
10   3  87   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  24k  548k
  6   2  92   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  422k
  7   2  90   0   0|   0    44k|18.8G 12.3G 13.3M  162M|  13k  470k
16   2  82   0   0|   0     0 |18.8G 12.3G 13.3M  162M|9324B  323k
  6   2  91   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  11k  386k
  8   2  90   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  403k
  8   3  89   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  370k
  7   2  90   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  13k  435k
  7   2  91   0   0|   0    20k|18.8G 12.2G 13.3M  162M|  17k  397k
  8   2  90   0   0|   0     0 |18.8G 12.2G 13.3M  162M|  16k  546k
10   2  88   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  13k  436k
  5   2  93   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  14k  492k
11   3  87   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  13k  465k
  8   2  90   0   0|   0     0 |18.9G 12.2G 13.3M  162M|9836B  384k
10   1  88   0   0|   0    20k|18.9G 12.2G 13.3M  162M|9884B  392k
10   1  89   0   0|   0     0 |18.9G 12.1G 13.3M  162M|  69k  771k
  4   2  94   0   0|   0     0 |18.9G 12.1G 13.3M  162M|  71k  668k
  4   1  95   0   0|   0     0 |19.0G 12.1G 13.3M  162M|  68k  634k
  2   1  98   0   0|   0    16k|19.0G 12.1G 13.3M  162M|  72k  725k
  6   2  92   0   0|   0     0 |19.0G 12.1G 13.3M  162M|  73k  690k
  3   1  95   0   0| 232k    0 |19.0G 12.0G 13.5M  162M|  73k  723k
  5   2  94   0   0|   0     0 |19.0G 12.0G 13.5M  162M|  79k  764k
  2   1  96   0   0|4096B   56k|19.1G 12.0G 13.5M  162M|  81k  800k
  5   2  94   0   0|   0    72k|19.1G 12.0G 13.5M  162M|  69k  663k
  1   1  98   0   0|   0     0 |19.1G 12.0G 13.5M  162M|  48k  476k


This issue does not occur when the compression type is none, snappy, lz4 or gzip.

Environment:
Server: one arm server
Client: x86_64 (E5-2650 v3)
Kafka: cloned from https://github.com/apache/kafka.git and built with "./gradlew jar"
OS: Ubuntu 18.04.4 LTS
Java: openjdk version "1.8.0_242"
Scala: The default version, not installed manually.

Set the below environment variables on both server and client
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50
-XX:MaxMetaspaceFreeRatio=80 -XX:+PreserveFramePointer"

server.properties
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600


From the source code side, here is what I have done:
1. In kafka/core/src/main/scala/kafka/log/Log.scala, the broker checks whether the batch size is valid and, if not, throws RecordTooLargeException. I added a log to check the batch size; it is bigger than config.maxMessageSize (1048588).


          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
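
(Side note, in case someone wants to reproduce: config.maxMessageSize here should be the topic-level max.message.bytes, falling back to the broker's message.max.bytes, if I read LogConfig correctly. A minimal sketch of reading the effective topic limit back with the AdminClient, not something from my test code:)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ShowTopicMaxMessageBytes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "arm-server:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                    .all().get().get(topic);
            // The topic-level limit that backs config.maxMessageSize in Log.scala (my reading).
            System.out.println(config.get("max.message.bytes"));
        }
    }
}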
2. In clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java, the readFrom method checks whether receiveSize is valid. I added some logs to print receiveSize, requestedBufferSize and read.

public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt();
            log.warn("####receiveSize is {} , maxSize is {} ", receiveSize, maxSize);
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }
    if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }
    log.warn("####requestedBufferSize is {}, read is {} ", requestedBufferSize, read);
    return read;
}

where maxSize is socket.request.max.bytes and its value is 104857600. There are receiveSizes bigger than config.maxMessageSize. I also compared receiveSize between arm (server is arm, client is x86) and x86 (server and client are both x86). The difference between arm and x86 is big: arm: 2158872, x86: 87158.

The read is much lower than requestedBufferSize.
The biggest read is 1084552.
The biggest requestedBufferSize is 3927692.
Why is requestedBufferSize so much bigger than read?
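
(My understanding, which may be wrong, is that requestedBufferSize is allocated in full from the 4-byte size prefix before the payload has arrived, while each channel.read() only returns whatever is currently buffered, so a single readFrom() call reading less than the full size may be expected. A tiny standalone NIO sketch of that, nothing Kafka-specific:)

import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class PartialReadSketch {
    public static void main(String[] args) throws Exception {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        // Pretend the peer announced a 1 MB payload but only 10 bytes have arrived so far.
        pipe.sink().write(ByteBuffer.wrap(new byte[10]));
        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); // allocated up front, like requestedBufferSize
        int read = pipe.source().read(buffer);
        System.out.println("allocated=" + buffer.capacity() + " read=" + read); // read == 10
    }
}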

3. In clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java, I added a log to print the buffer size.
private static ByteBuffer sizeBuffer(int size) {
    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    ByteBuffer test = ByteBuffer.allocate(4);
    sizeBuffer.putInt(size);
    log.warn("#### size ={}", size);
    sizeBuffer.rewind();
    return sizeBuffer;
}

And the biggest size is the same as the biggest requestedBufferSize, which is also 3927692. This really confuses me. Arm and x86 use the same client to produce data to the broker, so why is the size on arm in NetworkSend.java (I suppose this is the data sent to the broker) bigger than the size on x86?
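
(The way I read it, this size is the 4-byte length prefix written in front of each payload, and the same prefix is what readFrom() above decodes into receiveSize/requestedBufferSize on the receiving side, which would explain the two numbers matching. A minimal framing sketch, my own illustration rather than Kafka code:)

import java.nio.ByteBuffer;

public class FramingSketch {
    public static void main(String[] args) {
        byte[] payload = new byte[3927692];                 // the biggest size observed above
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length);                       // what sizeBuffer()/NetworkSend logs as "size"
        frame.put(payload);
        frame.flip();
        int receiveSize = frame.getInt();                   // what readFrom() logs as receiveSize
        System.out.println(receiveSize);                    // 3927692 on both ends
    }
}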


4. Part of the producer flow is:
RecordAccumulator.drain() -> RecordAccumulator.drainBatchesForOneNode() -> ProducerBatch.close() -> recordsBuilder.close() (which computes the actualCompressionRatio) and CompressionRatioEstimator.updateEstimation().

I added some logs to print estimatedCompressionRatio, ready, and uncompressedRecordsSizeInBytes.
In clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
    private int estimatedBytesWritten() {
        if (compressionType == CompressionType.NONE) {
            return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
        } else {
            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
            log.warn("####uncompressedRecordsSizeInBytes is {} ,estimatedCompressionRatio is {} ", uncompressedRecordsSizeInBytes, estimatedCompressionRatio);
            return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
        }
    }
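
(To make the effect of the estimate concrete, here is the arithmetic with the values I observed, as a back-of-the-envelope sketch. COMPRESSION_RATE_ESTIMATION_FACTOR is 1.05 in my checkout; the batchHeaderSizeInBytes value below is just an assumed small constant.)

public class EstimateSketch {
    public static void main(String[] args) {
        int uncompressedRecordsSizeInBytes = 711893825;     // biggest value seen in my logs above
        float estimatedCompressionRatio = -8.419007E-4f;    // lowest value seen in my logs above
        float factor = 1.05f;                               // COMPRESSION_RATE_ESTIMATION_FACTOR
        int batchHeaderSizeInBytes = 61;                    // assumed small header size, exact value not checked
        int estimated = batchHeaderSizeInBytes
                + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * factor);
        // Prints a negative number (around -629k), i.e. the builder thinks the batch is far
        // below any size limit even though ~700 MB of records have been appended.
        System.out.println(estimated);
    }
}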

In clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();
        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
            batches.put(node.id(), ready);
            // maxSize is 1048576
            log.warn("#maxSize is {} , ready is {} ", maxSize, ready);
        }
        return batches;
    }

The lowest estimatedCompressionRatio is -8.419007E-4, which is a negative number. I checked it on x86 and there were also estimatedCompressionRatios like this.
From ready, we can get the record count for a topic-partition; the biggest is 394277.
The biggest uncompressedRecordsSizeInBytes is 711893825.

What might cause uncompressedRecordsSizeInBytes to be so big?
Why is estimatedCompressionRatio a negative number? Is that normal?

5. In clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, I added a log to check whether this if statement was entered:
                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                    log.warn("#### size is {} , first.estimatedSizeInBytes() is {} ", size, first.estimatedSizeInBytes());
                    // there is a rare case that a single batch size is larger than the request size due to
                    // compression; in this case we will still eventually send this batch in a single request
                    break;
                }
There is no such log, which means it did not enter this if statement.
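
(Just to make that comparison concrete, a small sketch with my own numbers, assuming first.estimatedSizeInBytes() for an open batch is based on the same estimatedBytesWritten() shown above, which is my reading of ProducerBatch:)

public class DrainCheckSketch {
    public static void main(String[] args) {
        int maxSize = 1048576;            // the maxSize logged in drain(), i.e. max.request.size
        int size = 0;                     // bytes already drained for this node in the loop
        int estimatedSizeInBytes = 65536; // around batch.size; with a tiny or negative ratio it can be even smaller
        // The guard only fires when the estimate exceeds maxSize, so with a small estimate it
        // never breaks, no matter how large the batch really is after compression.
        System.out.println(size + estimatedSizeInBytes > maxSize);   // false
    }
}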

Does the way the producer compresses data depend on the broker, for example via the compression ratio? Why does the client's memory usage vary so dramatically after MESSAGE_TOO_LARGE occurs? Could anyone give me some suggestions on how to locate the root cause of this issue? Thanks.

Best Wishes,
Jiamei

