kafka-users mailing list archives

From Manu Jacob <Manu.Ja...@sas.com>
Subject RE: Kafka connect FieldPartitioner with scheduled rotation
Date Tue, 05 Feb 2019 14:56:40 GMT
Hi Pere,

I am using 3.3.0. By the way, I could resolve this issue by setting the “timezone” property
on the connector. It is a bit strange, as I thought that value was used only by the time-based
partitioner (where it could have defaulted to UTC) and was not a requirement for scheduled
rotation. I am not sure if it is a defect or a genuine requirement.
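
For anyone hitting the same issue, here is a minimal sketch of the change (a fragment of the connector config, not the full one; "UTC" is only an example value, use whichever zone your rotation should be aligned to):

  "rotate.schedule.interval.ms": "300000",
  "timezone": "UTC"

Scheduled rotation commits files on wall-clock boundaries, so the connector needs to know which clock to use, which would explain why "timezone" is required here.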

Thanks,
-Manu

From: Pere Urbón Bayes <pere.urbon@gmail.com>
Sent: Monday, February 04, 2019 11:41 PM
To: Manu Jacob <Manu.Jacob@sas.com>
Cc: users@kafka.apache.org
Subject: Re: Kafka connect FieldPartitioner with scheduled rotation


Hi Manu,
  looking at the master branch of the connector (I forgot to ask which version you are
using), your error looks to me to be coming from:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

This line is basically:

for (TopicPartition tp : assignment) {
    // NPE here if the map has no writer registered for tp:
    // get(tp) returns null and write() is invoked on it.
    topicPartitionWriters.get(tp).write();
}

It could also be coming from:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/3.2.x/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

This is more consistent with your exception, and it shows the same sort of problem.

It raises a NullPointerException, which makes me think there is some problem with the
partition writers.

My guess, with this in mind, is that the error is not coming from the partitioner but from
something with the writers.
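
To illustrate (a hypothetical, self-contained sketch, not the connector's actual code): if the writer map has lost its entry for an assigned partition, the chained call fails exactly like the trace you posted:

import java.util.HashMap;
import java.util.Map;

public class NpeSketch {
    public static void main(String[] args) {
        // Stand-in for the connector's map of partition -> writer.
        Map<String, Runnable> topicPartitionWriters = new HashMap<>();
        // No writer was ever registered (or it was already removed)
        // for this partition, so get() returns null ...
        String tp = "sinmaj-test-0";
        // ... and invoking a method on that null result throws
        // java.lang.NullPointerException, as in the stack trace.
        topicPartitionWriters.get(tp).run();
    }
}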

I hope this helps,

-- Pere

On Sun, 3 Feb 2019 at 19:09, Manu Jacob <Manu.Jacob@sas.com> wrote:
Hi Pere,

Following is my configuration. In this test, I want to flush every 1000 records and/or every 5 minutes.


{"connector.class":"io.confluent.connect.s3.S3SinkConnector","s3.region":"us-east-1","topics.dir":"test_topics","flush.size":"1000","schema.compatibility":"NONE","topics":"sinmaj-test","tasks.max":"1","s3.part.size":"5242880","locale":"en","format.class":"io.confluent.connect.s3.format.json.JsonFormat","task.class":"io.confluent.connect.s3.S3SinkTask","partitioner.class":"io.confluent.connect.storage.partitioner.FieldPartitioner","schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator","name":"s3-test","storage.class":"io.confluent.connect.s3.storage.S3Storage","s3.bucket.name<http://s3.bucket.name>":"test-bucket","rotate.schedule.interval.ms<http://rotate.schedule.interval.ms>":"300000","partition.field.name<http://partition.field.name>":"externalTenantId",
"timestamp.extractor":"Wallclock"}

I get the following error. I am using version 3.3.0.

[2019-02-03 17:59:01,261] ERROR Commit of WorkerSinkTask{id=s3-test-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task s3-test-0 threw an uncaught and unrecoverable exception
(org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task is being killed and will not recover until manually restarted
(org.apache.kafka.connect.runtime.WorkerTask:149)

Thanks,
-Manu

-----Original Message-----
From: Pere Urbón Bayes <pere.urbon@gmail.com>
Sent: Sunday, February 03, 2019 12:50 PM
To: users@kafka.apache.org
Subject: Re: Kafka connect FieldPartitioner with scheduled rotation


Hi Manu,
  can you share your S3 connector config as well as the exception you are getting? With only
this info I do need more details to understand your issue. Keep in mind the option you are
using, "rotate.schedule.interval.ms"; the docs say:

> This configuration is useful when you have to commit your data based on
> current server time, for example at the beginning of every hour. The
> default value -1 means that this feature is disabled.
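
To make that concrete (a sketch only, not your config): committing at the beginning of every hour would correspond to

  "rotate.schedule.interval.ms": "3600000"

since the interval is given in milliseconds.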

Cheers

On Sun, 3 Feb 2019 at 16:47, Manu Jacob <Manu.Jacob@sas.com> wrote:

> Hi,
>
> I want to use the S3 connector, partitioning with FieldPartitioner and
> partition.field.name set to a non-timestamp-based field. I want to
> commit and flush based on both size and time. I am getting an
> exception when I use the option "rotate.schedule.interval.ms". Is it
> possible to rotate it with FieldPartitioner? I tried to set the
> timestamp.extractor (with record and wallclock) but it looks like it
> is honored only by the time-based partitioner.
>
> -Manu
>
>
>

--
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/


--
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/