spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Streaming app consume multiple kafka topics
Date Tue, 15 Mar 2016 14:45:19 GMT
The direct stream gives you access to the topic.  The offset range for
each partition contains the topic.  That way you can create a single
stream, and the first thing you do with it is mapPartitions with a
switch on topic.

Of course, it may make more sense to separate topics into different
jobs, but if you want it all in one, that's the most straightforward
way to do it imho.
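The pattern can be sketched like this (plain Java, modeling the per-partition lookup rather than calling the real Spark API — in real code the topic for partition i comes from the OffsetRange for that partition via HasOffsetRanges; the class, method, and topic names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "one direct stream, then mapPartitions with a switch
// on topic". Each simulated partition carries its topic name; in real Spark
// code that name would come from the partition's OffsetRange.
public class TopicDispatch {

    // One simulated Kafka partition: its topic plus the records it holds.
    static class Partition {
        final String topic;
        final List<String> records;
        Partition(String topic, List<String> records) {
            this.topic = topic;
            this.records = records;
        }
    }

    // The per-partition switch on topic.
    static List<String> dispatch(List<Partition> partitions) {
        List<String> out = new ArrayList<>();
        for (Partition p : partitions) {
            switch (p.topic) {
            case "impressions":  // e.g. the ImprLogProc path
                for (String r : p.records) out.add("impr:" + r);
                break;
            case "events":       // e.g. the EventLogProc path
                for (String r : p.records) out.add("event:" + r);
                break;
            default:
                out.add("unmatched topic: " + p.topic);
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = dispatch(Arrays.asList(
            new Partition("impressions", Arrays.asList("a", "b")),
            new Partition("events", Arrays.asList("c"))));
        for (String line : out) System.out.println(line);
    }
}
```

With a single stream there is one offset range per RDD partition, so the topic lookup inside mapPartitions is just an array index.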

On Tue, Mar 15, 2016 at 1:55 AM, saurabh guru <saurabh.guru@gmail.com> wrote:
> I am doing the same thing this way:
>
> // Iterate over the HashSet of topics
>         Iterator<String> iterator = topicsSet.iterator();
>         JavaPairInputDStream<String, String> messages;
>         JavaDStream<String> lines;
>         String topic = "";
>         // Get a message stream for each topic
>         while (iterator.hasNext()) {
>             topic = iterator.next();
>             // Create a direct Kafka stream for this topic
>             messages = KafkaUtils.createDirectStream(jssc, String.class,
>                     String.class, StringDecoder.class, StringDecoder.class,
>                     kafkaParams, new HashSet<String>(Arrays.asList(topic)));
>
>             // Extract the message values
>             lines = messages.map(new Function<Tuple2<String, String>, String>() {
>                 @Override
>                 public String call(Tuple2<String, String> tuple2) {
>                     return tuple2._2();
>                 }
>             });
>
>             switch (topic) {
>             case IMPR_ACC:
>                 ImprLogProc.groupAndCount(lines, esImpIndexName, IMPR_ACC,
>                         new ImprMarshal());
>                 break;
>             case EVENTS_ACC:
>                 EventLogProc.groupAndCount(lines, esEventIndexName,
>                         EVENTS_ACC, new EventMarshal());
>                 break;
>             default:
>                 logger.error("No matching Kafka topic found");
>                 break;
>             }
>         }
>
> On Tue, Mar 15, 2016 at 12:22 PM, Akhil Das <akhil@sigmoidanalytics.com>
> wrote:
>>
>> One way would be to keep it this way:
>>
>> val stream1 = KafkaUtils.createStream(..) // for topic 1
>>
>> val stream2 = KafkaUtils.createStream(..) // for topic 2
>>
>>
>> And you will know which stream belongs to which topic.
>>
>> Another approach, which you can put in your code itself, would be to tag
>> the topic name along with the stream you are creating: build a tuple
>> (topic, stream), and you will be able to access ._1 as the topic and ._2
>> as the stream.
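>> A quick sketch of that tagging idea (plain Java, with Lists standing in
>> for the streams; all names are illustrative):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of tagging each stream with its topic name. Lists stand
// in for DStreams; Map.Entry plays the role of the (topic, stream) tuple.
public class TaggedStreams {

    static List<Map.Entry<String, List<String>>> tagged() {
        List<String> stream1 = Arrays.asList("a", "b"); // stands in for a stream on topic1
        List<String> stream2 = Arrays.asList("c");      // stands in for a stream on topic2
        return Arrays.asList(
            new SimpleEntry<>("topic1", stream1),
            new SimpleEntry<>("topic2", stream2));
    }

    public static void main(String[] args) {
        for (Map.Entry<String, List<String>> e : tagged()) {
            // getKey() is the topic, getValue() the stream (._1 / ._2 in Scala)
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

>> Downstream code can then branch on the tag without re-deriving the topic.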
>>
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <imre.nagi2812@gmail.com>
>> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to create a Spark Streaming application that consumes more
>>> than one Kafka topic. Then I want to do different further processing on
>>> the data sent by each topic.
>>>
>>>> val kafkaStreams = {
>>>>       val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
>>>>         Map(
>>>>           "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
>>>>           "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
>>>>           "group.id" -> consumerGroup,
>>>>           "zookeeper.connection.timeout.ms" ->
>>>> ConsumerConfig.zookeeperConnectionTimeout,
>>>>           "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
>>>>           "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
>>>>         )
>>>>       }
>>>>       val streams = (0 to kafkaParameter.length - 1) map { p =>
>>>>         KafkaUtils.createStream[String, Array[Byte], StringDecoder,
>>>> DefaultDecoder](
>>>>           ssc,
>>>>           kafkaParameter(p),
>>>>           Map(topicsArr(p) -> 1),
>>>>           StorageLevel.MEMORY_ONLY_SER
>>>>         ).map(_._2)
>>>>       }
>>>>       val unifiedStream = ssc.union(streams)
>>>>       unifiedStream.repartition(1)
>>>>     }
>>>>     kafkaStreams.foreachRDD(rdd => {
>>>>       rdd.foreachPartition(partitionOfRecords => {
>>>>         partitionOfRecords.foreach ( x =>
>>>>           println(x)
>>>>         )
>>>>       })
>>>>     })
>>>
>>>
>>> So far, I'm able to get the data from several topics. However, I'm still
>>> unable to tell which topic each record came from.
>>>
>>> Does anybody have experience doing this?
>>>
>>> Best,
>>> Imre
>>
>>
>
>
>
> --
> Thanks,
> Saurabh
>
> :)

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

