spark-user mailing list archives

From Imre Nagi <imre.nagi2...@gmail.com>
Subject Re: Streaming app consume multiple kafka topics
Date Tue, 15 Mar 2016 06:55:07 GMT
Actually, I have tried your suggestion, but it does not seem to work. Let me
try it once again.

Thanks for your help
Best,
Imre

On Tue, Mar 15, 2016 at 1:52 PM, Akhil Das <akhil@sigmoidanalytics.com>
wrote:

> One way would be to keep it this way:
>
> val stream1 = KafkaUtils.createStream(..) // for topic 1
>
> val stream2 = KafkaUtils.createStream(..) // for topic 2
>
>
> And you will know which stream belongs to which topic.
>
> Another approach, which you can put in the code itself, is to tag each
> stream with the topic name when you create it. For example, create a
> tuple (topic, stream); then ._1 gives you the topic and ._2 the stream.
>
>
> Thanks
> Best Regards
>
> On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <imre.nagi2812@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm trying to create a Spark Streaming application that consumes more
>> than one Kafka topic. I then want to process the data from each topic
>> differently.
>>
>> import kafka.serializer.{DefaultDecoder, StringDecoder}
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> val kafkaStreams = {
>>   // One set of consumer parameters per consumer group
>>   val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
>>     Map(
>>       "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
>>       "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
>>       "group.id" -> consumerGroup,
>>       "zookeeper.connection.timeout.ms" -> ConsumerConfig.zookeeperConnectionTimeout,
>>       "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
>>       "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
>>     )
>>   }
>>   // One receiver-based stream per consumer group, reading topicsArr(p);
>>   // each (key, message) pair is reduced to the message alone
>>   val streams = kafkaParameter.indices.map { p =>
>>     KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>       ssc,
>>       kafkaParameter(p),
>>       Map(topicsArr(p) -> 1),
>>       StorageLevel.MEMORY_ONLY_SER
>>     ).map(_._2)
>>   }
>>   // Merge all topics into a single stream with one partition
>>   val unifiedStream = ssc.union(streams)
>>   unifiedStream.repartition(1)
>> }
>> kafkaStreams.foreachRDD { rdd =>
>>   rdd.foreachPartition { partitionOfRecords =>
>>     partitionOfRecords.foreach(x => println(x))
>>   }
>> }
>>
>>
>> So far, I'm able to get the data from several topics. However, I'm still
>> unable to tell which topic each record came from.
>>
>> Does anybody have experience doing this?
>>
>> Best,
>> Imre
>>
>
>
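
Akhil's first suggestion (one stream per topic, no union) can be applied to Imre's code by leaving `streams` unmerged and pairing each stream with its topic, e.g. `topicsArr.zip(streams).toMap`, then attaching different processing to each entry. The sketch below models that idea without Spark, assuming plain `Seq`s stand in for the per-topic DStreams and that `topicsArr(p)` is the topic read by `streams(p)`, as in the code above. The topic names `clicks`/`views` and their handlers are hypothetical.

```scala
// Spark-free model: each Seq[String] stands in for one per-topic DStream.
object PerTopicStreams {
  // Hypothetical per-topic handlers; in Spark each would be attached to its
  // own DStream, e.g. streamsByTopic("clicks").foreachRDD(...)
  val handlers: Map[String, Seq[String] => Seq[String]] = Map(
    "clicks" -> ((vs: Seq[String]) => vs.map(v => s"click:$v")),
    "views" -> ((vs: Seq[String]) => vs.map(v => s"view:$v"))
  )

  // Pair each topic with the stream created for it (same order as topicsArr),
  // so no stream ever loses track of its topic.
  def byTopic[A](topics: Seq[String], streams: Seq[A]): Map[String, A] = {
    require(topics.length == streams.length, "expected one stream per topic")
    topics.zip(streams).toMap
  }

  // Apply the handler registered for each topic to that topic's stream only.
  def process(streamsByTopic: Map[String, Seq[String]]): Map[String, Seq[String]] =
    streamsByTopic.map { case (topic, stream) => topic -> handlers(topic)(stream) }
}
```

The trade-off is one output operation per topic instead of a single `foreachRDD` on a unified stream, which is what makes per-topic processing straightforward.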

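Akhil's second suggestion, tagging with the topic, can also be applied at the record level to keep Imre's union. Inside the `map { p => ... }`, bind `val topic = topicsArr(p)` and replace `.map(_._2)` with `.map { case (_, message) => (topic, message) }`. The unified stream then carries `(String, Array[Byte])` records, and each topic can be handled with `unifiedStream.filter(_._1 == topic)` or a `match` on the tag. Binding `topic` to a local `val` is a precaution added here so the closure captures only a `String`. The sketch below models that data flow without Spark, assuming plain `Seq`s in place of DStreams, `String` messages in place of `Array[Byte]`, and hypothetical topic names; `DStream` offers `map`, `filter` and `union` with the same shape.

```scala
// Spark-free model of tagging records with their topic before a union.
object TagThenUnion {
  // Receiver-based KafkaUtils.createStream yields (key, message) pairs, where
  // the key is the Kafka message key, not the topic; the topic must be added.
  type Record = (String, String)

  // Replacement for `.map(_._2)`: keep the message but put the topic next to it.
  def tag(topic: String, stream: Seq[Record]): Seq[(String, String)] =
    stream.map { case (_, message) => (topic, message) }

  // Stand-in for ssc.union(streams): the tagged streams are merged, but every
  // record still carries the topic it came from.
  def unionTagged(topics: Seq[String], streams: Seq[Seq[Record]]): Seq[(String, String)] =
    topics.zip(streams).flatMap { case (topic, stream) => tag(topic, stream) }

  // Per-record dispatch on the topic tag, as could be done inside
  // foreachPartition. Topic names are hypothetical.
  def route(record: (String, String)): String = record match {
    case ("clicks", message) => s"click handler got $message"
    case ("views", message)  => s"view handler got $message"
    case (other, message)    => s"unhandled topic $other: $message"
  }
}
```

Because the tag travels with each record, `repartition(1)` can mix topics in one partition without losing track of where each record came from.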