spark-user mailing list archives

From Srinivas V <srini....@gmail.com>
Subject Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters
Date Tue, 09 Jun 2020 18:07:29 GMT
OK, thanks for confirming. I will do it this way.

Regards
Srini
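
For readers following along: the per-cluster approach confirmed above can be driven from a properties file, as Srinivas describes further down in the thread. A minimal sketch in plain Scala — the property keys (`<cluster>.bootstrap`, `<cluster>.topics`) and the helper name are hypothetical, and the actual Spark read calls are the ones shown in the quoted messages below:

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical properties layout: one bootstrap list and one topic list
// per cluster, e.g.
//   cluster1.bootstrap=host1:9092
//   cluster1.topics=topic1,topic2

// Parse the properties into (bootstrapServers, subscribeString) per cluster,
// trimming whitespace so the Kafka "subscribe" option gets clean topic names.
def clusterConfigs(props: Properties): Map[String, (String, String)] = {
  import scala.jdk.CollectionConverters._
  props.stringPropertyNames().asScala
    .filter(_.endsWith(".bootstrap"))
    .map { key =>
      val cluster = key.stripSuffix(".bootstrap")
      val topics = props.getProperty(s"$cluster.topics", "")
        .split(",").map(_.trim).filter(_.nonEmpty).mkString(",")
      cluster -> (props.getProperty(key).trim, topics)
    }.toMap
}

val props = new Properties()
props.load(new StringReader(
  """cluster1.bootstrap=host1:9092
    |cluster1.topics=topic1, topic2
    |cluster2.bootstrap=host2:9092
    |cluster2.topics=topic3,topic4,topic5
    |""".stripMargin))

val configs = clusterConfigs(props)
// Each entry feeds one spark.read.format("kafka") call (one per cluster);
// the resulting DataFrames are then unioned, as shown in the thread below.
```

Adding or removing a topic then only touches the properties file, which is the dynamic behaviour Srinivas is after, while still keeping one connection per cluster.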

On Tue, Jun 9, 2020 at 11:31 PM Gerard Maas <gerard.maas@gmail.com> wrote:

> Hi Srinivas,
>
> Reading from different brokers is possible but you need to connect to each
> Kafka cluster separately.
> Mixing connections to two different Kafka clusters in a single
> subscriber is not supported (it would give all kinds of weird errors).
> The "kafka.bootstrap.servers" option exists to list the potentially
> many brokers of the *same* Kafka cluster.
>
> The way to address this is to follow German's suggestion and create a
> separate subscription for each Kafka cluster you are talking to.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
>   .option("subscribe", "topic1,topic2")
>   .load()
>
> val df_cluster2 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
>   .option("subscribe", "topic3,topicn,topicn+1")
>   .load()
>
> After acquiring the DataFrames, you can union them and process all the
> data in a single pipeline.
>
> val unifiedData = df_cluster1.union(df_cluster2)
> // apply further transformations on `unifiedData`
>
> kr, Gerard.
>
>
>
>
>
> On Tue, Jun 9, 2020 at 6:30 PM Srinivas V <srini.vyr@gmail.com> wrote:
>
>> Thanks for the quick reply. This may work, but I have about 5 topics to
>> listen to right now, so I am trying to keep all topics in an array in a
>> properties file and read them all at once. That way it is dynamic: you
>> have one code block like the one below, and you can add or delete topics
>> in the config file without changing code. If someone confirms that this
>> does not work, I will do something like what you have provided.
>>
>> val df_cluster1 = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port,cluster2_host:port")
>>   .option("subscribe", "topic1,topic2,topic3,topic4,topic5")
>>   .load()
>>
>>
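
One small caveat with the comma-separated `subscribe` strings in the snippets above: when the list is assembled from a config file, stray spaces or trailing commas can slip in (as in the `"topic3, topicn, topicn+1,"` example). How tolerant Spark's Kafka source is of these may vary by version, so normalizing the string defensively is cheap. A sketch — the helper name is hypothetical:

```scala
// Normalize a raw comma-separated topic list: trim whitespace around each
// name and drop empty entries left by trailing or doubled commas.
def normalizeTopics(raw: String): String =
  raw.split(",").map(_.trim).filter(_.nonEmpty).mkString(",")
```

The cleaned string can then be passed straight to `.option("subscribe", ...)`.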
