spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: Parallel execution on one node
Date Fri, 09 Jan 2015 11:55:08 GMT
Repartitioning has some overhead. Do your measurements show that this
is where the extra time is going? You could instead read the source
file with more partitions from the start, rather than repartitioning
it after the fact; see the minPartitions argument to loadLibSVMFile.
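
As a minimal sketch (in 1.x, passing a nonpositive numFeatures asks
MLlib to infer the feature count from the data):

    // Read with 64 partitions up front instead of repartitioning later.
    val data = MLUtils.loadLibSVMFile(sc, "rcv1_train.binary.0label", -1, 64)
      .cache()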

But before that, I'd assess whether the core computation here is even
taking much time. If it's already a small fraction of the total
runtime, speeding it up 16x doesn't do much.
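
One way to check, sketched against your code below: materialize the
cached training set (and print its partition count) before starting the
clock, so the timed region covers only LBFGS and not the first
read/parse of the input:

    // Fill the cache first; otherwise the timed region also pays for
    // reading and parsing the file on the first action.
    println("training partitions: " + training.partitions.size)
    training.count()
    val start = System.currentTimeMillis
    // ... LBFGS.runLBFGS(...) as in your code ...
    println("LBFGS: " + (System.currentTimeMillis - start) / 1000.0 + "s")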

On Fri, Jan 9, 2015 at 6:57 AM, mikens <msmelyan@gmail.com> wrote:
> Hello,
> I am new to Spark. I have adapted an example to do binary
> classification using logistic regression. I tried it on the
> rcv1_train.binary dataset with the LBFGS.runLBFGS solver and obtained
> the correct loss.
>
> Now I'd like to run the code in parallel across the 16 cores of my
> single CPU socket. If I understand correctly, parallelism in Spark is
> achieved by partitioning the dataset into some number of partitions,
> roughly 3-4 times the number of cores in the system. To partition the
> data, I call data.repartition(npart), where npart is the number of
> partitions (16*4=64 in my case).
>
> I run the code as follows:
> spark-submit --master local[16] --class "logreg"
> target/scala-2.10/logistic-regression_2.10-1.0.2.jar  72
>
> However, I do not observe any speedup compared to using just one
> partition. I would much appreciate your help understanding what I am
> doing wrong and why I am not seeing any benefit from the 16 cores.
> Please find my code below.
>
> Best,
> Mike
>
> *CODE*
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
> import org.apache.spark.mllib.util.MLUtils
>
> object logreg {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("logreg")
>     val sc = new SparkContext(conf)
>
>     val npart = args(0).toInt
>
>     // Load the dataset, then repartition it into "npart" partitions.
>     val data_ = MLUtils.loadLibSVMFile(sc, "rcv1_train.binary.0label").cache()
>     val data = data_.repartition(npart)
>
>     val lambda = 1.0 / data.count()
>
>     // Keep all points in the training split; append a bias term to each.
>     val splits = data.randomSplit(Array(1.0, 0.0), seed = 11L)
>     val training = splits(0)
>       .map(x => (x.label, MLUtils.appendBias(x.features)))
>       .cache()
>
>     val numFeatures = data.take(1)(0).features.size
>
>     val start = System.currentTimeMillis
>     val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
>     val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
>       training,
>       new LogisticGradient(),
>       new SquaredL2Updater(),
>       10,     // numCorrections
>       1e-14,  // convergenceTol
>       100,    // maxNumIterations
>       lambda, // regParam
>       initialWeightsWithIntercept)
>     val took = (System.currentTimeMillis - start) / 1000.0
>     println("LBFGS.runLBFGS: " + took + "s")
>
>     sc.stop()
>   }
> }
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parallel-execution-on-one-node-tp21052.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

