spark-issues mailing list archives

From "Bago Amirbekian (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning
Date Fri, 05 Jan 2018 00:32:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312264#comment-16312264 ]

Bago Amirbekian commented on SPARK-22126:
-----------------------------------------

[~bryanc] thanks for taking the time to put together the PR and share your thoughts. I like the
idea of preserving the existing APIs and not needing to add a new {{fitMultiple}}
API, but I'm concerned the existing APIs aren't quite flexible enough.

One of the use cases that motivated the {{fitMultiple}} API was optimizing the Pipeline
Estimator. The Pipeline Estimator seems like an important one to optimize because I believe
it's required in order for CrossValidator to be able to exploit optimized implementations
of the {{fit}}/{{fitMultiple}} methods of Pipeline stages.

The way one would optimize the Pipeline Estimator is to group the paramMaps into a tree structure
where each level represents a stage with a param that can take multiple values, and then
traverse the tree in depth-first order. Notice that in this case the params need not be
estimator params; they could be transformer params as well, since this lets us avoid applying
expensive transformers multiple times.

With this approach, all the params for a Pipeline Estimator after the top level of the tree
are "optimizable", so simply grouping on optimizable params isn't sufficient; we need to
actually order the paramMaps to match the depth-first traversal of the stages tree. A rough
sketch of such an ordering follows.
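
To make that concrete, here is a rough sketch (a hypothetical helper, not a working patch) of
ordering paramMaps by a depth-first traversal of the stages tree; {{stageParams}} is assumed to
list, per pipeline stage in order, the params of that stage that appear in the maps:

{code}
import org.apache.spark.ml.param.{Param, ParamMap}

// Hypothetical sketch: sort paramMaps so that maps agreeing on the values of
// earlier (more expensive) stages end up adjacent, i.e. a depth-first
// traversal of the stage/param tree. The pipeline can then reuse fitted or
// transformed results for any shared prefix of stages.
def depthFirstOrder(
    paramMaps: Array[ParamMap],
    stageParams: Seq[Seq[Param[_]]]): Array[ParamMap] = {
  paramMaps.sortBy { pm =>
    // Build a sort key stage by stage; string rendering of values is a
    // simplification that assumes equal params render identically.
    stageParams.flatten
      .map(p => pm.get(p).map(_.toString).getOrElse(""))
      .mkString("|")
  }
}
{code}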

I'm still thinking through all this, so let me know if any of it is off base or
unclear, but I think the {{fitMultiple}} approach gives us the full flexibility needed to
make these kinds of optimizations.

> Fix model-specific optimization support for ML tuning
> -----------------------------------------------------
>
>                 Key: SPARK-22126
>                 URL: https://issues.apache.org/jira/browse/SPARK-22126
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 2.3.0
>            Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in SPARK-19357;
> more discussion is here:
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0
> Anyone who's following might want to scan the design doc (in the links above). The latest
> API proposal is:
> {code}
> def fitMultiple(
>     dataset: Dataset[_],
>     paramMaps: Array[ParamMap]
>   ): java.util.Iterator[scala.Tuple2[java.lang.Integer, Model]]
> {code}
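> A default implementation could simply fit one model per paramMap, lazily. A minimal sketch
> (illustrative only, not a committed implementation; subclasses would override it to share work):
> {code}
> import scala.collection.JavaConverters._
>
> // Sketch: pair each fitted model with its paramMap index. The iterator is
> // lazy, so each fit runs only when the caller advances to it.
> def fitMultiple(
>     dataset: Dataset[_],
>     paramMaps: Array[ParamMap]): java.util.Iterator[(java.lang.Integer, M)] = {
>   paramMaps.iterator.zipWithIndex.map { case (paramMap, index) =>
>     (java.lang.Integer.valueOf(index), fit(dataset, paramMap))
>   }.asJava
> }
> {code}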
> Old discussion:
> I've copied the discussion from the gist here:
> I propose to design the API as:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]): Array[Callable[Map[Int, M]]]
> {code}
> Let me use an example to explain the API:
> {quote}
>  It could be possible to still use the current parallelism and still allow for model-specific
> optimizations. For example, suppose we are doing cross validation and have a param map with
> regParam = (0.1, 0.3) and maxIter = (5, 10). Let's say the cross validator could know that
> maxIter is optimized for the model being evaluated (e.g. via a new method in Estimator that
> returns such params). It would then be straightforward for the cross validator to remove maxIter
> from the param map that will be parallelized over and use it to create 2 arrays of paramMaps:
> ((regParam=0.1, maxIter=5), (regParam=0.1, maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)).
> {quote}
> In this example, we can see that models computed from ((regParam=0.1, maxIter=5), (regParam=0.1,
> maxIter=10)) can be computed in one thread, and models computed from ((regParam=0.3, maxIter=5),
> (regParam=0.3, maxIter=10)) in another. There are 4 paramMaps here, but we can generate at most
> two threads to compute the models for them.
> The API above allows {{callable.call()}} to return multiple models; the return type is
> {{Map[Int, M]}}, where the integer key marks the paramMap index of the corresponding model. In
> the example above there are 4 paramMaps but only 2 callable objects are returned: one for
> ((regParam=0.1, maxIter=5), (regParam=0.1, maxIter=10)) and another for ((regParam=0.3,
> maxIter=5), (regParam=0.3, maxIter=10)), as sketched below.
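> For the example above, building the two callables could look like the following sketch
> (hypothetical; {{lr}} is an assumed estimator whose {{regParam}} is the non-optimizable param):
> {code}
> import java.util.concurrent.Callable
>
> // Sketch: group the 4 paramMaps by regParam and return one Callable per
> // group, so each group runs in a single thread.
> val groups = paramMaps.zipWithIndex.groupBy { case (pm, _) => pm(lr.regParam) }
> val callables: Array[Callable[Map[Int, M]]] = groups.values.map { group =>
>   new Callable[Map[Int, M]] {
>     override def call(): Map[Int, M] =
>       // An optimized estimator would share work across the maxIter values
>       // here; this sketch just fits them sequentially within one thread.
>       group.map { case (pm, i) => i -> fit(dataset, pm) }.toMap
>   }
> }.toArray
> {code}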
> The default {{fitCallables}} and {{fit}}-with-paramMaps methods can be implemented as follows:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
>     Array[Callable[Map[Int, M]]] = {
>   paramMaps.zipWithIndex.map { case (paramMap: ParamMap, index: Int) =>
>     new Callable[Map[Int, M]] {
>       override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap))
>     }
>   }
> }
> def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
>   // Run every callable, then restore the original paramMap order by index.
>   fitCallables(dataset, paramMaps)
>     .flatMap { _.call().toSeq }
>     .sortBy(_._1).map(_._2)
> }
> {code}
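> As a usage sketch, these callables compose directly with {{java.util.concurrent}} (the pool
> size here is illustrative):
> {code}
> import java.util.concurrent.Executors
> import scala.collection.JavaConverters._
>
> // Illustrative only: run the callables on a fixed-size pool, then restore
> // the original paramMap order by index.
> val pool = Executors.newFixedThreadPool(2)
> val futures = pool.invokeAll(fitCallables(dataset, paramMaps).toSeq.asJava)
> val models: Seq[M] = futures.asScala
>   .flatMap(_.get().toSeq).sortBy(_._1).map(_._2)
> pool.shutdown()
> {code}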
> If we use the API proposed above, the code in [CrossValidator|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159]
> can be changed to:
> {code}
>       val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
>       val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
>       // Fit models in a Future for training in parallel
>       val modelMapFutures = fitCallables(trainingDataset, paramMaps).map { callable =>
>          Future[Map[Int, Model[_]]] {
>             val modelMap = callable.call()
>             if (collectSubModelsParam) {
>                ...
>             }
>             modelMap
>          } (executionContext)
>       }
>       // Unpersist training data only when all models have trained
>       Future.sequence[Map[Int, Model[_]], Iterable](modelMapFutures)(implicitly, executionContext)
>         .onComplete { _ => trainingDataset.unpersist() } (executionContext)
>       // Evaluate models in a Future that will calculate a metric and allow the model to be cleaned up
>       val foldMetricMapFutures = modelMapFutures.map { modelMapFuture =>
>         modelMapFuture.map { modelMap =>
>           modelMap.map { case (index: Int, model: Model[_]) =>
>             val metric = eval.evaluate(model.transform(validationDataset, paramMaps(index)))
>             (index, metric)
>           }
>         } (executionContext)
>       }
>       // Wait for metrics to be calculated before unpersisting validation dataset
>       val foldMetrics = foldMetricMapFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
>           .flatMap(_.toSeq).sortBy(_._1).map(_._2)
> {code}



