spark-user mailing list archives

From obaidul karim <obaidc...@gmail.com>
Subject Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Date Tue, 31 May 2016 08:25:02 GMT
Hi nguyen,

Thanks again.
Yes, flatMap may do the trick as well.
I may try it out.

I will let you know the result when done.
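For anyone following along, here is a plain-Python sketch of how flatMap would keep each prediction tied to its source line when one line produces several feature vectors. DummyModel, split_features, and the semicolon convention are invented stand-ins for illustration, not the real rf_model/getFeatures:

```python
from itertools import chain

class DummyModel(object):
    # illustrative stand-in for the MLlib RandomForestModel
    def predict(self, vec):
        return sum(vec)  # toy "score": the feature sum

def split_features(text):
    # stand-in for getFeatures: each ';'-separated group is one feature vector
    return [[float(v) for v in grp.split(',')] for grp in text.split(';')]

def extract_features(model, text):
    # one (prediction, original_text) pair per feature vector
    return [(model.predict(vec), text) for vec in split_features(text)]

# flatMap on a DStream would flatten these per-line lists the same way
# chain.from_iterable does here:
lines = ["1,2;3,4", "5"]
pairs = list(chain.from_iterable(extract_features(DummyModel(), l) for l in lines))
# pairs == [(3.0, "1,2;3,4"), (7.0, "1,2;3,4"), (5.0, "5")]
```

On the stream itself this would read texts.flatMap(lambda x: extract_features(rf_model, x)).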




On Tue, May 31, 2016 at 3:58 PM, nguyen duc tuan <newvalue92@gmail.com> wrote:

> 1. RandomForest's 'predict' method supports both an RDD and a Vector as input (
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel).
> So, in this case, the function extract_feature should return the tuple
> (prediction, rawtext). If each input text can create a list of
> vectors, try using "flatMap" instead of "map".
> 2, 3: From the Spark documentation: "*Discretized Stream* or *DStream* is the
> basic abstraction provided by Spark Streaming. It represents a continuous
> stream of data, either the input data stream received from source, or the
> processed data stream generated by transforming the input stream.
> Internally, it is represented by a continuous sequence of RDDs, which is
> Spark’s abstraction of an immutable, distributed dataset. Each RDD in a
> DStream contains data from a certain interval, as shown in the following
> figure."(
> https://spark.apache.org/docs/0.9.1/streaming-programming-guide.html)
> So, in order to handle a stream, you should handle each RDD in that
> stream. This means that everything you want to do with your new data
> should go inside the 'process_rdd' function. There is nothing returned
> from 'foreachRDD', of course.
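To make the "nothing returned" point concrete, a minimal plain-Python emulation (FakeStream and the toy length-based "prediction" are invented for illustration; the real call is stream.foreachRDD(process_rdd)):

```python
class FakeStream(object):
    # illustrative stand-in for a DStream; only mimics foreachRDD
    def __init__(self, batches):
        self.batches = batches

    def foreachRDD(self, fn):
        for rdd in self.batches:  # one call per micro-batch
            fn(rdd)               # any return value of fn is discarded

results = []

def process_rdd(rdd):
    # everything you want done with the data happens *inside* this function;
    # here we "predict" (a toy length score) and "save" into a local list
    results.extend((len(x), x) for x in rdd)

FakeStream([["ab", "c"], ["def"]]).foreachRDD(process_rdd)
# results == [(2, "ab"), (1, "c"), (3, "def")]
```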
>
> 2016-05-31 14:39 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>
>> Hi nguyen,
>>
>> Thanks a lot for your time and really appreciate good suggestions.
>>
>> Please find my concerns in line below:
>>
>> def extract_feature(rf_model, x):
>>     text = getFeatures(x).split(',')
>>     fea = [float(i) for i in text]
>>     prediction = rf_model.predict(fea)
>>     return (prediction, x)  <<< this will return two separate lists as a
>> tuple, but I want a one-to-one mapping (pred, text), not (predlist, textlist)
>>
>> def process_rdd(rdd):
>>     fea = rdd.map(lambda x: extract_feature(rf_model, x))
>>     # do something as you want (saving, ...) <<< I want to avoid saving
>> to an external system (definitely not a global variable). As I said, it could
>> be an overhead considering streaming.
>>
>> stream.foreachRDD(process_rdd) <<< As you can see here, there is no variable
>> to store the output from foreachRDD. My target is to get the (pred, text)
>> pair and then use it.
>>
>> Whatever it is, the output from "extract_feature" is not what I want.
>> I will be more than happy if you please correct my mistakes here.
>>
>>
>> -Obaid
>>
>> On Tue, May 31, 2016 at 2:04 PM, nguyen duc tuan <newvalue92@gmail.com> wrote:
>>
>>> I'm not sure what you mean by saying "does not return any value".
>>> How do you use this method?
>>> I would use this method as follows:
>>> def extract_feature(rf_model, x):
>>>     text = getFeatures(x).split(',')
>>>     fea = [float(i) for i in text]
>>>     prediction = rf_model.predict(fea)
>>>     return (prediction, x)
>>>
>>> def process_rdd(rdd):
>>>     fea = rdd.map(lambda x: extract_feature(rf_model, x))
>>>     # do something as you want (saving, ...)
>>>
>>> stream.foreachRDD(process_rdd)
>>>
>>> 2016-05-31 12:57 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>>>
>>>> foreachRDD does not return any value. It can be used just to send the
>>>> result to another place/context, like a db, file, etc.
>>>> I could use that, but it seems like the overhead of having another hop.
>>>> I wanted to keep it simple and light.
>>>>
>>>>
>>>> On Tuesday, 31 May 2016, nguyen duc tuan <newvalue92@gmail.com> wrote:
>>>>
>>>>> How about using foreachRDD ? I think this is much better than your
>>>>> trick.
>>>>>
>>>>>
>>>>> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>>>>>
>>>>>> Hi Guys,
>>>>>>
>>>>>> In the end, I am using the code below.
>>>>>> The trick is using a "native python map" along with "spark streaming
>>>>>> transform".
>>>>>> It may not be an elegant way, but it works :).
>>>>>>
>>>>>> def predictScore(texts, modelRF):
>>>>>>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))).\
>>>>>>         map(lambda (txt, features): (txt, features.split(','))).\
>>>>>>         map(lambda (txt, features): (txt, [float(i) for i in features])).\
>>>>>>         transform(lambda rdd: sc.parallelize(
>>>>>>             map(lambda x, y: (x, y),
>>>>>>                 modelRF.predict(rdd.map(lambda (x, y): y)).collect(),
>>>>>>                 rdd.map(lambda (x, y): x).collect())))
>>>>>>     # in the transform operation: x=text and y=features
>>>>>>     # Return value will be tuples of (score, 'original text')
>>>>>>     return predictions
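Stripped of the Spark plumbing, the transform step above boils down to batch-predicting on the features and re-attaching the predictions to the original texts positionally; zip below plays the role of map(lambda x,y:(x,y), ...) over the two collected lists. DummyModel and the pass-through get_features are illustrative stand-ins; the alignment is safe only because predict is an order-preserving, map-like operation:

```python
class DummyModel(object):
    # stand-in for modelRF: a batch, order-preserving predict, like
    # RandomForestModel.predict on an RDD of feature vectors
    def predict(self, feature_rows):
        return [sum(row) for row in feature_rows]

def predict_score(texts, model, get_features):
    feats = [[float(i) for i in get_features(t).split(',')] for t in texts]
    # pair each prediction with its original text, positionally
    return list(zip(model.predict(feats), texts))

scores = predict_score(["1,2", "3,4"], DummyModel(), lambda t: t)
# scores == [(3.0, "1,2"), (7.0, "3,4")]
```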
>>>>>>
>>>>>>
>>>>>> Hope it will help somebody who is facing the same problem.
>>>>>> If anybody has a better idea, please post it here.
>>>>>>
>>>>>> -Obaid
>>>>>>
>>>>>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalue92@gmail.com> wrote:
>>>>>>
>>>>>>> Dstream has a method foreachRDD, so you can walk through all RDDs
>>>>>>> inside the DStream as you want.
>>>>>>>
>>>>>>>
>>>>>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>>>>>
>>>>>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>>>>>>>
>>>>>>>> Hi nguyen,
>>>>>>>>
>>>>>>>> If I am not mistaken, we cannot call "predict" on a "dstream" as
>>>>>>>> you have suggested.
>>>>>>>> We have to use "transform" to be able to perform normal RDD
>>>>>>>> operations on dstreams, and here I am trapped.
>>>>>>>>
>>>>>>>> -Obaid
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalue92@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> How about this ?
>>>>>>>>>
>>>>>>>>> def extract_feature(rf_model, x):
>>>>>>>>>     text = getFeatures(x).split(',')
>>>>>>>>>     fea = [float(i) for i in text]
>>>>>>>>>     prediction = rf_model.predict(fea)
>>>>>>>>>     return (prediction, x)
>>>>>>>>>
>>>>>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>>>>>
>>>>>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Anybody has any idea on below?
>>>>>>>>>>
>>>>>>>>>> -Obaid
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Friday, 27 May 2016, obaidul karim <obaidcuet@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Guys,
>>>>>>>>>>>
>>>>>>>>>>> This is my first mail to the spark users mailing list.
>>>>>>>>>>>
>>>>>>>>>>> I need help on Dstream operation.
>>>>>>>>>>>
>>>>>>>>>>> In fact, I am using an MLlib random forest model to predict using
>>>>>>>>>>> spark streaming. In the end, I want to combine the feature Dstream &
>>>>>>>>>>> prediction Dstream together for further downstream processing.
>>>>>>>>>>>
>>>>>>>>>>> I am predicting using below piece of code:
>>>>>>>>>>>
>>>>>>>>>>> predictions = texts.map(lambda x: getFeatures(x)).\
>>>>>>>>>>>     map(lambda x: x.split(',')).\
>>>>>>>>>>>     map(lambda parts: [float(i) for i in parts]).\
>>>>>>>>>>>     transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>>>>>
>>>>>>>>>>> Here texts is a dstream having a single line of text as each record.
>>>>>>>>>>> getFeatures generates comma-separated features extracted from
>>>>>>>>>>> each record.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I want the output as the tuple below:
>>>>>>>>>>> ("predicted value", "original text")
>>>>>>>>>>>
>>>>>>>>>>> How can I achieve that?
>>>>>>>>>>> or
>>>>>>>>>>> at least, can I perform a .zip like a normal RDD operation on two
>>>>>>>>>>> Dstreams, like below:
>>>>>>>>>>> output = texts.zip(predictions)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance.
>>>>>>>>>>>
>>>>>>>>>>> -Obaid
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>
