flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Date Thu, 21 May 2015 08:07:39 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30780559
  
    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
     
    +
    +
       /** Provides a solution for the given optimization problem
         *
         * @param data A Dataset of LabeledVector (label, features) pairs
    -    * @param initWeights The initial weights that will be optimized
    +    * @param initialWeights The initial weights that will be optimized
         * @return The weights, optimized for the provided data.
         */
       override def optimize(
         data: DataSet[LabeledVector],
    -    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -    // TODO: Faster way to do this?
    -    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -    val numberOfIterations: Int = parameterMap(Iterations)
    +    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    +    val numberOfIterations: Int = parameters(Iterations)
    +    // TODO(tvas): This looks out of place, why don't we get back an Option from
    +    // parameters(ConvergenceThreshold)?
    +    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
     
         // Initialize weights
    -    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -      // Ensure provided weight vector is a DenseVector
    -      case Some(wvDS) => {
    -        wvDS.map{wv => {
    -          val denseWeights = wv.weights match {
    -            case dv: DenseVector => dv
    -            case sv: SparseVector => sv.toDenseVector
    +    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
    +
    +    // Perform the iterations
    +    val optimizedWeights = convergenceThresholdOption match {
    +      // No convergence criterion
    +      case None =>
    +        initialWeightsDS.iterate(numberOfIterations) {
    +          weightVectorDS => {
    +            SGDStep(data, weightVectorDS)
               }
    -          WeightVector(denseWeights, wv.intercept)
             }
    -
    +      case Some(convergence) =>
    +        /** Calculates the regularized loss, from the data and given weights **/
    +        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
    +        DataSet[Double] = {
    +          data.map {
    +            new LossCalculation
    +          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
    +            .reduce {
    +            (left, right) =>
    +              val (leftLoss, leftCount) = left
    +              val (rightLoss, rightCount) = right
    +              (leftLoss + rightLoss, rightCount + leftCount)
    +          }
    +            .map{new RegularizedLossCalculation}
    +            .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
             }
    -      }
    -      case None => createInitialWeightVector(dimensionsDS)
    -    }
    -
    -    // Perform the iterations
    -    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
    -    initialWeightsDS.iterate(numberOfIterations) {
    -      weightVector => {
    -        SGDStep(data, weightVector)
    -      }
    +        // We have to calculate for each weight vector the sum of squared residuals,
    +        // and then sum them and apply regularization
    +        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
    +
    +        // Combine weight vector with the current loss
    +        val initialWeightsWithLossSum = initialWeightsDS.
    +          crossWithTiny(initialLossSumDS).setParallelism(1)
    +
    +        val resultWithLoss = initialWeightsWithLossSum.
    +          iterateWithTermination(numberOfIterations) {
    +          weightsWithLossSum =>
    +
    +            // Extract weight vector and loss
    +            val previousWeightsDS = weightsWithLossSum.map{_._1}
    +            val previousLossSumDS = weightsWithLossSum.map{_._2}
    +
    +            val currentWeightsDS = SGDStep(data, previousWeightsDS)
    +
    +            val currentLossSumDS = lossCalculation(data, currentWeightsDS)
    +
    +            // Check if the relative change in the loss is smaller than the
    +            // convergence threshold. If yes, then terminate i.e. return empty termination data set
    +            val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
    +              filter{
    +              pair => {
    +                val (previousLoss, currentLoss) = pair
    +
    +                if (previousLoss <= 0) {
    +                  false
    +                } else {
    +                  math.abs((previousLoss - currentLoss)/previousLoss) >= convergence
    +                }
    +              }
    +            }
    +
    +            // Result for new iteration
    +            (currentWeightsDS cross currentLossSumDS, termination)
    +        }
    +        // Return just the weights
    +        resultWithLoss.map{_._1}
         }
    +    optimizedWeights
       }
     
    -  /** Mapping function that calculates the weight gradients from the data.
    +  /** Calculates the loss value, given a labeled vector and the current weight vector
         *
    +    * The weight vector is received as a broadcast variable.
         */
    -  private class GradientCalculation extends
    -    RichMapFunction[LabeledVector, (WeightVector, Double, Int)] {
    +  private class LossCalculation extends RichMapFunction[LabeledVector, (Double, Int)] {
     
         var weightVector: WeightVector = null
     
    +
         @throws(classOf[Exception])
         override def open(configuration: Configuration): Unit = {
           val list = this.getRuntimeContext.
             getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST)
     
           weightVector = list.get(0)
    -    }
     
    -    override def map(example: LabeledVector): (WeightVector, Double, Int) = {
    +    }
     
    -      val lossFunction = parameterMap(LossFunction)
    -      val regType = parameterMap(RegularizationType)
    -      val regParameter = parameterMap(RegularizationParameter)
    -      val predictionFunction = parameterMap(PredictionFunctionParameter)
    +    override def map(example: LabeledVector): (Double, Int) = {
    +      val lossFunction = parameters(LossFunctionParameter)
    +      val predictionFunction = parameters(PredictionFunctionParameter)
           val dimensions = example.vector.size
    -      // TODO(tvas): Any point in carrying the weightGradient vector for in-place replacement?
    -      // The idea in spark is to avoid object creation, but here we have to do it anyway
    +      // TODO(tvas): Avoid needless creation of WeightGradient object
    +      // Create a lossValue function in LossFunction?
           val weightGradient = new DenseVector(new Array[Double](dimensions))
    --- End diff --
    
    Yes, that sounds like a good idea. `lossAndGradient` can then simply call the `lossValue` function.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message