flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
Date Tue, 21 Apr 2015 09:57:59 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504707#comment-14504707 ]

ASF GitHub Bot commented on FLINK-1807:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/613#discussion_r28764408
  
    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala ---
    @@ -0,0 +1,556 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math
    +
    +import com.github.fommil.netlib.{BLAS => NetlibBLAS, F2jBLAS}
    +import com.github.fommil.netlib.BLAS.{getInstance => NativeBLAS}
    +
    +
    +/**
    + * BLAS routines for Flink ML's vectors and matrices.
    + */
    +object BLAS extends Serializable {
    +
    +  @transient private var _f2jBLAS: NetlibBLAS = _
    +  @transient private var _nativeBLAS: NetlibBLAS = _
    +
    +  // For level-1 routines, we use Java implementation.
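    +  // (level-1 operations are O(n), so the JNI overhead of calling native BLAS would dominate)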
    +  private def f2jBLAS: NetlibBLAS = {
    +    if (_f2jBLAS == null) {
    +      _f2jBLAS = new F2jBLAS
    +    }
    +    _f2jBLAS
    +  }
    +
    +  /**
    +   * y += a * x
    +   */
    +  def axpy(a: Double, x: Vector, y: Vector): Unit = {
    +    require(x.size == y.size)
    +    y match {
    +      case dy: DenseVector =>
    +        x match {
    +          case sx: SparseVector =>
    +            axpy(a, sx, dy)
    +          case dx: DenseVector =>
    +            axpy(a, dx, dy)
    +          case _ =>
    +            throw new UnsupportedOperationException(
    +              s"axpy doesn't support x type ${x.getClass}.")
    +        }
    +      case _ =>
    +        throw new IllegalArgumentException(
    +          s"axpy only supports adding to a dense vector but got type ${y.getClass}.")
    +    }
    +  }
    +
    +  /**
    +   * y += a * x
    +   */
    +  private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = {
    +    val n = x.size
    +    f2jBLAS.daxpy(n, a, x.data, 1, y.data, 1)
    +  }
    +
    +  /**
    +   * y += a * x
    +   */
    +  private def axpy(a: Double, x: SparseVector, y: DenseVector): Unit = {
    +    val xValues = x.data
    +    val xIndices = x.indices
    +    val yValues = y.data
    +    val nnz = xIndices.size
    +
    +    if (a == 1.0) {
    +      var k = 0
    +      while (k < nnz) {
    +        yValues(xIndices(k)) += xValues(k)
    +        k += 1
    +      }
    +    } else {
    +      var k = 0
    +      while (k < nnz) {
    +        yValues(xIndices(k)) += a * xValues(k)
    +        k += 1
    +      }
    +    }
    +  }
    +
    +  /**
    +   * dot(x, y)
    +   */
    +  def dot(x: Vector, y: Vector): Double = {
    +    require(x.size == y.size,
    +      "BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes:" +
    +        " x.size = " + x.size + ", y.size = " + y.size)
    +    (x, y) match {
    +      case (dx: DenseVector, dy: DenseVector) =>
    +        dot(dx, dy)
    +      case (sx: SparseVector, dy: DenseVector) =>
    +        dot(sx, dy)
    +      case (dx: DenseVector, sy: SparseVector) =>
    +        dot(sy, dx)
    +      case (sx: SparseVector, sy: SparseVector) =>
    +        dot(sx, sy)
    +      case _ =>
    +        throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
    +    }
    +  }
    +
    +  /**
    +   * dot(x, y)
    +   */
    +  private def dot(x: DenseVector, y: DenseVector): Double = {
    +    val n = x.size
    +    f2jBLAS.ddot(n, x.data, 1, y.data, 1)
    +  }
    +
    +  /**
    +   * dot(x, y)
    +   */
    +  private def dot(x: SparseVector, y: DenseVector): Double = {
    +    val xValues = x.data
    +    val xIndices = x.indices
    +    val yValues = y.data
    +    val nnz = xIndices.size
    +
    +    var sum = 0.0
    +    var k = 0
    +    while (k < nnz) {
    +      sum += xValues(k) * yValues(xIndices(k))
    +      k += 1
    +    }
    +    sum
    +  }
    +
    +  /**
    +   * dot(x, y)
    +   */
    +  private def dot(x: SparseVector, y: SparseVector): Double = {
    +    val xValues = x.data
    +    val xIndices = x.indices
    +    val yValues = y.data
    +    val yIndices = y.indices
    +    val nnzx = xIndices.size
    +    val nnzy = yIndices.size
    +
    +    var kx = 0
    +    var ky = 0
    +    var sum = 0.0
    +    // merge-style scan: advance ky until yIndices(ky) catches up with the current x index
    +    while (kx < nnzx && ky < nnzy) {
    +      val ix = xIndices(kx)
    +      while (ky < nnzy && yIndices(ky) < ix) {
    +        ky += 1
    +      }
    +      if (ky < nnzy && yIndices(ky) == ix) {
    +        sum += xValues(kx) * yValues(ky)
    +        ky += 1
    +      }
    +      kx += 1
    +    }
    +    sum
    +  }
    +
    +  /**
    +   * y = x
    +   */
    +  def copy(x: Vector, y: Vector): Unit = {
    +    val n = y.size
    +    require(x.size == n)
    +    y match {
    +      case dy: DenseVector =>
    +        x match {
    +          case sx: SparseVector =>
    +            val sxIndices = sx.indices
    +            val sxValues = sx.data
    +            val dyValues = dy.data
    +            val nnz = sxIndices.size
    +
    +            var i = 0
    +            var k = 0
    +            while (k < nnz) {
    +              val j = sxIndices(k)
    +              while (i < j) {
    +                dyValues(i) = 0.0
    +                i += 1
    +              }
    +              dyValues(i) = sxValues(k)
    +              i += 1
    +              k += 1
    +            }
    +            while (i < n) {
    +              dyValues(i) = 0.0
    +              i += 1
    +            }
    +          case dx: DenseVector =>
    +            Array.copy(dx.data, 0, dy.data, 0, n)
    +        }
    +      case _ =>
    +        throw new IllegalArgumentException(s"y must be dense in copy but got ${y.getClass}")
    +    }
    +  }
    +
    +  /**
    +   * x = a * x
    +   */
    +  def scal(a: Double, x: Vector): Unit = {
    +    x match {
    +      case sx: SparseVector =>
    +        f2jBLAS.dscal(sx.data.size, a, sx.data, 1)
    +      case dx: DenseVector =>
    +        f2jBLAS.dscal(dx.data.size, a, dx.data, 1)
    +      case _ =>
    +        throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.")
    +    }
    +  }
    +
    +  // For level-3 routines, we use the native BLAS.
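    +  // (these heavier routines do enough work per call to amortize the JNI overhead of native BLAS)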
    +  private def nativeBLAS: NetlibBLAS = {
    +    if (_nativeBLAS == null) {
    +      _nativeBLAS = NativeBLAS
    +    }
    +    _nativeBLAS
    +  }
    +
    +  /**
    +   * A := alpha * x * x^T^ + A
    +   * @param alpha a real scalar that will be multiplied to x * x^T^.
    +   * @param x the vector x that contains the n elements.
    +   * @param A the symmetric matrix A. Size of n x n.
    +   */
    +  def syr(alpha: Double, x: Vector, A: DenseMatrix) {
    +    val mA = A.numRows
    +    val nA = A.numCols
    +    require(mA == nA, s"A is not a square matrix (and hence is not symmetric). A: $mA x $nA")
    +    require(mA == x.size, s"The size of x doesn't match the rank of A. A: $mA x $nA, x: ${x.size}")
    +
    +    x match {
    +      case dv: DenseVector => syr(alpha, dv, A)
    +      case sv: SparseVector => syr(alpha, sv, A)
    +      case _ =>
    +        throw new IllegalArgumentException(s"syr doesn't support vector type ${x.getClass}.")
    +    }
    +  }
    +
    +  private def syr(alpha: Double, x: DenseVector, A: DenseMatrix) {
    +    val mA = A.numRows
    +    val nA = A.numCols
    +
    +    nativeBLAS.dsyr("U", x.size, alpha, x.data, 1, A.data, mA)
    +
    +    // Fill lower triangular part of A
    +    var i = 0
    +    while (i < mA) {
    +      var j = i + 1
    +      while (j < nA) {
    +        A(j, i) = A(i, j)
    +        j += 1
    +      }
    +      i += 1
    +    }
    +  }
    +
    +  private def syr(alpha: Double, x: SparseVector, A: DenseMatrix) {
    +    val mA = A.numCols
    +    val xIndices = x.indices
    +    val xValues = x.data
    +    val nnz = xValues.length
    +    val Avalues = A.data
    +
    +    var i = 0
    +    while (i < nnz) {
    +      val multiplier = alpha * xValues(i)
    +      val offset = xIndices(i) * mA
    +      var j = 0
    +      while (j < nnz) {
    +        Avalues(xIndices(j) + offset) += multiplier * xValues(j)
    +        j += 1
    +      }
    +      i += 1
    +    }
    +  }
    +}
    +
    +//  /**
    +//   * C := alpha * A * B + beta * C
    +//   * @param alpha a scalar to scale the multiplication A * B.
    +//   * @param A the matrix A that will be left multiplied to B. Size of m x k.
    +//   * @param B the matrix B that will be left multiplied by A. Size of k x n.
    +//   * @param beta a scalar that can be used to scale matrix C.
    +//   * @param C the resulting matrix C. Size of m x n. C.isTransposed must be false.
    +//   */
    +//  def gemm(
    +//            alpha: Double,
    +//            A: Matrix,
    +//            B: DenseMatrix,
    +//            beta: Double,
    +//            C: DenseMatrix): Unit = {
    +//    require(!C.isTransposed,
    +//      "The matrix C cannot be the product of a transpose() call. C.isTransposed must
be false.")
    +//    if (alpha == 0.0) {
    +//      logDebug("gemm: alpha is equal to 0. Returning C.")
    +//    } else {
    +//      A match {
    +//        case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C)
    +//        case dense: DenseMatrix => gemm(alpha, dense, B, beta, C)
    +//        case _ =>
    +//          throw new IllegalArgumentException(s"gemm doesn't support matrix type ${A.getClass}.")
    +//      }
    +//    }
    +//  }
    +//
    +//  /**
    +//   * C := alpha * A * B + beta * C
    +//   * For `DenseMatrix` A.
    +//   */
    +//  private def gemm(
    +//                    alpha: Double,
    +//                    A: DenseMatrix,
    +//                    B: DenseMatrix,
    +//                    beta: Double,
    +//                    C: DenseMatrix): Unit = {
    +//    val tAstr = if (A.isTransposed) "T" else "N"
    +//    val tBstr = if (B.isTransposed) "T" else "N"
    +//    val lda = if (!A.isTransposed) A.numRows else A.numCols
    +//    val ldb = if (!B.isTransposed) B.numRows else B.numCols
    +//
    +//    require(A.numCols == B.numRows,
    +//      s"The columns of A don't match the rows of B. A: ${A.numCols}, B: ${B.numRows}")
    +//    require(A.numRows == C.numRows,
    +//      s"The rows of C don't match the rows of A. C: ${C.numRows}, A: ${A.numRows}")
    +//    require(B.numCols == C.numCols,
    +//      s"The columns of C don't match the columns of B. C: ${C.numCols}, A: ${B.numCols}")
    +//    nativeBLAS.dgemm(tAstr, tBstr, A.numRows, B.numCols, A.numCols, alpha, A.data, lda,
    +//      B.data, ldb, beta, C.data, C.numRows)
    +//  }
    +//
    +//  /**
    +//   * C := alpha * A * B + beta * C
    +//   * For `SparseMatrix` A.
    +//   */
    +//  private def gemm(
    +//                    alpha: Double,
    +//                    A: SparseMatrix,
    +//                    B: DenseMatrix,
    +//                    beta: Double,
    +//                    C: DenseMatrix): Unit = {
    +//    val mA: Int = A.numRows
    +//    val nB: Int = B.numCols
    +//    val kA: Int = A.numCols
    +//    val kB: Int = B.numRows
    +//
    +//    require(kA == kB, s"The columns of A don't match the rows of B. A: $kA, B: $kB")
    +//    require(mA == C.numRows, s"The rows of C don't match the rows of A. C: ${C.numRows}, A: $mA")
    +//    require(nB == C.numCols,
    +//      s"The columns of C don't match the columns of B. C: ${C.numCols}, A: $nB")
    +//
    +//    val Avals = A.data
    +//    val Bvals = B.data
    +//    val Cvals = C.data
    +//    val ArowIndices = A.rowIndices
    +//    val AcolPtrs = A.colPtrs
    +//
    +//    // Slicing is easy in this case. This is the optimal multiplication setting for sparse matrices
    +//    if (A.isTransposed){
    +//      var colCounterForB = 0
    +//      if (!B.isTransposed) { // Expensive to put the check inside the loop
    +//        while (colCounterForB < nB) {
    +//          var rowCounterForA = 0
    +//          val Cstart = colCounterForB * mA
    +//          val Bstart = colCounterForB * kA
    +//          while (rowCounterForA < mA) {
    +//            var i = AcolPtrs(rowCounterForA)
    +//            val indEnd = AcolPtrs(rowCounterForA + 1)
    +//            var sum = 0.0
    +//            while (i < indEnd) {
    +//              sum += Avals(i) * Bvals(Bstart + ArowIndices(i))
    +//              i += 1
    +//            }
    +//            val Cindex = Cstart + rowCounterForA
    +//            Cvals(Cindex) = beta * Cvals(Cindex) + sum * alpha
    +//            rowCounterForA += 1
    +//          }
    +//          colCounterForB += 1
    +//        }
    +//      } else {
    +//        while (colCounterForB < nB) {
    +//          var rowCounterForA = 0
    +//          val Cstart = colCounterForB * mA
    +//          while (rowCounterForA < mA) {
    +//            var i = AcolPtrs(rowCounterForA)
    +//            val indEnd = AcolPtrs(rowCounterForA + 1)
    +//            var sum = 0.0
    +//            while (i < indEnd) {
    +//              sum += Avals(i) * B(ArowIndices(i), colCounterForB)
    +//              i += 1
    +//            }
    +//            val Cindex = Cstart + rowCounterForA
    +//            Cvals(Cindex) = beta * Cvals(Cindex) + sum * alpha
    +//            rowCounterForA += 1
    +//          }
    +//          colCounterForB += 1
    +//        }
    +//      }
    +//    } else {
    +//      // Scale matrix first if `beta` is not equal to 0.0
    +//      if (beta != 0.0) {
    +//        f2jBLAS.dscal(C.data.length, beta, C.data, 1)
    +//      }
    +//      // Perform matrix multiplication and add to C. The rows of A are multiplied by the columns of
    +//      // B, and added to C.
    +//      var colCounterForB = 0 // the column to be updated in C
    +//      if (!B.isTransposed) { // Expensive to put the check inside the loop
    +//        while (colCounterForB < nB) {
    +//          var colCounterForA = 0 // The column of A to multiply with the row of B
    +//          val Bstart = colCounterForB * kB
    +//          val Cstart = colCounterForB * mA
    +//          while (colCounterForA < kA) {
    +//            var i = AcolPtrs(colCounterForA)
    +//            val indEnd = AcolPtrs(colCounterForA + 1)
    +//            val Bval = Bvals(Bstart + colCounterForA) * alpha
    +//            while (i < indEnd) {
    +//              Cvals(Cstart + ArowIndices(i)) += Avals(i) * Bval
    +//              i += 1
    +//            }
    +//            colCounterForA += 1
    +//          }
    +//          colCounterForB += 1
    +//        }
    +//      } else {
    +//        while (colCounterForB < nB) {
    +//          var colCounterForA = 0 // The column of A to multiply with the row of B
    +//          val Cstart = colCounterForB * mA
    +//          while (colCounterForA < kA) {
    +//            var i = AcolPtrs(colCounterForA)
    +//            val indEnd = AcolPtrs(colCounterForA + 1)
    +//            val Bval = B(colCounterForA, colCounterForB) * alpha
    +//            while (i < indEnd) {
    +//              Cvals(Cstart + ArowIndices(i)) += Avals(i) * Bval
    +//              i += 1
    +//            }
    +//            colCounterForA += 1
    +//          }
    +//          colCounterForB += 1
    +//        }
    +//      }
    +//    }
    +//  }
    +//
    +//  /**
    +//   * y := alpha * A * x + beta * y
    +//   * @param alpha a scalar to scale the multiplication A * x.
    +//   * @param A the matrix A that will be left multiplied to x. Size of m x n.
    +//   * @param x the vector x that will be left multiplied by A. Size of n x 1.
    +//   * @param beta a scalar that can be used to scale vector y.
    +//   * @param y the resulting vector y. Size of m x 1.
    +//   */
    +//  def gemv(
    +//            alpha: Double,
    +//            A: Matrix,
    +//            x: DenseVector,
    +//            beta: Double,
    +//            y: DenseVector): Unit = {
    +//    require(A.numCols == x.size,
    +//      s"The columns of A don't match the number of elements of x. A: ${A.numCols},
x: ${x.size}")
    +//    require(A.numRows == y.size,
    +//      s"The rows of A don't match the number of elements of y. A: ${A.numRows}, y:${y.size}}")
    +//    if (alpha == 0.0) {
    +//      logDebug("gemv: alpha is equal to 0. Returning y.")
    +//    } else {
    +//      A match {
    +//        case sparse: SparseMatrix =>
    +//          gemv(alpha, sparse, x, beta, y)
    +//        case dense: DenseMatrix =>
    +//          gemv(alpha, dense, x, beta, y)
    +//        case _ =>
    +//          throw new IllegalArgumentException(s"gemv doesn't support matrix type ${A.getClass}.")
    +//      }
    +//    }
    +//  }
    +//
    +//  /**
    +//   * y := alpha * A * x + beta * y
    +//   * For `DenseMatrix` A.
    +//   */
    +//  private def gemv(
    +//                    alpha: Double,
    +//                    A: DenseMatrix,
    +//                    x: DenseVector,
    +//                    beta: Double,
    +//                    y: DenseVector): Unit =  {
    +//    val tStrA = if (A.isTransposed) "T" else "N"
    +//    val mA = if (!A.isTransposed) A.numRows else A.numCols
    +//    val nA = if (!A.isTransposed) A.numCols else A.numRows
    +//    nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.data, mA, x.data, 1, beta,
    +//      y.data, 1)
    +//  }
    +//
    +//  /**
    +//   * y := alpha * A * x + beta * y
    +//   * For `SparseMatrix` A.
    +//   */
    +//  private def gemv(
    +//                    alpha: Double,
    +//                    A: SparseMatrix,
    +//                    x: DenseVector,
    +//                    beta: Double,
    +//                    y: DenseVector): Unit =  {
    +//    val xValues = x.data
    +//    val yValues = y.data
    +//    val mA: Int = A.numRows
    +//    val nA: Int = A.numCols
    +//
    +//    val Avals = A.data
    +//    val Arows = if (!A.isTransposed) A.rowIndices else A.colPtrs
    +//    val Acols = if (!A.isTransposed) A.colPtrs else A.rowIndices
    +//    // Slicing is easy in this case. This is the optimal multiplication setting for sparse matrices
    +//    if (A.isTransposed) {
    +//      var rowCounter = 0
    +//      while (rowCounter < mA) {
    +//        var i = Arows(rowCounter)
    +//        val indEnd = Arows(rowCounter + 1)
    +//        var sum = 0.0
    +//        while (i < indEnd) {
    +//          sum += Avals(i) * xValues(Acols(i))
    +//          i += 1
    +//        }
    +//        yValues(rowCounter) = beta * yValues(rowCounter) + sum * alpha
    +//        rowCounter += 1
    +//      }
    +//    } else {
    +//      // Scale vector first if `beta` is not equal to 0.0
    +//      if (beta != 0.0) {
    +//        scal(beta, y)
    +//      }
    +//      // Perform matrix-vector multiplication and add to y
    +//      var colCounterForA = 0
    +//      while (colCounterForA < nA) {
    +//        var i = Acols(colCounterForA)
    +//        val indEnd = Acols(colCounterForA + 1)
    +//        val xVal = xValues(colCounterForA) * alpha
    +//        while (i < indEnd) {
    +//          val rowIndex = Arows(i)
    +//          yValues(rowIndex) += Avals(i) * xVal
    +//          i += 1
    +//        }
    +//        colCounterForA += 1
    +//      }
    +//    }
    +//  }
    +//}
    --- End diff ---
    
    We should either remove this code or port it.
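    
    For reference, a ported dense `gemv` might look roughly like the sketch below. This is
    an illustration only, not code from the pull request: it assumes the column-major
    `DenseMatrix` layout used above, the same netlib-java BLAS handle, and a hypothetical
    method name.
    
        // Hypothetical sketch: y := alpha * A * x + beta * y, with A column-major.
        // Reuses the netlib-java instance already employed by the routines above.
        import com.github.fommil.netlib.BLAS.{getInstance => blas}
    
        def gemvDense(
            alpha: Double,
            A: DenseMatrix,   // m x n, column-major, not transposed
            x: DenseVector,   // length n
            beta: Double,
            y: DenseVector    // length m, updated in place
          ): Unit = {
          require(A.numCols == x.size, s"A.numCols (${A.numCols}) != x.size (${x.size})")
          require(A.numRows == y.size, s"A.numRows (${A.numRows}) != y.size (${y.size})")
          // "N" = no transpose; for column-major data the leading dimension is numRows.
          blas.dgemv("N", A.numRows, A.numCols, alpha, A.data, A.numRows,
            x.data, 1, beta, y.data, 1)
        }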


> Stochastic gradient descent optimizer for ML library
> ----------------------------------------------------
>
>                 Key: FLINK-1807
>                 URL: https://issues.apache.org/jira/browse/FLINK-1807
>             Project: Flink
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Theodore Vasiloudis
>              Labels: ML
>
> Stochastic gradient descent (SGD) is a widely used optimization technique in different
> ML algorithms. Thus, it would be helpful to provide a generalized SGD implementation
> which can be instantiated with the respective gradient computation. Such a building
> block would make the development of future algorithms easier.
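
As a rough illustration of the proposed building block (a sketch only: the trait and
method names below are hypothetical, not part of the Flink ML API, and a real
implementation would run over a DataSet rather than a local Seq), the gradient
computation can be passed in as a parameter:

    // Minimal local sketch of SGD with a pluggable gradient computation.
    trait LossGradient extends Serializable {
      // Gradient of the loss at `weights` for one example (features, label).
      def gradient(weights: Array[Double], features: Array[Double], label: Double): Array[Double]
    }

    // Example instantiation: squared loss for linear regression.
    object SquaredLossGradient extends LossGradient {
      def gradient(w: Array[Double], x: Array[Double], y: Double): Array[Double] = {
        val error = (w, x).zipped.map(_ * _).sum - y  // prediction - label
        x.map(_ * error)                              // d/dw of 0.5 * (w . x - y)^2
      }
    }

    def sgd(
        data: Seq[(Array[Double], Double)],
        lossGradient: LossGradient,
        stepSize: Double,
        iterations: Int): Array[Double] = {
      var w = new Array[Double](data.head._1.length)
      for (_ <- 1 to iterations; (x, y) <- scala.util.Random.shuffle(data)) {
        val g = lossGradient.gradient(w, x, y)
        w = (w, g).zipped.map((wi, gi) => wi - stepSize * gi)
      }
      w
    }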



