flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Kafka and parallelism
Date Wed, 07 Feb 2018 09:27:16 GMT
Hi Christophe,

Yes, you can achieve writing to different topics per-message using the `KeyedSerializationSchema`
provided to the Kafka producer.
The schema interface has a `getTargetTopic` method which allows you to override the default
target topic for a given record.
I agree that the method is somewhat odd to be part of the serialization schema, so I have
also been thinking about moving that elsewhere (maybe as part of the partitioner).
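For Flink 1.4, a per-record topic override could look roughly like the sketch below. The Flink interface is stubbed locally here so the routing logic compiles and is visible without the connector dependency; `RoutedMessage` and its field names are assumptions for illustration, not Flink types.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for Flink's KeyedSerializationSchema (flink-connector-kafka),
// stubbed so this sketch compiles without the Flink dependency.
interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element); // null means "use the producer's default topic"
}

// Hypothetical record type that carries its own destination topic.
class RoutedMessage {
    final String targetTopic;
    final String payload;

    RoutedMessage(String targetTopic, String payload) {
        this.targetTopic = targetTopic;
        this.payload = payload;
    }
}

class RoutingSerializationSchema implements KeyedSerializationSchema<RoutedMessage> {
    @Override
    public byte[] serializeKey(RoutedMessage element) {
        return null; // no key in this sketch
    }

    @Override
    public byte[] serializeValue(RoutedMessage element) {
        return element.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(RoutedMessage element) {
        // Per-record override of the producer's default topic.
        return element.targetTopic;
    }
}
```

The same schema instance would simply be passed to the Kafka producer's constructor in place of a plain serialization schema.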

If you want to route a record to some topic depending on which topic it came from on the consumer
side, you’ll have to wrap the source topic information within the records so that it is
available to the producer.
You can access that in the `KeyedDeserializationSchema#deserialize` method, which exposes
information about which topic and partition each record came from.
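The consumer side could be sketched the same way, with the Flink interface stubbed locally: the `topic` argument of `deserialize` is captured into a wrapper record, and a small helper derives the destination name. The `origin-`/`destination-` naming follows the example in this thread; the wrapper type itself is an assumption for illustration.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for Flink's KeyedDeserializationSchema, stubbed so this sketch
// compiles without the Flink dependency (the real interface also declares
// isEndOfStream and getProducedType).
interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

// Hypothetical wrapper that keeps the source topic alongside the payload.
class TaggedMessage {
    final String sourceTopic;
    final String payload;

    TaggedMessage(String sourceTopic, String payload) {
        this.sourceTopic = sourceTopic;
        this.payload = payload;
    }

    // Derive the destination topic from the source topic,
    // e.g. origin-topic-1 -> destination-topic-1.
    String destinationTopic() {
        return sourceTopic.replaceFirst("^origin-", "destination-");
    }
}

class TopicTaggingDeserializationSchema implements KeyedDeserializationSchema<TaggedMessage> {
    @Override
    public TaggedMessage deserialize(byte[] messageKey, byte[] message,
                                     String topic, int partition, long offset) {
        // The source topic is only available here, so wrap it into the record
        // for the producer-side schema to use later.
        return new TaggedMessage(topic, new String(message, StandardCharsets.UTF_8));
    }
}
```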


On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjolif@gmail.com) wrote:

Hi Gordon, or anyone else reading this,

Still on this idea that I consume a Kafka topic pattern. 

I then want to sink the result of the processing into a set of topics depending on which topic the original message came from (i.e. if it comes from origin-topic-1 I will serialize the result to destination-topic-1, if from topic-2 to topic-2, etc.). However, the KafkaProducer works on a fixed topic. You can provide a partitioning function (FlinkKafkaPartitioner) but not a "topic" function that would let you decide which topic to send the message to, a bit like BucketingSink decides the bucket or ElasticsearchSinkFunction lets you choose the index.

Am I missing something? The reason I'm asking is that some of the sink constructors mention "defaultTopicId" and some "topicId", as if in some cases there were some ability to override the topic. Is there a feature that allows me to do that?

If not do you think this would be a worthwhile addition?

Thanks again,

On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Christophe,

You can set the parallelism of the FlinkKafkaConsumer independently of the total number of
Kafka partitions (across all subscribed streams, including newly created streams that match
a subscribed pattern).

The consumer deterministically assigns each partition to a single consumer subtask in a round-robin fashion. E.g., if the parallelism of your FlinkKafkaConsumer is 2 and there are 6 partitions, each consumer subtask will be assigned 3 partitions.
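The per-subtask counts can be sketched with simple modular arithmetic. This is a simplification: the actual Flink assignment also offsets by a hash of the topic name, so which subtask gets which particular partition may differ, but the counts come out the same.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RoundRobinSketch {
    // Simplified round-robin assignment: partition p goes to subtask p % parallelism.
    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.computeIfAbsent(p % parallelism, k -> new ArrayList<>()).add(p);
        }
        return bySubtask;
    }
}
```

With 6 partitions and parallelism 2, each of the two subtasks ends up with 3 partitions.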

As for topic pattern subscription, the FlinkKafkaConsumer supports this starting from version 1.4.0. See [1] for how to do that.
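As a rough sketch of what [1] describes: discovery of new topics/partitions is enabled via the `flink.partition-discovery.interval-millis` property, and the consumer is constructed with a `java.util.regex.Pattern` instead of a fixed topic name. The consumer construction itself is left in a comment because it needs the Flink connector dependency; the bootstrap address and group id below are placeholder assumptions.

```java
import java.util.Properties;
import java.util.regex.Pattern;

class PatternSubscriptionSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder
        // Enables periodic discovery of topics/partitions matching the pattern (Flink 1.4+).
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        return props;
    }

    public static void main(String[] args) {
        Pattern topics = Pattern.compile("origin-topic-[0-9]+");
        Properties props = consumerProps();
        // With the Flink dependency on the classpath, the consumer would be built as:
        // FlinkKafkaConsumer011<String> consumer =
        //     new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
        System.out.println(topics.matcher("origin-topic-3").matches()); // prints true
    }
}
```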

Hope this helps!


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjolif@gmail.com) wrote:


If I'm sourcing from a KafkaConsumer, do I have to explicitly set the Flink job parallelism to the number of partitions, or will it adjust automatically? In other words, if I don't call setParallelism, will I get 1 or the number of partitions?

The reason I'm asking is that I'm listening to a topic pattern, not a single topic, and the number of actual topics (and hence partitions) behind the pattern can change, so it is not possible to know ahead of time how many partitions I will get.

