ASF GitHub Bot commented on FLINK-1992:
Github user tillrohrmann commented on a diff in the pull request:

@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver
{
}

+
+
/** Provides a solution for the given optimization problem
*
* @param data A Dataset of LabeledVector (label, features) pairs
-    * @param initWeights The initial weights that will be optimized
+    * @param initialWeights The initial weights that will be optimized
* @return The weights, optimized for the provided data.
*/
override def optimize(
data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

// Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights,
data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
}
-          WeightVector(denseWeights, wv.intercept)
}
-
+      case Some(convergence) =>
+        /** Calculates the regularized loss, from the data and given weights **/
+        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+        DataSet[Double] = {
+          data.map {
+            new LossCalculation
+            .reduce {
+            (left, right) =>
+              val (leftLoss, leftCount) = left
+              val (rightLoss, rightCount) = right
+              (leftLoss + rightLoss, rightCount + leftCount)
+          }
+            .map{new RegularizedLossCalculation}
}
-      }
-      case None => createInitialWeightVector(dimensionsDS)
-    }
-
-    // Perform the iterations
-    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-    initialWeightsDS.iterate(numberOfIterations) {
-      weightVector => {
-        SGDStep(data, weightVector)
-      }
+        // We have to calculate for each weight vector the sum of squared residuals,
+        // and then sum them and apply regularization
+        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+        // Combine weight vector with the current loss
+        val initialWeightsWithLossSum = initialWeightsDS.
+          crossWithTiny(initialLossSumDS).setParallelism(1)
+
+        val resultWithLoss = initialWeightsWithLossSum.
+          iterateWithTermination(numberOfIterations) {
+          weightsWithLossSum =>
+
+            // Extract weight vector and loss
+            val previousWeightsDS = weightsWithLossSum.map{_._1}
+            val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+            val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+            val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+            // Check if the relative change in the loss is smaller than the
+            // convergence threshold. If yes, then terminate i.e. return empty termination
data set
+            val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+              filter{
+              pair => {
+                val (previousLoss, currentLoss) = pair
+
+                if (previousLoss <= 0) {
+                  false
+                } else {
+                  math.abs((previousLoss - currentLoss)/previousLoss) >= convergence
+                }
+              }
+            }
+
+            // Result for new iteration
+            (currentWeightsDS cross currentLossSumDS, termination)
+        }
+        // Return just the weights
+        resultWithLoss.map{_._1}
}
+    optimizedWeights
}

-  /** Mapping function that calculates the weight gradients from the data.
+  /** Calculates the loss value, given a labeled vector and the current weight vector
*
*/
-    RichMapFunction[LabeledVector, (WeightVector, Double, Int)] {
+  private class LossCalculation extends RichMapFunction[LabeledVector, (Double, Int)]
{

var weightVector: WeightVector = null

+
@throws(classOf[Exception])
override def open(configuration: Configuration): Unit = {
val list = this.getRuntimeContext.

weightVector = list.get(0)
-    }

-    override def map(example: LabeledVector): (WeightVector, Double, Int) = {
+    }

-      val lossFunction = parameterMap(LossFunction)
-      val regType = parameterMap(RegularizationType)
-      val regParameter = parameterMap(RegularizationParameter)
-      val predictionFunction = parameterMap(PredictionFunctionParameter)
+    override def map(example: LabeledVector): (Double, Int) = {
+      val lossFunction = parameters(LossFunctionParameter)
+      val predictionFunction = parameters(PredictionFunctionParameter)
val dimensions = example.vector.size
-      // TODO(tvas): Any point in carrying the weightGradient vector for in-place replacement?
-      // The idea in spark is to avoid object creation, but here we have to do it anyway
+      // TODO(tvas): Avoid needless creation of WeightGradient object
+      // Create a lossValue function in LossFunction?
val weightGradient = new DenseVector(new Array[Double](dimensions))

-      // TODO(tvas): Indentation here?
-      val (loss, lossDeriv) = lossFunction.lossAndGradient(
-                                example,
-                                weightVector,
-                                regType,
-                                regParameter,
-                                predictionFunction)
+      val (loss, _) = lossFunction.lossAndGradient(
+        example,
+        weightVector,
+        predictionFunction)

-      (new WeightVector(weightGradient, lossDeriv), loss, 1)
+      (loss, 1)
}
}

+/** Calculates the regularized loss value, given the loss and the current weight vector
+  *
+  */
+private class RegularizedLossCalculation extends RichMapFunction[(Double, Int), Double]
{
+
+  var weightVector: WeightVector = null
+
+
--- End diff --

two line breaks intended?

