spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Streaming app consume multiple kafka topics
Date Tue, 15 Mar 2016 06:52:08 GMT
One way would be to keep a separate stream per topic:

val stream1 = KafkaUtils.createStream(..) // for topic 1

val stream2 = KafkaUtils.createStream(..) // for topic 2


That way you know which stream belongs to which topic, as long as you
process each stream separately instead of unioning them.

Another approach, which you can build into your code, is to tag each stream
with its topic name when you create it. For example, create a tuple
(topic, stream); you can then access ._1 as the topic and ._2 as the stream.
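
A minimal sketch of the tagging idea, assuming the receiver-based
KafkaUtils.createStream from the spark-streaming-kafka (0.8) module, as used
in the quoted code below. The names TaggedKafkaStreams, taggedStreams and
taggedRecords are hypothetical, and kafkaParams and topics stand in for your
own configuration:

```scala
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper object; not part of Spark.
object TaggedKafkaStreams {

  // Creates one receiver stream per topic and returns (topic, stream) pairs,
  // so ._1 is the topic name and ._2 is the stream of message values.
  def taggedStreams(
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Seq[String]): Seq[(String, DStream[Array[Byte]])] =
    topics.map { topic =>
      val values = KafkaUtils
        .createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
          ssc,
          kafkaParams,
          Map(topic -> 1), // one consumer thread for this topic
          StorageLevel.MEMORY_ONLY_SER)
        .map(_._2) // drop the Kafka message key, keep the value
      (topic, values)
    }

  // If a single unified stream is still wanted, tag every record with its
  // topic before the union so the origin is not lost.
  def taggedRecords(
      ssc: StreamingContext,
      streams: Seq[(String, DStream[Array[Byte]])]): DStream[(String, Array[Byte])] =
    ssc.union(streams.map { case (topic, stream) =>
      stream.map(value => (topic, value))
    })
}
```

With the pairs from taggedStreams you can match on the topic name and attach
different processing to each stream. With taggedRecords each element of the
unified stream is a (topic, value) tuple, so the per-topic branching can
happen inside foreachRDD instead.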


Thanks
Best Regards

On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <imre.nagi2812@gmail.com> wrote:

> Hi,
>
> I'm trying to create a Spark Streaming application that consumes more
> than one Kafka topic. I then want to process the data from each topic
> differently.
>
> val kafkaStreams = {
>   val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
>     Map(
>       "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
>       "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
>       "group.id" -> consumerGroup,
>       "zookeeper.connection.timeout.ms" -> ConsumerConfig.zookeeperConnectionTimeout,
>       "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
>       "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
>     )
>   }
>   val streams = (0 to kafkaParameter.length - 1) map { p =>
>     KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>       ssc,
>       kafkaParameter(p),
>       Map(topicsArr(p) -> 1),
>       StorageLevel.MEMORY_ONLY_SER
>     ).map(_._2)
>   }
>   val unifiedStream = ssc.union(streams)
>   unifiedStream.repartition(1)
> }
> kafkaStreams.foreachRDD(rdd => {
>   rdd.foreachPartition(partitionOfRecords => {
>     partitionOfRecords.foreach ( x =>
>       println(x)
>     )
>   })
> })
>
>
> So far, I'm able to get the data from several topics. However, I still
> can't tell which topic a given record came from.
>
> Does anybody have experience doing this?
>
> Best,
> Imre
>
