mahout-user mailing list archives

From Pat Ferrel <>
Subject Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Date Mon, 13 Oct 2014 16:29:37 GMT
I see no place where spark.default.parallelism is set, so your config can set it to whatever
you wish. When you set the Spark config as below, do you still get one task? The test suite
sets spark.default.parallelism to 10 before the context is initialized. To do this with
SimilarityAnalysis.rowSimilarity (here I assume you are modifying the driver), put
 .set("spark.default.parallelism", "400") in RowSimilarityDriver.start and see if that changes
anything.

If this doesn’t work, it may be that the BLAS optimizer is doing something with the value,
but I’m lost in that code. There is only one place the value is read, which is in Par.scala:

        // auto adjustment, try to scale up to either x1Size or x2Size.
        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt

        val x1Size = (clusterSize * .95).ceil.toInt
        val x2Size = (clusterSize * 1.9).ceil.toInt

        if (rdd.partitions.size <= x1Size)
          rdd.coalesce(numPartitions = x1Size, shuffle = true)
        else if (rdd.partitions.size <= x2Size)
          rdd.coalesce(numPartitions = x2Size, shuffle = true)
        else
          rdd.coalesce(numPartitions = rdd.partitions.size)
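
To make the arithmetic concrete, here is a minimal sketch of that auto-adjustment as a pure function, with no Spark needed. The object and method names are hypothetical, not Mahout's, and I am assuming this branch is actually reached on your code path. If spark.default.parallelism is unset, the fallback "1" gives x1Size = 1 and x2Size = 2, so an RDD with a single partition would stay at one partition, which would be consistent with the single task you see.

```scala
// Hypothetical helper mirroring the quoted Par.scala logic; not part of Mahout.
object AutoParallelism {
  // Returns the partition count the quoted logic would coalesce to.
  def targetPartitions(currentPartitions: Int, defaultParallelism: Int): Int = {
    val x1Size = (defaultParallelism * 0.95).ceil.toInt
    val x2Size = (defaultParallelism * 1.9).ceil.toInt

    if (currentPartitions <= x1Size) x1Size        // scale up to ~1x the cluster size
    else if (currentPartitions <= x2Size) x2Size   // scale up to ~2x the cluster size
    else currentPartitions                         // already large enough, keep as is
  }
}
```

With the value set to 400, AutoParallelism.targetPartitions(1, 400) gives 380, so if the setting reaches this code you should see far more than one task.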

Dmitriy, can you shed any light on the use of spark.default.parallelism, how to increase it,
or how to get more than one task created when performing ABt?

On Oct 13, 2014, at 8:56 AM, Reinis Vicups <> wrote:


I am currently testing SimilarityAnalysis.rowSimilarity and I am wondering how I could increase
the number of tasks used for the distributed shuffle.

What I currently observe is that SimilarityAnalysis takes almost 20 minutes on my
dataset for this stage alone:

combineByKey at ABt.scala:126

When I view details for the stage I see that only one task is spawned running on one node.

I have my own implementation of SimilarityAnalysis, and by tuning the number of tasks I have
achieved HUGE performance gains.

Since I couldn't find how to pass the number of tasks to shuffle operations directly, I have
set the following in the Spark config:

configuration = new SparkConf().setAppName(jobConfig.jobName)
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .set("spark.kryo.registrator", "")
       .set("spark.kryo.referenceTracking", "false")
       .set("spark.kryoserializer.buffer.mb", "200")
       .set("spark.default.parallelism", "400") // <- this line is supposed to set default parallelism to some high number

Thank you for your help
