flink-user mailing list archives

From Alexander Smirnov <alexander.smirn...@gmail.com>
Subject Can't send kafka message with timestamp
Date Thu, 26 Apr 2018 12:59:41 GMT
Hi,


I'm creating a Kafka producer with timestamps enabled, following the
instructions at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer


        Optional<FlinkKafkaPartitioner<T>> customPartitioner = Optional.empty();

        FlinkKafkaProducer011<T> result = new FlinkKafkaProducer011<>(
                defaultTopic, serializationSchema, properties, customPartitioner);

        result.setWriteTimestampToKafka(true);



but I'm getting an exception:


java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should always be non-negative or null.
	at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70)
	at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:642)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
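
For illustration, the constraint stated in the exception message can be sketched in plain Java, without Flink or Kafka on the classpath. This is only an assumption about what the check amounts to, inferred from the message text and not copied from the Kafka source. The class name and both helper methods are hypothetical, and the sketch does not explain where the -1 comes from in the job above.

```java
// Hypothetical stand-alone sketch; none of these names exist in Flink or Kafka.
final class TimestampCheckSketch {

    // Mirrors the rule stated in the exception message:
    // a record timestamp must be either null or non-negative.
    static void validateTimestamp(Long timestamp) {
        if (timestamp != null && timestamp < 0) {
            throw new IllegalArgumentException(
                    "Invalid timestamp: " + timestamp
                            + ". Timestamp should always be non-negative or null.");
        }
    }

    // Hypothetical guard: treat a negative value as "no timestamp" (null)
    // so that it would pass the check above.
    static Long sanitize(Long timestamp) {
        return (timestamp == null || timestamp < 0) ? null : timestamp;
    }

    public static void main(String[] args) {
        // Arbitrary sample values: a missing, a negative and a non-negative timestamp.
        Long[] samples = {null, -1L, 1524747581000L};
        for (Long sample : samples) {
            try {
                validateTimestamp(sample);
                System.out.println(sample + " is accepted");
            } catch (IllegalArgumentException e) {
                System.out.println(sample + " is rejected, sanitized to " + sanitize(sample));
            }
        }
    }
}
```

Under this reading, a null timestamp is accepted, so the failure would only occur when a negative value actually reaches the producer.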




Is there anything else I need to configure to embed timestamp
information into the resulting Kafka message?


Thank you,

Alex
