spark-user mailing list archives

From Shivaram Venkataraman <shiva...@eecs.berkeley.edu>
Subject Re: Some questions after playing a little with the new ml.Pipeline.
Date Tue, 31 Mar 2015 07:55:53 GMT
My guess is that the `createDataFrame` call is failing here.  Can you check
whether the schema being passed to it includes the column name and type for
the newly zipped `features` column?

Joseph probably knows this better, but AFAIK the DenseVector here will need
to be marked as a VectorUDT while creating a DataFrame column
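
Something along these lines might work for that schema (a rough, untested
sketch; it assumes org.apache.spark.mllib.linalg.VectorUDT is accessible from
user code in your Spark version, and it reuses the dataSet/newRowData names
from your snippet below):

  import org.apache.spark.mllib.linalg.VectorUDT
  import org.apache.spark.sql.types.{StructField, StructType}

  // Extend the input schema with a field for the zipped `features` column,
  // typed as a vector (not a struct), so createDataFrame accepts the
  // DenseVector values instead of expecting Rows.
  val outputSchema = StructType(
    dataSet.schema.fields :+ StructField("features", new VectorUDT, nullable = false))

  dataSet.sqlContext.createDataFrame(newRowData, outputSchema)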

Thanks
Shivaram

On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa <jaonary@gmail.com>
wrote:

> Following your suggestion, I ended up with the following implementation:
>
> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>   val map = this.paramMap ++ paramMap
>
>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>     Caffe.set_mode(Caffe.CPU)
>     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>     val inputBlobs: FloatBlobVector = net.input_blobs()
>     val N: Int = 1
>     val K: Int = inputBlobs.get(0).channels()
>     val H: Int = inputBlobs.get(0).height()
>     val W: Int = inputBlobs.get(0).width()
>     inputBlobs.get(0).Reshape(N, K, H, W)
>     val dataBlob = new FloatPointer(N*K*W*H)
>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>
>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>       dataBlob.put(a.toArray, 0, a.size)
>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>       val resultDim = resultBlobs.get(0).channels()
>       logInfo(s"Output dimension $resultDim")
>       val resultBlobData = resultBlobs.get(0).cpu_data()
>       val output = new Array[Float](resultDim)
>       resultBlobData.get(output)
>       Vectors.dense(output.map(_.toDouble))
>     }
>     // net.deallocate()
>     feat
>   }
>
>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>     val oldSeq = old.toSeq
>     Row.fromSeq(oldSeq :+ feat)
>   }
>
>   dataSet.sqlContext.createDataFrame(newRowData, schema)
> }
>
>
> The idea is to run mapPartitions over the underlying RDD of the DataFrame and
> create a new DataFrame by zipping the results. It seems to work, but when I
> try to save the result I get the following error:
>
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
> org.apache.spark.sql.Row
>
>
> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
> shivaram@eecs.berkeley.edu> wrote:
>
>> One workaround could be to convert the DataFrame into an RDD inside the
>> transform function, use mapPartitions/broadcast to work with the JNI calls,
>> and then convert the result back into a DataFrame.
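>>
>> As a rough sketch of that pattern (the names `makeExpensiveResource`,
>> `computeFeature` and `newSchema` below are placeholders, not a real API):
>>
>>   val featureRows = df.rdd.mapPartitions { rows =>
>>     // instantiate the non-serializable, JNI-backed resource once per partition
>>     val resource = makeExpensiveResource()
>>     rows.map(row => Row.fromSeq(row.toSeq :+ computeFeature(resource, row)))
>>   }
>>   // newSchema = the input schema plus a field for the new column
>>   df.sqlContext.createDataFrame(featureRows, newSchema)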
>>
>> Thanks
>> Shivaram
>>
>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaonary@gmail.com>
>> wrote:
>>
>>> Dear all,
>>>
>>> I'm still struggling to make a pre-trained Caffe model transformer for
>>> DataFrames work. The main problem is that creating a Caffe model inside the
>>> UDF is very slow and consumes a lot of memory.
>>>
>>> Some of you suggested broadcasting the model. The problem with
>>> broadcasting is that I use a JNI interface to the Caffe C++ library through
>>> javacpp-presets, and it is not serializable.
>>>
>>> Another possible approach is to use a UDF that can handle a whole
>>> partition instead of just a row, in order to minimize the number of Caffe
>>> model instantiations.
>>>
>>> Are there any ideas for solving either of these two issues?
>>>
>>>
>>>
>>> Best,
>>>
>>> Jao
>>>
>>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <joseph@databricks.com>
>>> wrote:
>>>
>>>> I see.  I think your best bet is to create the cnnModel on the master
>>>> and then serialize it to send to the workers.  If it's big (1M or so), then
>>>> you can broadcast it and use the broadcast variable in the UDF.  There is
>>>> not a great way to do something equivalent to mapPartitions with UDFs right
>>>> now.
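>>>>
>>>> Roughly something like this (an untested sketch; it assumes `cnnModel` is
>>>> serializable, and it reuses the `map`, `inputCol` and `outputCol` params
>>>> from your transformer):
>>>>
>>>>   val modelBc = dataSet.sqlContext.sparkContext.broadcast(cnnModel)
>>>>   val deepCNNFeature = udf((v: Vector) => {
>>>>     // every task reuses the broadcast copy instead of re-creating the model per row
>>>>     modelBc.value.transform(v)
>>>>   } : Vector)
>>>>   dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))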
>>>>
>>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaonary@gmail.com>
>>>> wrote:
>>>>
>>>>> Here is my current implementation with the current master version of Spark:
>>>>>
>>>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>>
>>>>>   override def transformSchema(...) { ... }
>>>>>
>>>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>>>     val map = this.paramMap ++ paramMap
>>>>>
>>>>>     val deepCNNFeature = udf((v: Vector) => {
>>>>>       val cnnModel = new CaffeModel
>>>>>       cnnModel.transform(v)
>>>>>     } : Vector)
>>>>>
>>>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>>   }
>>>>> }
>>>>>
>>>>> where CaffeModel is a Java API to the Caffe C++ model.
>>>>>
>>>>> The problem here is that for every row it will create a new instance
>>>>> of CaffeModel, which is inefficient since creating a new model
>>>>> means loading a large model file. It also transforms only a single
>>>>> row at a time, whereas a Caffe network can process a batch of rows
>>>>> efficiently. In other words, is it possible to create a UDF that operates
>>>>> on a partition, in order to minimize the creation of CaffeModel instances
>>>>> and to take advantage of the Caffe network's batch processing?
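>>>>>
>>>>> For illustration, something along these lines (just a sketch; the
>>>>> `predictBatch` method and the `batchSize` value are hypothetical, not the
>>>>> actual CaffeModel API):
>>>>>
>>>>>   dataSet.select(map(inputCol)).rdd.mapPartitions { rows =>
>>>>>     val model = new CaffeModel                           // one model per partition
>>>>>     rows.grouped(batchSize).flatMap { batch =>
>>>>>       model.predictBatch(batch.map(_.getAs[Vector](0)))  // feed a whole batch at once
>>>>>     }
>>>>>   }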
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <joseph@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> I see, thanks for clarifying!
>>>>>>
>>>>>> I'd recommend following existing implementations in spark.ml
>>>>>> transformers.  You'll need to define a UDF which operates on a single
>>>>>> Row to compute the value for the new column.  You can then use the
>>>>>> DataFrame DSL to create the new column; the DSL provides a nice syntax
>>>>>> for what would otherwise be a SQL statement like "select ... from ...".
>>>>>> I'm recommending looking at the existing implementation (rather than
>>>>>> stating it here) because it changes between Spark 1.2 and 1.3.  In 1.3,
>>>>>> the DSL is much improved and makes it easier to create a new column.
>>>>>>
>>>>>> Joseph
>>>>>>
>>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaonary@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>
>>>>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>>     // How can I do a map partition on the underlying RDD and then add the column ?
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <
>>>>>>> jaonary@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Joseph,
>>>>>>>>
>>>>>>>> Thank you for the tips. I understand what I should do when my data
>>>>>>>> are represented as an RDD. The thing that I can't figure out is how
>>>>>>>> to do the same thing when the data are viewed as a DataFrame and I
>>>>>>>> need to add the result of my pre-trained model as a new column in the
>>>>>>>> DataFrame. Precisely, I want to implement the following transformer:
>>>>>>>>
>>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <
>>>>>>>> joseph@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jao,
>>>>>>>>>
>>>>>>>>> You can use external tools and libraries if they can be called
>>>>>>>>> from your Spark program or script (with appropriate conversion of
>>>>>>>>> data types, etc.).  The best way to apply a pre-trained model to a
>>>>>>>>> dataset would be to call the model from within a closure, e.g.:
>>>>>>>>>
>>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>>
>>>>>>>>> If your data is distributed in an RDD (myRDD), then the above call
>>>>>>>>> will distribute the computation of prediction using the pre-trained
>>>>>>>>> model.  It will require that all of your Spark workers be able to run
>>>>>>>>> the preTrainedModel; that may mean installing Caffe and dependencies
>>>>>>>>> on all nodes in the compute cluster.
>>>>>>>>>
>>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>>
>>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> I hope this helps!
>>>>>>>>> Joseph
>>>>>>>>>
>>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <
>>>>>>>>> jaonary@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Dear all,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We mainly do large-scale computer vision tasks (image
>>>>>>>>>> classification, retrieval, ...). The pipeline is really great stuff
>>>>>>>>>> for that. We're trying to reproduce the tutorial given on that topic
>>>>>>>>>> during the latest Spark Summit (
>>>>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html )
>>>>>>>>>> using the master version of the Spark pipeline and DataFrame APIs.
>>>>>>>>>> The tutorial shows different examples of feature extraction stages
>>>>>>>>>> before running machine learning algorithms. Even though the tutorial
>>>>>>>>>> is straightforward to reproduce with this new API, we still have
>>>>>>>>>> some questions:
>>>>>>>>>>
>>>>>>>>>>    - Can one use external tools (e.g. via pipe) as a pipeline
>>>>>>>>>>    stage? An example use case is extracting features learned with a
>>>>>>>>>>    convolutional neural network. In our case, this corresponds to a
>>>>>>>>>>    pre-trained neural network built with the Caffe library
>>>>>>>>>>    (http://caffe.berkeleyvision.org/).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    - The second question is about the performance of the pipeline.
>>>>>>>>>>    A library such as Caffe processes data in batches, and
>>>>>>>>>>    instantiating a Caffe network can be time-consuming when the
>>>>>>>>>>    network is very deep. So we can gain performance if we minimize
>>>>>>>>>>    the number of Caffe network creations and feed data to the
>>>>>>>>>>    network in batches. In the pipeline, this corresponds to running
>>>>>>>>>>    transformers that work on a partition basis and give the whole
>>>>>>>>>>    partition to a single Caffe network. How can we create such a
>>>>>>>>>>    transformer?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Jao
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
