##### Site index · List index
Message view
Top
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Date Thu, 21 May 2015 08:05:59 GMT
```
]

ASF GitHub Bot commented on FLINK-1992:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver
{
}

+
+
/** Provides a solution for the given optimization problem
*
* @param data A Dataset of LabeledVector (label, features) pairs
-    * @param initWeights The initial weights that will be optimized
+    * @param initialWeights The initial weights that will be optimized
* @return The weights, optimized for the provided data.
*/
override def optimize(
data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

// Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights,
data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
}
-          WeightVector(denseWeights, wv.intercept)
}
-
+      case Some(convergence) =>
+        /** Calculates the regularized loss, from the data and given weights **/
+        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+        DataSet[Double] = {
+          data.map {
+            new LossCalculation
+            .reduce {
+            (left, right) =>
+              val (leftLoss, leftCount) = left
+              val (rightLoss, rightCount) = right
+              (leftLoss + rightLoss, rightCount + leftCount)
+          }
+            .map{new RegularizedLossCalculation}
}
-      }
-      case None => createInitialWeightVector(dimensionsDS)
-    }
-
-    // Perform the iterations
-    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-    initialWeightsDS.iterate(numberOfIterations) {
-      weightVector => {
-        SGDStep(data, weightVector)
-      }
+        // We have to calculate for each weight vector the sum of squared residuals,
+        // and then sum them and apply regularization
+        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+        // Combine weight vector with the current loss
+        val initialWeightsWithLossSum = initialWeightsDS.
+          crossWithTiny(initialLossSumDS).setParallelism(1)
+
+        val resultWithLoss = initialWeightsWithLossSum.
+          iterateWithTermination(numberOfIterations) {
+          weightsWithLossSum =>
+
+            // Extract weight vector and loss
+            val previousWeightsDS = weightsWithLossSum.map{_._1}
+            val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+            val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+            val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+            // Check if the relative change in the loss is smaller than the
+            // convergence threshold. If yes, then terminate i.e. return empty termination
data set
+            val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+              filter{
+              pair => {
+                val (previousLoss, currentLoss) = pair
+
+                if (previousLoss <= 0) {
+                  false
+                } else {
+                  math.abs((previousLoss - currentLoss)/previousLoss) >= convergence
--- End diff --

Can we also support different convergence criteria? For example, the absolute loss?

> Add convergence criterion to SGD optimizer
> ------------------------------------------
>
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Theodore Vasiloudis
>            Priority: Minor
>              Labels: ML
>             Fix For: 0.9
>
>
> Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good
to support a dynamic convergence criterion, too.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

```
Mime
View raw message