kafka-users mailing list archives

From Le-Van Huyen <lvhu...@gmail.com>
Subject Kafka S3 Connector - exactly once failure
Date Fri, 09 Aug 2019 07:30:55 GMT
Hi Everyone, 

First of all, I want to thank the Kafka team for this beautiful tool.
I started with Kafka only recently, and I am trying to run a Kafka cluster on AWS EC2 using
Confluent's Docker images. The cluster consists of: (i) 3 Zookeeper hosts, (ii) 1 Kafka broker
auto-scaling group (ASG) of 3 servers, and (iii) 1 client ASG on which I run kafka-rest-proxy
and kafka-connect.
The default topic replication factor is 3.
The connector configuration is quoted at the end of this post.
The whole cluster had been running fine.

Today I needed to upgrade the broker and client ASGs. We used AWS's ASG rolling-update
method, which terminates one old server, adds a new one with the new configuration, waits 20
minutes, then moves on to the second server, and so on.

The problem appeared after the last old server was terminated and the last new server was added:
my S3 connector started reading from the beginning of the topics and writing the output to S3,
so I ended up with two copies of my Kafka records in S3.

Is this the expected behaviour? According to this page,
https://docs.confluent.io/current/connect/kafka-connect-s3/index.html, it seems not.
If this behaviour is expected, then what is the error in my upgrade procedure? (I didn't have
a chance to check the replicas of my topics during the upgrade.)
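For reference, this is roughly how I would check next time, before and during the rolling update. This is only a sketch: the bootstrap address is a placeholder, and I am assuming the sink's consumer group follows the usual Connect naming convention of connect-&lt;connector-name&gt;.

```shell
# Hypothetical bootstrap server; replace with a real broker address.
BOOTSTRAP=broker1:9092

# Replication factor and in-sync replicas of the internal offsets topic.
# If this topic loses all replicas of a partition while brokers are being
# cycled, committed consumer offsets can be lost, and a sink consumer
# would then fall back to auto.offset.reset and re-read from the start.
kafka-topics --bootstrap-server "$BOOTSTRAP" \
  --describe --topic __consumer_offsets

# Committed offsets for the sink connector's consumer group
# (assumed group name: connect-s3-bytearray, from the connector name).
kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" \
  --describe --group connect-s3-bytearray
```

If the group's CURRENT-OFFSET column ever goes empty or resets to low values mid-upgrade, that would point at lost offsets rather than a connector bug.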

Thanks and regards,

{
	"name": "s3-bytearray",
	"config": {
		"connector.class": "io.confluent.connect.s3.S3SinkConnector",
		"tasks.max": "3",
		"errors.log.enable": "true",
		"errors.log.include.messages": "true",
		"topics": "<my_topics>",
		"topics.dir": "<my_s3_prefix>",
		"rotate.interval.ms": "600000",
		"schema.compatibility": "NONE",
		"flush.size": "5000",
		"s3.bucket.name": "<my bucket>",
		"s3.part.size": "5242880",
		"s3.region": "ap-southeast-2",
		"s3.compression.type": "gzip",
		"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
		"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
		"key.converter.schemas.enable": "false",
		"value.converter.schemas.enable": "false",
		"storage.class": "io.confluent.connect.s3.storage.S3Storage",
		"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
		"format.bytearray.extension": ".json",
		"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
		"partition.duration.ms": "86400000",
		"errors.tolerance": "all",
		"errors.deadletterqueue.topic.name": "connect_S3_bytearray_error",
		"errors.deadletterqueue.topic.replication.factor": 2,
		"locale": "AU",
		"timezone": "UTC",
		"path.format": "'dt'=YYYY'-'MM'-'dd",
		"timestamp.extractor": "Record"
	}
}
