flink-issues mailing list archives

From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...
Date Fri, 01 Dec 2017 08:51:53 GMT
Github user tzulitai commented on a diff in the pull request:

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
    @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic,
     			partitions != null && partitions.length > 0,
     			"Partitions of the target topic is empty.");
    -		return partitions[parallelInstanceId % partitions.length];
    +		if (topicToFixedPartition.containsKey(targetTopic)) {
    --- End diff --
    The full solution would be to allow FlinkKafkaPartitioner to be stateful and participate in checkpointing.
    That might mean passing the runtime context into FlinkKafkaPartitioner.open(), but that would touch the user-facing API.
    We could also opt for a workaround that registers state only for the FlinkFixedPartitioner internally, while still restricting custom partitioners to be stateless.
    However, I think it makes more sense to avoid workarounds at this stage.
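
For illustration only, here is a minimal sketch of the kind of per-topic caching that the `topicToFixedPartition` lookup in the diff suggests. The diff shows only the `containsKey` check, so everything else is an assumption: the class `FixedPartitionSketch`, its constructor, and the simplified `partition(String, int[])` signature are hypothetical stand-ins, not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; this is NOT Flink's FlinkFixedPartitioner.
class FixedPartitionSketch {
    // Index of this parallel sink instance (assumed to be supplied at construction).
    private final int parallelInstanceId;
    // Remembers the partition first chosen for each topic (assumed purpose of the map in the diff).
    private final Map<String, Integer> topicToFixedPartition = new HashMap<>();

    FixedPartitionSketch(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    int partition(String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic are empty.");
        }
        // Reuse the earlier choice so a change in the partition list does not move the target.
        if (topicToFixedPartition.containsKey(targetTopic)) {
            return topicToFixedPartition.get(targetTopic);
        }
        int fixed = partitions[parallelInstanceId % partitions.length];
        topicToFixedPartition.put(targetTopic, fixed);
        return fixed;
    }
}
```

In this sketch the map lives only in memory, so the remembered choice is lost when the job restarts. Keeping it across restarts would require the partitioner to hold checkpointed state, which is the direction discussed above.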

