spark-user mailing list archives

From Jaonary Rabarisoa <jaon...@gmail.com>
Subject Re: Some questions after playing a little with the new ml.Pipeline.
Date Tue, 31 Mar 2015 07:50:59 GMT
Following your suggestion, I ended up with the following implementation:

override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N*K*W*H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    //net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}


The idea is to call mapPartitions on the underlying RDD of the DataFrame and
create a new DataFrame by zipping the results with the original rows. It seems
to work, but when I try to save the RDD I get the following error:

org.apache.spark.mllib.linalg.DenseVector cannot be cast to
org.apache.spark.sql.Row
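A possible alternative to zipping two RDDs, sketched in plain Scala: `RDD.zip` assumes both RDDs have the same number of partitions and the same number of elements in each partition, so another option is to map the full rows inside a single `mapPartitions` pass and append the feature to each row there. Here a row is modelled as a `Seq[Any]` and a partition as an `Iterator`; `FakeNet` is a hypothetical stand-in for the Caffe network. The sketch does not claim to explain or fix the `DenseVector` cast error above.

```scala
// Plain-Scala model of one partition: a row is a Seq[Any], a partition an Iterator.
// Assumption: in Spark the same function could be passed to dataSet.rdd.mapPartitions
// and each output Seq turned back into a Row with Row.fromSeq.

// Hypothetical stand-in for the Caffe forward pass (not a real Caffe API).
final class FakeNet {
  def forward(input: Array[Float]): Array[Double] = input.map(x => x.toDouble * 2)
}

// One network per partition; the feature is appended to the full row in the same
// pass, so no second RDD has to be zipped back onto the original rows.
def appendFeatures(rows: Iterator[Seq[Any]], inputIndex: Int): Iterator[Seq[Any]] = {
  val net = new FakeNet
  rows.map { row =>
    val input = row(inputIndex).asInstanceOf[Seq[Float]].toArray
    row :+ net.forward(input).toSeq
  }
}

val partition = Iterator[Seq[Any]](Seq("img1", Seq(1f, 2f)), Seq("img2", Seq(3f)))
val out = appendFeatures(partition, inputIndex = 1).toList
println(out)
```

Because each output row is built from its own input row, the row/feature pairing cannot drift even if partition sizes differ between two separately computed RDDs.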


On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
shivaram@eecs.berkeley.edu> wrote:

> One workaround could be to convert the DataFrame into an RDD inside the
> transform function, use mapPartitions/broadcast to work with the
> JNI calls, and then convert back to a DataFrame.
>
> Thanks
> Shivaram
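Shivaram's mapPartitions workaround raises a lifecycle question for the JNI network: `Iterator.map` is lazy, so releasing the network right before the partition's iterator is returned (which is what the commented-out `net.deallocate()` in the code above would do if enabled) frees it before any row has been processed. A minimal sketch, assuming a hypothetical `NativeNet` with a `deallocate()` method, that releases the handle only once the partition iterator is exhausted:

```scala
// Hypothetical stand-in for the JNI-backed Caffe net (not a real Caffe API).
final class NativeNet {
  var closed = false
  def forward(x: Int): Int = {
    require(!closed, "net already deallocated")
    x * 10
  }
  def deallocate(): Unit = closed = true
}

// Wraps a partition iterator so that `release` runs exactly once, as soon as the
// underlying iterator reports that it has no more elements (immediately for an
// empty partition).
def closeWhenDone[A](it: Iterator[A])(release: () => Unit): Iterator[A] =
  new Iterator[A] {
    private var released = false
    def hasNext: Boolean = {
      val more = it.hasNext
      if (!more && !released) {
        released = true
        release()
      }
      more
    }
    def next(): A = it.next()
  }

// Body of a mapPartitions closure. In Spark the net would be created inside the
// closure; here it is passed in so its state can be inspected afterwards.
def processPartition(rows: Iterator[Int], net: NativeNet): Iterator[Int] =
  closeWhenDone(rows.map(net.forward))(() => net.deallocate())

val net = new NativeNet
val result = processPartition(Iterator(1, 2, 3), net).toList
println(s"$result, deallocated = ${net.closed}")
```

If a consumer stops reading the iterator early, the handle is never released; Spark's `TaskContext.addTaskCompletionListener` may be a more robust hook for that case.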
>
> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaonary@gmail.com>
> wrote:
>
>> Dear all,
>>
>> I'm still struggling to make a transformer that applies a pre-trained
>> Caffe model to a DataFrame work. The main problem is that creating a Caffe
>> model inside the UDF is very slow and consumes a lot of memory.
>>
>> Some of you suggested broadcasting the model. The problem with broadcasting
>> is that I use a JNI interface to Caffe C++ through javacpp-presets, and it
>> is not serializable.
>>
>> Another possible approach is to use a UDF that can handle a whole
>> partition instead of just a row, in order to minimize the number of Caffe
>> model instantiations.
>>
>> Are there any ideas for solving either of these two issues?
>>
>>
>>
>> Best,
>>
>> Jao
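A commonly used workaround for non-serializable native models (not something proposed in this thread) is a lazily initialised holder object: a Scala `object` is initialised at most once per JVM and class loader, so each executor would load the model once, the first time a task uses it, and nothing has to be broadcast. A plain-Scala sketch, where `CaffeLikeModel` is a hypothetical stand-in and the counter exists only to show how often the model is built:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Counts constructions of the hypothetical model, only to make the effect visible.
object LoadCounter {
  val loads = new AtomicInteger(0)
}

// Hypothetical stand-in for a JNI-backed Caffe model; deliberately not Serializable.
final class CaffeLikeModel {
  LoadCounter.loads.incrementAndGet() // simulates loading a large weights file
  def transform(v: Array[Double]): Array[Double] = v.map(_ + 1.0)
}

// The lazy val is initialised on first access, once per JVM (and class loader),
// and its initialisation is thread-safe. Assumption: in Spark this would be a
// top-level object, which a closure accesses statically instead of serialising it.
object ModelHolder {
  lazy val model: CaffeLikeModel = new CaffeLikeModel
}

// Per-row function, e.g. the body of a UDF: it refers to the holder, not to a model.
def featurize(v: Array[Double]): Array[Double] = ModelHolder.model.transform(v)

val outputs = (1 to 1000).map(i => featurize(Array(i.toDouble)))
println(s"rows = ${outputs.size}, model loads = ${LoadCounter.loads.get}")
```

If several tasks run concurrently in the same executor, they share this instance, so the native model must be thread-safe or access to it must be synchronized.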
>>
>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <joseph@databricks.com>
>> wrote:
>>
>>> I see.  I think your best bet is to create the cnnModel on the master
>>> and then serialize it to send to the workers.  If it's big (1M or so), then
>>> you can broadcast it and use the broadcast variable in the UDF.  There is
>>> not a great way to do something equivalent to mapPartitions with UDFs right
>>> now.
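Joseph's broadcast suggestion and the later objection about the JNI interface both come down to serialization: a value sent from the driver to the workers, whether captured in a closure or wrapped in a broadcast variable, has to be serializable with the configured serializer (Java serialization by default), and an object wrapping a native handle usually is not. A sketch of the distinction, assuming the model can be described by plain data (a topology name and a weights array) from which each partition rebuilds the native network; `NativeCnn` and the topology name are hypothetical:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical JNI wrapper: a real one would hold a native pointer, so it does not
// implement java.io.Serializable.
final class NativeCnn(weights: Array[Float]) {
  def forward(x: Float): Float = x * weights.sum
}

// Java-serialises `obj` into memory and returns its size in bytes, or the
// exception message if the object graph is not serialisable.
def serializedSize(obj: AnyRef): Either[String, Int] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try {
    out.writeObject(obj)
    out.flush()
    Right(bytes.size())
  } catch {
    case e: NotSerializableException => Left(e.getMessage)
  } finally out.close()
}

// Plain data that fully describes the model (hypothetical topology name and weights):
// this is the kind of value that can be serialised on the driver and broadcast.
val modelData: (String, Array[Float]) = ("hypothetical-topology", Array.fill(1000)(0.5f))
// The native object built from that data: this is the part that cannot be shipped.
val nativeNet = new NativeCnn(modelData._2)

println(serializedSize(modelData))
println(serializedSize(nativeNet))
```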
>>>
>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaonary@gmail.com>
>>> wrote:
>>>
>>>> Here is my current implementation with the current master version of Spark:
>>>>
>>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>
>>>>   override def transformSchema(...) { ... }
>>>>
>>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>>     val map = this.paramMap ++ paramMap
>>>>     val deepCNNFeature = udf((v: Vector) => {
>>>>       val cnnModel = new CaffeModel
>>>>       cnnModel.transform(v)
>>>>     } : Vector)
>>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>   }
>>>> }
>>>>
>>>> where CaffeModel is a Java API to a Caffe C++ model.
>>>>
>>>> The problem here is that a new CaffeModel instance is created for every
>>>> row, which is inefficient since creating a new model means loading a large
>>>> model file. It also transforms only a single row at a time, whereas a Caffe
>>>> network can process a batch of rows efficiently. In other words, is it
>>>> possible to create a UDF that operates on a partition, in order to minimize
>>>> the creation of CaffeModel instances and take advantage of Caffe's batch
>>>> processing?
>>>>
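Batching within a partition can be done with `Iterator.grouped`, which is available on the iterator that `mapPartitions` passes to its closure. A plain-Scala sketch, assuming a hypothetical `BatchedModel` whose `forwardBatch` runs one forward pass over a whole batch (in the Caffe code at the top of the thread this would correspond to setting the blob's first dimension `N` to the batch size instead of 1):

```scala
// Hypothetical batched model: one call processes a whole batch, like a Caffe forward
// pass over an N x K x H x W blob. `calls` records how many forward passes ran.
final class BatchedModel {
  var calls = 0
  def forwardBatch(batch: Seq[Array[Float]]): Seq[Array[Double]] = {
    calls += 1
    batch.map(_.map(x => x.toDouble * 2))
  }
}

// Per-partition transform: one model instance, rows fed in fixed-size batches,
// outputs emitted in the original row order.
def transformPartition(
    rows: Iterator[Array[Float]],
    model: BatchedModel,
    batchSize: Int): Iterator[Array[Double]] =
  rows.grouped(batchSize).flatMap(batch => model.forwardBatch(batch))

val model = new BatchedModel
val rows = Iterator.tabulate(10)(i => Array(i.toFloat))
val out = transformPartition(rows, model, batchSize = 4).toList
println(s"${out.size} outputs, ${model.calls} forward passes")
```

Each batch is materialised, so memory grows with `batchSize` rather than with the partition size; the last batch may be smaller than `batchSize`.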
>>>>
>>>>
>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <joseph@databricks.com>
>>>> wrote:
>>>>
>>>>> I see, thanks for clarifying!
>>>>>
>>>>> I'd recommend following the existing implementations of spark.ml
>>>>> transformers. You'll need to define a UDF which operates on a single Row
>>>>> to compute the value for the new column. You can then use the DataFrame
>>>>> DSL to create the new column; the DSL provides a nice syntax for what would
>>>>> otherwise be a SQL statement like "select ... from ...". I'm recommending
>>>>> looking at the existing implementation (rather than stating it here)
>>>>> because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much
>>>>> improved and makes it easier to create a new column.
>>>>>
>>>>> Joseph
>>>>>
>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaonary@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>
>>>>>>     override def transform(data: DataFrame, paramMap: ParamMap):
>>>>>> DataFrame = {
>>>>>>
>>>>>>
>>>>>>         // How can I do a mapPartitions on the underlying RDD and then add the column?
>>>>>>
>>>>>>      }
>>>>>> }
>>>>>>
>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaonary@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Joseph,
>>>>>>>
>>>>>>> Thank you for the tips. I understand what I should do when my data
>>>>>>> are represented as an RDD. What I can't figure out is how to do
>>>>>>> the same thing when the data is viewed as a DataFrame and I need to add
>>>>>>> the result of my pre-trained model as a new column in the DataFrame.
>>>>>>> Precisely, I want to implement the following transformer:
>>>>>>>
>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <
>>>>>>> joseph@databricks.com> wrote:
>>>>>>>
>>>>>>>> Hi Jao,
>>>>>>>>
>>>>>>>> You can use external tools and libraries if they can be called from
>>>>>>>> your Spark program or script (with appropriate conversion of data types,
>>>>>>>> etc.). The best way to apply a pre-trained model to a dataset would be to
>>>>>>>> call the model from within a closure, e.g.:
>>>>>>>>
>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>
>>>>>>>> If your data is distributed in an RDD (myRDD), then the above call
>>>>>>>> will distribute the computation of prediction using the pre-trained model.
>>>>>>>> It will require that all of your Spark workers be able to run the
>>>>>>>> preTrainedModel; that may mean installing Caffe and dependencies on all
>>>>>>>> nodes in the compute cluster.
>>>>>>>>
>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>
>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I hope this helps!
>>>>>>>> Joseph
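Joseph's `mapPartitions` pattern can be checked with plain Scala collections standing in for an RDD's partitions. The sketch counts how often a hypothetical expensive `PreTrainedModel` is constructed when it is created per element (as in the UDF version of `DeepCNNFeature` above) versus once per partition:

```scala
// Hypothetical model whose constructor is expensive (e.g. loads a large weights file).
object Loads { var count = 0 }
final class PreTrainedModel {
  Loads.count += 1
  def predict(x: Int): Int = x + 1
}

// Three simulated partitions of a dataset with 9 elements.
val partitions: Seq[Seq[Int]] = Seq(1 to 3, 4 to 6, 7 to 9)

// Per-element instantiation, as in the per-row UDF.
Loads.count = 0
val perRow = partitions.map(_.map(x => new PreTrainedModel().predict(x)))
val perRowLoads = Loads.count

// Per-partition instantiation, following the mapPartitions pattern quoted above.
Loads.count = 0
val perPartition = partitions.map { part =>
  val myModel = new PreTrainedModel
  part.iterator.map(myModel.predict).toList
}
val perPartitionLoads = Loads.count

println(s"per-row loads = $perRowLoads, per-partition loads = $perPartitionLoads")
```

Both variants produce the same predictions; only the number of model constructions changes, from one per element to one per partition.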
>>>>>>>>
>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <
>>>>>>>> jaonary@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Dear all,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We mainly do large-scale computer vision tasks (image
>>>>>>>>> classification, retrieval, ...). The pipeline is really great for
>>>>>>>>> that. We're trying to reproduce the tutorial given on that topic during
>>>>>>>>> the latest Spark Summit
>>>>>>>>> (http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html)
>>>>>>>>> using the master version of the Spark pipeline and DataFrame API. The
>>>>>>>>> tutorial shows different examples of feature extraction stages before
>>>>>>>>> running machine learning algorithms. Even though the tutorial is
>>>>>>>>> straightforward to reproduce with this new API, we still have some
>>>>>>>>> questions:
>>>>>>>>>
>>>>>>>>>    - Can one use external tools (e.g. via pipe) as a pipeline
>>>>>>>>>    stage? An example use case is extracting features learned with a
>>>>>>>>>    convolutional neural network. In our case, this corresponds to a
>>>>>>>>>    pre-trained neural network from the Caffe library
>>>>>>>>>    (http://caffe.berkeleyvision.org/).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    - The second question is about the performance of the
>>>>>>>>>    pipeline. Libraries such as Caffe process data in batches, and
>>>>>>>>>    instantiating one Caffe network can be time-consuming when the
>>>>>>>>>    network is very deep. So we can gain performance if we minimize the
>>>>>>>>>    number of Caffe network creations and feed data to the network in
>>>>>>>>>    batches. In the pipeline, this corresponds to running transformers
>>>>>>>>>    that work on a per-partition basis and give the whole partition to a
>>>>>>>>>    single Caffe network. How can we create such a transformer?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Jao
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
