spark-user mailing list archives

From Jaonary Rabarisoa <jaon...@gmail.com>
Subject Re: Some questions after playing a little with the new ml.Pipeline.
Date Tue, 03 Mar 2015 12:36:51 GMT
Here is my current implementation, written against the current master version of Spark:




    class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

      override def transformSchema(...) { ... }

      override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
        transformSchema(dataSet.schema, paramMap, logging = true)
        val map = this.paramMap ++ paramMap

        val deepCNNFeature = udf((v: Vector) => {
          val cnnModel = new CaffeModel
          cnnModel.transform(v)
        } : Vector)

        dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
      }
    }

where CaffeModel is a Java API to a Caffe C++ model.

The problem here is that it creates a new instance of CaffeModel for every
row, which is inefficient because creating a model means loading a large
model file. It also transforms only a single row at a time, whereas a Caffe
network can process a batch of rows efficiently. In other words, is it
possible to create a UDF that operates on a partition, in order to minimize
the number of CaffeModel instances created and to take advantage of the
Caffe network's batch processing?
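
To make the goal concrete, here is a minimal sketch in plain Scala, with no
Spark dependency, of the per-partition logic I have in mind: build the model
once for the partition, then feed it fixed-size batches. BatchModel,
transformBatch and PartitionBatching are hypothetical names used only for
illustration; they are not part of the Caffe or Spark APIs, and I am assuming
the model wrapper can accept a whole batch per call.

```scala
// Hypothetical stand-in for the Caffe wrapper (NOT the real CaffeModel API):
// expensive to construct, able to process a whole batch in one call.
trait BatchModel {
  def transformBatch(batch: Seq[Array[Double]]): Seq[Array[Double]]
}

object PartitionBatching {
  /**
   * Transforms every row of one partition.
   * The model is built at most once per call (i.e. once per partition),
   * and rows are handed to it in groups of at most `batchSize`.
   */
  def transformPartition(
      rows: Iterator[Array[Double]],
      batchSize: Int)(newModel: () => BatchModel): Iterator[Array[Double]] = {
    if (!rows.hasNext) {
      // Empty partition: skip the expensive model construction entirely.
      Iterator.empty
    } else {
      val model = newModel()
      // `grouped` yields consecutive batches; the last one may be smaller.
      rows.grouped(batchSize).flatMap(batch => model.transformBatch(batch))
    }
  }
}
```

A function of this shape could presumably be passed to mapPartitions on the
RDD underlying the DataFrame, after extracting the input column from each
row. That wiring, and carrying the other columns along so the result can be
turned back into a DataFrame, is not shown here and is untested.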



On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <joseph@databricks.com>
wrote:

> I see, thanks for clarifying!
>
> I'd recommend following existing implementations in spark.ml
> transformers.  You'll need to define a UDF which operates on a single Row
> to compute the value for the new column.  You can then use the DataFrame
> DSL to create the new column; the DSL provides a nice syntax for what would
> otherwise be a SQL statement like "select ... from ...".  I'm recommending
> looking at the existing implementation (rather than stating it here)
> because it changes between Spark 1.2 and 1.3.  In 1.3, the DSL is much
> improved and makes it easier to create a new column.
>
> Joseph
>
> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaonary@gmail.com>
> wrote:
>
>> class DeepCNNFeature extends Transformer ... {
>>
>>     override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>
>>         // How can I do a map partition on the underlying RDD
>>         // and then add the column?
>>
>>     }
>> }
>>
>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaonary@gmail.com>
>> wrote:
>>
>>> Hi Joseph,
>>>
>>> Thank you for the tips. I understand what I should do when my data is
>>> represented as an RDD. What I can't figure out is how to do the
>>> same thing when the data is viewed as a DataFrame and I need to add the
>>> result of my pretrained model as a new column in the DataFrame. Precisely,
>>> I want to implement the following transformer:
>>>
>>> class DeepCNNFeature extends Transformer ... {
>>>
>>> }
>>>
>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <joseph@databricks.com>
>>> wrote:
>>>
>>>> Hi Jao,
>>>>
>>>> You can use external tools and libraries if they can be called from
>>>> your Spark program or script (with appropriate conversion of data types,
>>>> etc.).  The best way to apply a pre-trained model to a dataset would be to
>>>> call the model from within a closure, e.g.:
>>>>
>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>
>>>> If your data is distributed in an RDD (myRDD), then the above call will
>>>> distribute the computation of prediction using the pre-trained model.  It
>>>> will require that all of your Spark workers be able to run the
>>>> preTrainedModel; that may mean installing Caffe and dependencies on all
>>>> nodes in the compute cluster.
>>>>
>>>> For the second question, I would modify the above call as follows:
>>>>
>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>   val myModel = // instantiate neural network on this partition
>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>> }
>>>>
>>>> I hope this helps!
>>>> Joseph
>>>>
>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaonary@gmail.com>
>>>> wrote:
>>>>
>>>>> Dear all,
>>>>>
>>>>>
>>>>> We mainly do large-scale computer vision tasks (image classification,
>>>>> retrieval, ...). The pipeline is really great for that. We're trying
>>>>> to reproduce the tutorial given on that topic during the latest Spark
>>>>> Summit
>>>>> (http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html)
>>>>> using the master version of the Spark pipeline and DataFrame APIs. The
>>>>> tutorial shows different examples of feature extraction stages before
>>>>> running machine learning algorithms. Even though the tutorial is
>>>>> straightforward to reproduce with this new API, we still have some
>>>>> questions:
>>>>>
>>>>>    - Can one use external tools (e.g. via pipe) as a pipeline stage?
>>>>>    An example use case is extracting features learned with a
>>>>>    convolutional neural network. In our case, this corresponds to a
>>>>>    neural network pre-trained with the Caffe library
>>>>>    (http://caffe.berkeleyvision.org/).
>>>>>
>>>>>
>>>>>    - The second question is about the performance of the pipeline.
>>>>>    A library such as Caffe processes data in batches, and instantiating
>>>>>    one Caffe network can be time-consuming when the network is very deep.
>>>>>    So we can gain performance if we minimize the number of Caffe networks
>>>>>    created and give data to the network in batches. In the pipeline, this
>>>>>    corresponds to running transformers that work on a per-partition basis
>>>>>    and give the whole partition to a single Caffe network. How can we
>>>>>    create such a transformer?
>>>>>
>>>>>
>>>>>
>>>>> Best,
>>>>>
>>>>> Jao
>>>>>
>>>>
>>>>
>>>
>>
>
