flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
Date Mon, 26 Sep 2016 15:08:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523327#comment-15523327
] 

ASF GitHub Bot commented on FLINK-4035:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2369#discussion_r80496915
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
---
    @@ -60,12 +60,17 @@ protected void assignPartitionsToConsumer(KafkaConsumer<byte[],
byte[]> consumer
     		consumer.assign(topicPartitions);
     	}
     
    +	@Override
    +	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition,
long offset, ConsumerRecord consumerRecord) throws Exception {
    +		// pass timestamp
    +		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
    +	}
    +
     	/**
     	 * Emit record Kafka-timestamp aware.
     	 */
     	@Override
    -	protected <R> void emitRecord(T record, KafkaTopicPartitionState<TopicPartition>
partitionState, long offset, R kafkaRecord) throws Exception {
    -		long timestamp = ((ConsumerRecord) kafkaRecord).timestamp();
    +	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState,
long offset, long timestamp) throws Exception {
     		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
    --- End diff --
    
    Yes, I'll do that.


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  Published messages
now include timestamps and compressed messages now include relative offsets.  As it is now,
brokers must decompress publisher compressed messages, assign offset to them, and recompress
them, which is wasteful and makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message