spark-user mailing list archives

From mikens <msmel...@gmail.com>
Subject Parallel execution on one node
Date Fri, 09 Jan 2015 06:57:37 GMT
Hello,
I am new to Spark. I have adapted an example to do binary classification
using logistic regression. I tried it on the rcv1_train.binary dataset with
the LBFGS.runLBFGS solver and obtained the correct loss.

Now, I'd like to run the code in parallel across the 16 cores of my single
CPU socket. If I understand correctly, parallelism in Spark is achieved by
partitioning the dataset into some number of partitions, typically 3-4 times
the number of cores in the system. To partition the data, I am calling
data.repartition(npart), where npart is the number of partitions (16*4=64 in
my case).
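To make sure I understand the partition-per-core heuristic, here is a minimal
sketch in plain Scala (no Spark; the PartitionDemo object and its partition
helper are my own illustration of splitting work into npart chunks, not Spark
internals):

```scala
object PartitionDemo {
  val cores = 16
  val npart = cores * 4 // heuristic: 3-4 partitions per core -> 64

  // Split the data into n roughly equal partitions, loosely mimicking
  // what RDD.repartition(n) does to the dataset.
  def partition[T](data: Vector[T], n: Int): Vector[Vector[T]] = {
    val chunk = math.ceil(data.size.toDouble / n).toInt
    data.grouped(chunk).toVector
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 1000000).toVector
    val parts = partition(data, npart)
    // Spark would schedule each partition as an independent task on a core;
    // here we just compute per-partition partial sums and combine them,
    // the way per-partition gradients get aggregated.
    val partialSums = parts.map(_.foldLeft(0L)(_ + _))
    println(parts.size + " partitions, total " + partialSums.sum) // 64 partitions, total 500000500000
  }
}
```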

I run the code as follows: 
spark-submit --master local[16] --class "logreg"
target/scala-2.10/logistic-regression_2.10-1.0.2.jar  72

However, I do not observe any speedup compared to using just one partition.
I would much appreciate your help in understanding what I am doing wrong and
why I am not seeing any speedup from the 16 cores. Please find my code
below.

Best,
Mike

*CODE*
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

object logreg {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("logreg")
    val sc = new SparkContext(conf)
    val npart = args(0).toInt
    val data_ = MLUtils.loadLibSVMFile(sc, "rcv1_train.binary.0label").cache()
    val data = data_.repartition(npart) // split the dataset into "npart" partitions
    val lambda = 1.0 / data.count()
    val splits = data.randomSplit(Array(1.0, 0.0), seed = 11L)
    val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()
    val numFeatures = data.take(1)(0).features.size
    val start = System.currentTimeMillis
    val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
    val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
      training,
      new LogisticGradient(),
      new SquaredL2Updater(),
      10,     // numCorrections
      1e-14,  // convergenceTol
      100,    // maxNumIterations
      lambda, // regParam
      initialWeightsWithIntercept)
    val took = (System.currentTimeMillis - start) / 1000.0
    println("LBFGS.runLBFGS: " + took + "s")
    sc.stop()
  }
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parallel-execution-on-one-node-tp21052.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
