flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...
Date Fri, 01 Dec 2017 09:22:51 GMT
Github user tzulitai commented on a diff in the pull request:

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
    @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic,
     			partitions != null && partitions.length > 0,
     			"Partitions of the target topic is empty.");
    -		return partitions[parallelInstanceId % partitions.length];
    +		if (topicToFixedPartition.containsKey(targetTopic)) {
    --- End diff --
    On second thought, passing in the runtime context wouldn’t work. To let user custom
partitioners  be stateful, we’ll essentially need to make FlinkKafkaPartitioner a CheckpointedFunction,
and let the FlinkKafkaProducerBase invoke the checkpoint methods.
    We should be able to avoid breaking the user API by having empty base implementations
for the checkpoint methods on the FlinkKafkaPartitioner.


View raw message