spark-user mailing list archives

From 莫涛 <mo...@sensetime.com>
Subject Re: Re: how to generate a column using mapParition and then add it back to the df?
Date Tue, 09 Aug 2016 02:14:28 GMT
Hi guha,

Thanks a lot!
This is exactly what I want and I'll try to implement it.


MoTao
________________________________
From: ayan guha <guha.ayan@gmail.com>
Sent: 8 August 2016 18:05:37
To: 莫涛
Cc: ndjido@gmail.com; user@spark.apache.org
Subject: Re: Re: how to generate a column using mapParition and then add it back to the
df?

Hi

I think you should change the initModel() function to getOrCreateModel() and create the model
as a singleton object. You may want to refer to this link<http://stackoverflow.com/questions/30450763/spark-streaming-and-connection-pool-implementation>
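The getOrCreateModel() idea can be sketched in plain Scala (class and method names here are illustrative stand-ins for the poster's classifier, not from the thread): an `object` with a `lazy val` is initialised at most once per JVM, so every task running on an executor reuses one model instead of calling initModel() per row.

```scala
// Stand-in for the JNI-wrapped classifier described in the thread.
class Model { def process(v: Double): Double = v * 2.0 }

object ModelHolder {
  var initCount = 0                          // only to demonstrate single init
  // lazy val: initialised on first access, at most once per JVM (executor).
  lazy val model: Model = { initCount += 1; new Model }
  def getOrCreateModel(): Model = model
}

// Simulate many rows/tasks hitting the holder; init happens exactly once.
val out = Seq(1.0, 2.0, 3.0).map(v => ModelHolder.getOrCreateModel().process(v))
```

Inside a Spark UDF, the body would call ModelHolder.getOrCreateModel().process(v), so the per-row cost is only the native call, not model loading.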

On Mon, Aug 8, 2016 at 7:44 PM, 莫涛 <motao@sensetime.com<mailto:motao@sensetime.com>>
wrote:
Hi Ndjido,

Thanks for your reply.

Yes, it is good idea if the model can be broadcast.

I'm working with a prebuilt library (on Linux, say classifier.so and classifier.h), and it requires
the model file to be in the local file system.
As I don't have access to the library code, I wrote a JNI wrapper around the classifier.
The model file can be sent to each executor efficiently by addFile and getFile.
But initModel() is still expensive, as it actually loads a local file into C++ heap memory,
which is not serializable.

That's the reason I cannot broadcast the model, and I have to avoid loading the model as
much as I can.
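Since the native handle cannot be serialised or broadcast, a common pattern (in the spirit of the connection-pool link above; all names here are illustrative assumptions, not the thread's actual code) is to cache loaded models per executor JVM, keyed by the local model-file path, so the expensive native load runs once per executor rather than once per task:

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for the JNI-wrapped native classifier.
class NativeModel(path: String) { def process(v: Double): Double = v + 1.0 }

object ModelCache {
  var loads = 0                              // only to demonstrate single load
  private val cache = new ConcurrentHashMap[String, NativeModel]()
  // computeIfAbsent loads the model at most once per key, even under
  // concurrent access from multiple task threads in the same executor.
  def getOrLoad(path: String): NativeModel =
    cache.computeIfAbsent(path, p => { loads += 1; new NativeModel(p) })
}

// Many tasks asking for the same local model file share one instance:
val m1 = ModelCache.getOrLoad("/tmp/model.bin")
val m2 = ModelCache.getOrLoad("/tmp/model.bin")
```

On a cluster, the path argument would come from SparkFiles.get after the driver's addFile call, and getOrLoad would be invoked inside the UDF or mapPartitions body.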

Best

________________________________
From: ndjido@gmail.com<mailto:ndjido@gmail.com> <ndjido@gmail.com<mailto:ndjido@gmail.com>>
Sent: 8 August 2016 17:16:27
To: 莫涛
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: how to generate a column using mapParition and then add it back to the df?


Hi MoTao,
What about broadcasting the model?

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao <motao@sensetime.com<mailto:motao@sensetime.com>>
wrote:
>
> Hi all,
>
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column in the df,
> or 3) generating it with a UDF over this df.
>
> In my case, the column to be appended is created by processing each row,
> like
>
> val df = Seq(1.0, 2.0, 3.0).toDF("value")  // requires import spark.implicits._
> val func = udf {
>  v: Double => {
>    val model = initModel()
>    model.process(v)
>  }
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
>
> This works fine. However, for performance reason, I want to avoid
> initModel() for each row.
> So I come with mapParitions, like
>
> val df = Seq(1.0, 2.0, 3.0).toDF("value")
> val df2 = df.mapPartitions(rows => {
>  val model = initModel()
>  rows.map(row => model.process(row.getAs[Double](0)))
> })
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
>
> But this is wrong as a column of df2 *CANNOT* be appended to df.
>
> The only solution I have is to force mapPartitions to return a whole row
> instead of just the new column,
> ( something like "row => Row.fromSeq(row.toSeq ++
> Array(model.process(...)))" )
> which requires a lot of copying as well.
>
> I wonder how to deal with this problem with as little overhead as possible?
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org<mailto:user-unsubscribe@spark.apache.org>
>
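The whole-row workaround described in the quoted question can be sketched without Spark as a transformation over partition iterators (names and the toy "row as Seq[Any]" representation are illustrative assumptions): initModel() runs once per partition, and each row is rebuilt with the new value appended, which is exactly the copying cost the poster mentions.

```scala
// Stand-in for the native classifier and its expensive init.
class Model { def process(v: Double): Double = v + 0.5 }
var initCount = 0                             // only to show once-per-partition
def initModel(): Model = { initCount += 1; new Model }

// Two "partitions" of single-column rows, each row modelled as a Seq[Any].
val partitions: Seq[Seq[Seq[Any]]] =
  Seq(Seq(Seq(1.0), Seq(2.0)), Seq(Seq(3.0)))

val result = partitions.map { rows =>
  val model = initModel()                     // once per partition, not per row
  // Rebuild each row with the computed column appended (the copying step).
  rows.map(row => row :+ model.process(row(0).asInstanceOf[Double]))
}
```

In actual Spark this corresponds to df.mapPartitions returning full Rows (Row.fromSeq(row.toSeq :+ ...)) with a schema that includes the appended column, rather than trying to join a separately produced column back onto the original df.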



--
Best Regards,
Ayan Guha