spark-user mailing list archives

From obaidul karim <obaidc...@gmail.com>
Subject Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Date Tue, 31 May 2016 06:00:09 GMT
Sorry for lots of typos (writing from mobile)

On Tuesday, 31 May 2016, obaidul karim <obaidcuet@gmail.com> wrote:

> foreachRDD does not return any value. It can only be used to send results to
> another place/context, like a DB, a file, etc.
> I could use that, but it seems like the overhead of having another hop.
> I wanted to keep it simple and light.
>
> On Tuesday, 31 May 2016, nguyen duc tuan <newvalue92@gmail.com> wrote:
>
>> How about using foreachRDD? I think this is much better than your trick.
>>
>>
>> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>>
>>> Hi Guys,
>>>
>>> In the end, I am using the code below.
>>> The trick is using the "native Python map" along with the "Spark Streaming
>>> transform".
>>> It may not be an elegant way, but it works :).
>>>
>>> def predictScore(texts, modelRF):
>>>     # x is a (text, features) pair: x[0] = original text, x[1] = features
>>>     # sc is the existing SparkContext
>>>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))) \
>>>         .map(lambda x: (x[0], x[1].split(','))) \
>>>         .map(lambda x: (x[0], [float(i) for i in x[1]])) \
>>>         .transform(lambda rdd: sc.parallelize(list(zip(
>>>             # pair the i-th score with the i-th original text
>>>             modelRF.predict(rdd.map(lambda x: x[1])).collect(),
>>>             rdd.map(lambda x: x[0]).collect()))))
>>>     # Return will be a tuple of (score, 'original text')
>>>     return predictions
>>>
>>>
>>> Hope it will help somebody who is facing the same problem.
>>> If anybody has a better idea, please post it here.
>>>
>>> -Obaid
>>>
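The trick above scores a whole batch at once and then pairs each score with its text purely by position. A minimal plain-Python sketch of that pairing step follows; `predict_batch` is a hypothetical stand-in for a model that scores a list of feature vectors and returns the scores in input order, and `zip()` replaces the Python 2 idiom `map(lambda x, y: (x, y), a, b)`, which does not build a list of pairs on Python 3.

```python
from typing import Callable, List, Sequence, Tuple

Features = List[float]


def score_with_text(
    pairs: Sequence[Tuple[str, Features]],
    predict_batch: Callable[[List[Features]], List[float]],
) -> List[Tuple[float, str]]:
    """Return (score, original text) tuples for a batch of (text, features) pairs.

    predict_batch is a hypothetical batch predictor that must return exactly
    one score per input row, in the same order as its input.
    """
    texts = [text for text, _ in pairs]
    scores = predict_batch([features for _, features in pairs])
    if len(scores) != len(texts):
        # Positional pairing is only meaningful if nothing was dropped or reordered.
        raise ValueError(
            "predictor returned %d scores for %d records" % (len(scores), len(texts))
        )
    # The i-th score is paired with the i-th text.
    return list(zip(scores, texts))


# Usage with a toy predictor that scores a row by the sum of its features.
batch = [("a b c", [1.0, 2.0]), ("d e", [0.5, 0.0])]
print(score_with_text(batch, lambda rows: [sum(r) for r in rows]))
# [(3.0, 'a b c'), (0.5, 'd e')]
```

In Spark, the two `collect()` calls in the trick pull every batch back to the driver before `sc.parallelize` redistributes it. An untested alternative inside `transform` is `modelRF.predict(rdd.map(lambda x: x[1])).zip(rdd.map(lambda x: x[0]))`, which mirrors the `map(...).zip(predictions)` pattern in the MLlib random forest examples and keeps the data on the executors.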
>>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalue92@gmail.com>
>>> wrote:
>>>
>>>> DStream has a method foreachRDD, so you can walk through all RDDs
>>>> inside the DStream as you want.
>>>>
>>>>
>>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>>
>>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>>>>
>>>>> Hi nguyen,
>>>>>
>>>>> If I am not mistaken, we cannot call "predict" on a DStream as you
>>>>> have suggested.
>>>>> We have to use "transform" to be able to perform normal RDD operations
>>>>> on DStreams, and this is where I am stuck.
>>>>>
>>>>> -Obaid
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalue92@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> How about this?
>>>>>>
>>>>>> def extract_feature(rf_model, x):
>>>>>>     # parse the comma-separated feature string into floats
>>>>>>     text = getFeatures(x).split(',')
>>>>>>     fea = [float(i) for i in text]
>>>>>>     prediction = rf_model.predict(fea)
>>>>>>     # keep the original record next to its prediction
>>>>>>     return (prediction, x)
>>>>>>
>>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>>
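The per-record suggestion above needs a model object that can be called inside a `map` closure on the executors. As far as I know, PySpark's MLlib tree models (including `RandomForestModel`) document that `predict` cannot be used within an RDD transformation and should be called on the RDD directly instead, which would explain why this route does not work with the random forest here. A plain-Python model has no such restriction. A minimal sketch, where `ThresholdModel` and `get_features` are hypothetical stand-ins for a picklable model and for `getFeatures`:

```python
from typing import List, Tuple


class ThresholdModel:
    """Hypothetical picklable model: predicts 1.0 if the feature sum exceeds a threshold."""

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold

    def predict(self, features: List[float]) -> float:
        return 1.0 if sum(features) > self.threshold else 0.0


def get_features(text: str) -> str:
    """Hypothetical stand-in for getFeatures: word count and character count as CSV."""
    return "%d,%d" % (len(text.split()), len(text))


def extract_feature(model: ThresholdModel, text: str) -> Tuple[float, str]:
    """Score one record and keep the original text next to the prediction."""
    features = [float(v) for v in get_features(text).split(",")]
    return (model.predict(features), text)


model = ThresholdModel(threshold=10.0)
print([extract_feature(model, t) for t in ["hi", "hello spark streaming"]])
# [(0.0, 'hi'), (1.0, 'hello spark streaming')]
```

With a model like this, `texts.map(lambda t: extract_feature(model, t))` should yield (prediction, text) tuples directly on the DStream, without `transform`; for an MLlib tree model, the batch `predict` call inside `transform` remains the route to use.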
>>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidcuet@gmail.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Does anybody have any idea about the question below?
>>>>>>>
>>>>>>> -Obaid
>>>>>>>
>>>>>>>
>>>>>>> On Friday, 27 May 2016, obaidul karim <obaidcuet@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>> This is my first mail to the Spark users mailing list.
>>>>>>>>
>>>>>>>> I need help with a DStream operation.
>>>>>>>>
>>>>>>>> I am using an MLlib random forest model to make predictions with
>>>>>>>> Spark Streaming. In the end, I want to combine the feature DStream and
>>>>>>>> the prediction DStream for further downstream processing.
>>>>>>>>
>>>>>>>> I am predicting using the piece of code below:
>>>>>>>>
>>>>>>>> predictions = texts.map(lambda x: getFeatures(x)) \
>>>>>>>>     .map(lambda x: x.split(',')) \
>>>>>>>>     .map(lambda parts: [float(i) for i in parts]) \
>>>>>>>>     .transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>>
>>>>>>>> Here texts is a DStream whose records are single lines of text, and
>>>>>>>> getFeatures generates a comma-separated feature string extracted from
>>>>>>>> each record.
>>>>>>>>
>>>>>>>> I want the output as the tuple below:
>>>>>>>> ("predicted value", "original text")
>>>>>>>>
>>>>>>>> How can I achieve that?
>>>>>>>> Or, at least, can I perform .zip on two DStreams like a normal RDD
>>>>>>>> operation, like below:
>>>>>>>> output = texts.zip(predictions)
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>> -Obaid
>>>>>>>>
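DStream itself has no `zip`, but inside `transform` each batch is an RDD, and `RDD.zip` assumes both RDDs have the same number of partitions and the same number of elements in each partition (for example, when one was made through a `map` on the other). The plain-Python model below treats an RDD as a hypothetical list of partitions to show why zipping back works after a per-record map and breaks after a filter; it models the documented precondition and is not Spark code.

```python
from typing import List, Sequence, Tuple, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def zip_partitions(
    left: Sequence[Sequence[A]], right: Sequence[Sequence[B]]
) -> List[List[Tuple[A, B]]]:
    """Pair records partition by partition, enforcing RDD.zip's documented preconditions."""
    if len(left) != len(right):
        raise ValueError("different number of partitions")
    zipped: List[List[Tuple[A, B]]] = []
    for lp, rp in zip(left, right):
        if len(lp) != len(rp):
            raise ValueError("different number of elements in a partition")
        zipped.append(list(zip(lp, rp)))
    return zipped


# Hypothetical batch of texts spread over two partitions.
texts = [["spam offer", "hello"], ["meeting at 5"]]

# A per-record map keeps every partition's size, so zipping back is safe.
lengths = [[len(t) for t in part] for part in texts]
print(zip_partitions(lengths, texts))
# [[(10, 'spam offer'), (5, 'hello')], [(12, 'meeting at 5')]]

# A filter changes partition sizes, so zipping with the original fails.
short = [[t for t in part if len(t) < 8] for part in texts]
try:
    zip_partitions(short, texts)
except ValueError as err:
    print("zip failed:", err)
# zip failed: different number of elements in a partition
```

In Spark terms, zipping the prediction RDD with the text RDD inside `transform` should only be relied on when the features came from the texts through map-style steps, with no filtering or repartitioning in between.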
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
