spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Kafka Direct Stream
Date Sat, 03 Oct 2015 16:25:34 GMT
Hi,

collect(partialFunction) is equivalent to filter(x=>
partialFunction.isDefinedAt(x)).map(partialFunction)  so it's functionally
equivalent to your expression. I favor collect for its more compact form
but that's a personal preference. Use what you feel reads best.
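
To make the equivalence concrete, here is a minimal sketch on plain Scala
collections rather than an RDD, so it runs without a Spark cluster. The sample
records, the topic names, and the object name are made up for illustration;
RDD.collect(partialFunction) follows the same filter-then-map semantics.

```scala
object CollectVsFilterMap {
  // Hypothetical sample data: (topic, payload) pairs, standing in for the RDD contents.
  val records: Seq[(String, String)] = Seq(
    ("orders", "o1"),
    ("clicks", "c1"),
    ("orders", "o2")
  )

  // Hypothetical topic name to select.
  val topic = "orders"

  // Defined only for records of the wanted topic; returns the payload.
  val selectPayload: PartialFunction[(String, String), String] = {
    case (t, data) if t == topic => data
  }

  // Compact form: filter and map in one step.
  val viaCollect: Seq[String] = records.collect(selectPayload)

  // Expanded form: keep the elements where the function is defined, then apply it.
  val viaFilterMap: Seq[String] =
    records.filter(selectPayload.isDefinedAt).map(selectPayload)
}
```

Both values hold the payloads of the "orders" records, in their original order.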

Regarding performance, there will be some overhead from submitting a
separate set of tasks for every filtered RDD that gets materialized to
Cassandra. That's the reason I proposed the ticket I linked earlier. Have a
look at whether it would improve your particular use case and vote for it if
so :-)

-kr, Gerard.

On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharmansit@gmail.com>
wrote:

> Thanks Gerard, the code snippet you shared worked. Can you please
> explain the usage of *collect* here, or point me to a reference? How is it
> different (performance/readability) from *filter*?
>
>> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2)*
>
>
> I am doing something like this. Please tell me if I can improve the
> *processing time* of this particular code:
>
> kafkaStringStream.foreachRDD{rdd =>
>   val topics = rdd.map(_._1).distinct().collect()
>   if (topics.length > 0) {
>     val rdd_value = rdd.take(10).mkString("\n.....\n")
>     Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))
>
>     topics.foreach { topic =>
>       //rdd.filter(x=> x._1 == topic).map(_._2)
>       val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>       CassandraHelper.saveDataToCassandra(topic, filteredRdd)
>     }
>     updateOffsetsinZk(rdd)
>   }
>
> }
>
> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.maas@gmail.com>
> wrote:
>
>> Something like this?
>>
>> I'm making the assumption that your topic name equals your keyspace for
>> this filtering example.
>>
>> dstream.foreachRDD{rdd =>
>>   val topics = rdd.map(_._1).distinct.collect
>>   topics.foreach{topic =>
>>     // do not confuse this collect with rdd.collect() that brings data to the driver
>>     val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>     filteredRdd.saveToCassandra(topic, "table")
>>   }
>> }
>>
>>
>> I'm wondering: would something like this (
>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>> purposes?
>>
>> -kr, Gerard.
>>
>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharmansit@gmail.com>
>> wrote:
>>
>>> Hi Adrian,
>>>
>>> Can you please give an example of how to achieve this:
>>>
>>>> *I would also look at filtering by topic and saving as different
>>>> Dstreams in your code*
>>>
>>> I have managed to get a DStream[(String, String)] of *(topic, my_data)*
>>> tuples. Let's call it kafkaStringStream.
>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>> DStream[(String,Iterable[String])].
>>> But I want a DStream instead of an Iterable in order to apply
>>> saveToCassandra for storing it.
>>>
>>> Please help with how to transform the Iterable into a DStream, or suggest
>>> any other workaround for achieving the same.
>>>
>>>
>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atanase@adobe.com> wrote:
>>>
>>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>>> operators for the processing.
>>>>
>>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>>> topics) and the processing is *really* different, I would also look at
>>>> filtering by topic and saving as different Dstreams in your code.
>>>>
>>>> Either way you need to start with Cody’s tip in order to extract the
>>>> topic name.
>>>>
>>>> -adrian
>>>>
>>>> From: Cody Koeninger
>>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>>> To: Udit Mehta
>>>> Cc: user
>>>> Subject: Re: Kafka Direct Stream
>>>>
>>>> You can get the topic for a given partition from the offset range.  You
>>>> can either filter using that; or just have a single rdd and match on topic
>>>> when doing mapPartitions or foreachPartition (which I think is a better
>>>> idea)
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>
>>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <umehta@groupon.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using the Spark direct stream to consume from multiple topics in
>>>>> Kafka. I am able to consume fine, but I am stuck on how to separate the
>>>>> data for each topic, since I need to process data differently depending
>>>>> on the topic.
>>>>> I basically want to split the RDD consisting of N topics into N RDDs,
>>>>> each having 1 topic.
>>>>>
>>>>> Any help would be appreciated.
>>>>>
>>>>> Thanks in advance,
>>>>> Udit
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>
