spark-user mailing list archives

From Jaonary <>
Subject Re: Some questions after playing a little with the new ml.Pipeline.
Date Wed, 04 Mar 2015 07:35:07 GMT
I think it would be interesting to have an equivalent of mapPartitions for DataFrame. There are many use cases where data are processed in batches. Another example is a simple linear classifier Ax = b, where A is the matrix of feature vectors, x the model and b the output. Here again, the product Ax can be computed efficiently for a batch of data.
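To make the batching idea concrete, here is a minimal Scala sketch of the kind of function one could hand to mapPartitions for the linear classifier case. It runs outside Spark and models a partition as a plain Iterator: the partition is consumed in fixed-size batches, and each batch is scored with a single product Ax. The names `matVec`, `scorePartition` and `batchSize` are illustrative only, not part of any Spark API.

```scala
object BatchedLinearScoring {
  // Dense matrix-vector product b = A x, where each row of `a` is one feature vector.
  def matVec(a: Array[Array[Double]], x: Array[Double]): Array[Double] =
    a.map(row => row.zip(x).map { case (ai, xi) => ai * xi }.sum)

  // The kind of function one could pass to mapPartitions: read the partition's
  // iterator in batches of `batchSize` rows and score each batch with one product.
  def scorePartition(rows: Iterator[Array[Double]], x: Array[Double], batchSize: Int): Iterator[Double] =
    rows.grouped(batchSize).flatMap(batch => matVec(batch.toArray, x).iterator)

  def main(args: Array[String]): Unit = {
    val model = Array(1.0, -2.0) // the model vector x
    val partition = Iterator(Array(1.0, 0.0), Array(0.0, 1.0), Array(2.0, 1.0))
    println(scorePartition(partition, model, batchSize = 2).toList) // List(1.0, -2.0, 0.0)
  }
}
```

Because `grouped` pulls only `batchSize` rows at a time, the whole partition never has to be materialized at once.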

I will test the broadcast hack. But I'm wondering whether it is possible to append or zip an RDD as a new column of a DataFrame. The idea is to run mapPartitions on the RDD of the input column and then add the result as the output column?
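One way to avoid zipping a separate RDD back onto a DataFrame is to never separate the output from its row: inside mapPartitions, score a batch and pair each row with its own result. (RDD.zip additionally requires both RDDs to have the same number of partitions and the same number of elements in each partition.) The sketch below shows such a per-partition function in plain Scala; `Record` and `batchModel` are hypothetical stand-ins for a DataFrame row and a batch-capable model. In Spark 1.3 the resulting RDD[Row] could then presumably be turned back into a DataFrame with `sqlContext.createDataFrame(rowRDD, schema)`, using a schema that includes the new column.

```scala
// Hypothetical row type standing in for a DataFrame row: an id, the input
// column, and the output column to be filled in.
final case class Record(id: Long, input: List[Double], output: Option[Double] = None)

object AppendColumnPerPartition {
  // Hypothetical batch model: scores a whole batch of inputs in one call.
  def batchModel(inputs: Seq[List[Double]]): Seq[Double] = inputs.map(_.sum)

  // Function one could pass to mapPartitions: each record stays next to its
  // prediction, so the new column is attached without any RDD.zip.
  def appendOutput(part: Iterator[Record], batchSize: Int): Iterator[Record] =
    part.grouped(batchSize).flatMap { batch =>
      val outs = batchModel(batch.map(_.input))
      batch.zip(outs).map { case (r, o) => r.copy(output = Some(o)) }
    }
}
```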


> On 3 Mar 2015, at 22:04, Joseph Bradley <> wrote:
> I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now.
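Alongside the broadcast approach suggested above, a commonly used alternative (a different technique, not what Joseph describes) is to hold the model in a lazily initialized singleton, so that each executor JVM loads it once and every UDF call reuses it. A minimal sketch, assuming a hypothetical `ExpensiveModel` in place of CaffeModel; the `loads` counter exists only to show that the model is built a single time.

```scala
object CachedModelDemo {
  // Counts how many times the (hypothetical) expensive model is constructed.
  @volatile var loads = 0

  // Hypothetical stand-in for an expensive-to-load model such as CaffeModel.
  final class ExpensiveModel {
    loads += 1
    def transform(v: Seq[Double]): Seq[Double] = v.map(_ * 2.0)
  }

  // A lazy val in an object is initialized (thread-safely) on first access
  // and then reused by every later call in the same JVM.
  object ModelHolder {
    lazy val model: ExpensiveModel = new ExpensiveModel
  }

  // Body one could wrap in a UDF: it no longer builds a model per row.
  def perRow(v: Seq[Double]): Seq[Double] = ModelHolder.model.transform(v)
}
```

This removes the per-row construction cost, but each row is still processed individually, so it does not give the batching discussed in this thread.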
>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <> wrote:
>> Here is my current implementation with the current master version of Spark:
>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>   override def transformSchema(...) { ... }
>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>     val map = this.paramMap ++ paramMap
>>     val deepCNNFeature = udf((v: Vector) => {
>>       val cnnModel = new CaffeModel
>>       cnnModel.transform(v)
>>     }: Vector)
>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>   }
>> }
>> where CaffeModel is a Java API to a Caffe C++ model.
>> The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file. It also transforms only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a partition, in order to minimize the creation of CaffeModel instances and to take advantage of the Caffe network's batch processing?
>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <>
>>> I see, thanks for clarifying!
>>> I'd recommend following existing implementations in transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I recommend looking at the existing implementations (rather than stating it here) because this changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column.
>>> Joseph
>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <>
>>>> class DeepCNNFeature extends Transformer ... {
>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>     // How can I do a mapPartitions on the underlying RDD and then add the column?
>>>>   }
>>>> }
>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <>
>>>>> Hi Joseph,
>>>>> Thank you for the tips. I understand what I should do when my data are represented as an RDD. What I can't figure out is how to do the same thing when the data is viewed as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Precisely, I want to implement the following transformer:
>>>>> class DeepCNNFeature extends Transformer ... {
>>>>> }
>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <>
>>>>>> Hi Jao,
>>>>>> You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure:
>>>>>> { myDatum => preTrainedModel.predict(myDatum) }
>>>>>> If your data is distributed in an RDD (myRDD), then passing this closure to myRDD.map will distribute the computation of predictions using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and its dependencies on all nodes in the compute cluster.
>>>>>> For the second question, I would modify the above call as follows:
>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>   val myModel = ??? // instantiate the neural network once on this partition
>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>> }
>>>>>> I hope this helps!
>>>>>> Joseph
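The difference between building the model per record and building it once per partition can be checked outside Spark. In this sketch each "partition" is an in-memory Seq, `PretrainedModel` is a hypothetical stand-in for the Caffe network, and a counter records how many models get built; `perPartition` mirrors the mapPartitions pattern quoted above.

```scala
object PerPartitionModelDemo {
  // Counts constructions of the (hypothetical) expensive model.
  var instances = 0

  // Hypothetical stand-in for a pre-trained network such as a Caffe model.
  final class PretrainedModel {
    instances += 1
    def predict(x: Double): Double = 2.0 * x + 1.0
  }

  // Anti-pattern: a new model is built inside the per-record closure.
  def perRecord(part: Iterator[Double]): Iterator[Double] =
    part.map(x => new PretrainedModel().predict(x))

  // mapPartitions-style: one model per partition, reused for every record in it.
  def perPartition(part: Iterator[Double]): Iterator[Double] = {
    val model = new PretrainedModel
    part.map(model.predict)
  }

  // Applies f to every "partition"; returns (all results, number of models built).
  def run(partitions: Seq[Seq[Double]], f: Iterator[Double] => Iterator[Double]): (Seq[Double], Int) = {
    instances = 0
    val out = partitions.flatMap(p => f(p.iterator).toList) // toList forces the lazy iterator
    (out, instances)
  }
}
```

With two partitions holding five values in total, `perRecord` builds five models while `perPartition` builds two, and both return the same predictions.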
>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <>
>>>>>>> Dear all,
>>>>>>> We mainly do large-scale computer vision tasks (image classification, retrieval, ...). The pipeline is really great for that. We're trying to reproduce the tutorial given on that topic during the latest Spark Summit () using the master version of the Spark pipeline and DataFrame API. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions:
>>>>>>> Can one use external tools (e.g. via pipe) as a pipeline stage? An example use case is extracting features learned with a convolutional neural network. In our case, this corresponds to a pre-trained neural network built with the Caffe library (
>>>>>>> The second question is about the performance of the pipeline. Libraries such as Caffe process data in batches, and instantiating a Caffe network can be time-consuming when the network is very deep. So we can gain performance by minimizing the number of Caffe networks created and by feeding data to the network in batches. In the pipeline, this corresponds to running transformers that work on a per-partition basis and give the whole partition to a single Caffe network. How can we create such a transformer?
>>>>>>> Best,
>>>>>>> Jao
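On the first question (external tools via pipe): Spark's RDD has a `pipe(command)` method that runs an external process for each partition, writing the elements to its stdin one per line and returning its stdout lines as the new elements. The sketch below imitates that per-partition behaviour outside Spark with `scala.sys.process`, assuming a line-oriented tool; the Unix `tr` command is used purely as a placeholder for such a tool.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import scala.sys.process._

object PipeSketch {
  // Feeds one partition's records to an external command (one record per line
  // on stdin) and returns the command's stdout lines as the transformed records.
  def pipePartition(records: Iterator[String], command: Seq[String]): Iterator[String] = {
    val stdin = new ByteArrayInputStream(
      records.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8))
    val stdout = (Process(command) #< stdin).!! // throws if the command exits non-zero
    stdout.split("\n").iterator.filter(_.nonEmpty) // note: blank output lines are dropped
  }
}
```

For example, `PipeSketch.pipePartition(Iterator("abc", "spark"), Seq("tr", "a-z", "A-Z"))` yields the lines "ABC" and "SPARK". A tool used this way must be installed on every worker, as noted earlier in the thread.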
