mahout-user mailing list archives

From Patrick Ferrel <...@occamsmachete.com>
Subject Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Date Mon, 13 Oct 2014 17:03:01 GMT
My best guess, as I said, is to put it in RowSimilarityDriver.start like
this:
    sparkConf.set("spark.kryo.referenceTracking", "false")
      .set("spark.kryoserializer.buffer.mb", "200")
      .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
      .set("spark.default.parallelism", "400") // SparkConf.set takes String values

The multiply happens during lazy evaluation and is executed by the blas
optimizer when the result is required. This means that many failures will
appear to happen in the write, since that is the first place where the
values are accessed in a non-lazy manner.
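
For illustration, here is a minimal sketch of that lazy behavior (drmA and
the output path are placeholders, not from this thread):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    // Nothing runs here; only a logical plan is built.
    val drmAAt = drmA %*% drmA.t
    // checkpoint() asks the optimizer to plan and execute the product...
    val result = drmAAt.checkpoint()
    // ...otherwise the first non-lazy access, such as the write below,
    // triggers execution and is where failures will appear to originate.
    result.dfsWrite("hdfs://.../AAt")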

Actually, we should probably not be hard-coding
spark.kryoserializer.buffer.mb either.
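
If that buffer size were made configurable, it could mirror the
sparkExecutorMem handling. A sketch only: "kryoBufferMb" is an invented
option name, not an existing flag:

    // Fall back to "200" when the hypothetical option is absent.
    val kryoBufMb = parser.opts.getOrElse("kryoBufferMb", "200").toString
    sparkConf.set("spark.kryoserializer.buffer.mb", kryoBufMb)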

On Mon, Oct 13, 2014 at 9:54 AM, Reinis Vicups <mahout@orbit-x.de> wrote:

> Hello,
>
>  When you set the Spark config as below do you still get one task?
>>
>
> Unfortunately yes.
>
> Currently I am looking for the very first shuffle stage in
> SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of
> mapping, wrapping, and caching in SimilarityAnalysis#sampleDownAndBinarize,
> and I don't see where to look for the code behind "%*%" in:
>
> // Compute row similarity cooccurrence matrix AA'
> val drmAAt = drmA %*% drmA.t
>
> I would like to hard-code the partition number in that first shuffle just
> for the sake of the experiment.
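>
> [One possible experiment, not from the original message: if this build
> exposes the DSL's "par" operator (the logical counterpart of Par.scala),
> partitioning could be forced on the input before the multiply, e.g.
>
>     val drmAPar = drmA.par(exact = 400)  // request an exact partition count
>     val drmAAt  = drmAPar %*% drmAPar.t
>
> Whether the optimizer carries that through the ABt shuffle is exactly
> what the experiment would show.]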
>
>
> On 13.10.2014 18:29, Pat Ferrel wrote:
>
>> I see no place where spark.default.parallelism is set, so your config can
>> set it to whatever you wish. When you set the Spark config as below, do
>> you still get one task? The test suite sets spark.default.parallelism to
>> 10 before the context is initialized. To do this with
>> SimilarityAnalysis.rowSimilarity (here I assume you are modifying the
>> driver), put the .set("spark.default.parallelism", "400") in
>> RowSimilarityDriver.start and see if that changes things.
>>
>> If this doesn't work, it may be that the blas optimizer is doing
>> something with the value, but I'm lost in that code. There is only one
>> place the value is read, which is in Par.scala:
>>
>>          // auto adjustment, try to scale up to either x1Size or x2Size.
>>          val clusterSize =
>>            rdd.context.getConf.get("spark.default.parallelism", "1").toInt
>>
>>          val x1Size = (clusterSize * .95).ceil.toInt
>>          val x2Size = (clusterSize * 1.9).ceil.toInt
>>
>>          if (rdd.partitions.size <= x1Size)
>>            rdd.coalesce(numPartitions = x1Size, shuffle = true)
>>          else if (rdd.partitions.size <= x2Size)
>>            rdd.coalesce(numPartitions = x2Size, shuffle = true)
>>          else
>>            rdd.coalesce(numPartitions = rdd.partitions.size)
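>>
>> [A quick sanity check, not in the original message: with the default of
>> "1", x1Size = ceil(0.95) = 1 and x2Size = ceil(1.9) = 2, so an RDD that
>> arrives with a single partition is never scaled up, which would explain
>> the one-task stage reported below. With spark.default.parallelism set to
>> 400, x1Size = 380 and x2Size = 760.]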
>>
>>
>> Dmitriy, can you shed any light on the use of spark.default.parallelism,
>> how to increase it, or how to get more than one task created when
>> performing ABt?
>>
>>
>> On Oct 13, 2014, at 8:56 AM, Reinis Vicups <mahout@orbit-x.de> wrote:
>>
>> Hi,
>>
>> I am currently testing SimilarityAnalysis.rowSimilarity and I am
>> wondering how I could increase the number of tasks used for the
>> distributed shuffle.
>>
>> What I currently observe is that SimilarityAnalysis spends almost 20
>> minutes on my dataset in this stage alone:
>>
>> combineByKey at ABt.scala:126
>>
>> When I view the details for the stage, I see that only one task is
>> spawned, running on one node.
>>
>> I have my own implementation of SimilarityAnalysis, and by tuning the
>> number of tasks I have achieved HUGE performance gains.
>>
>> Since I couldn't find a way to pass the number of tasks to the shuffle
>> operations directly, I have set the following in the Spark config:
>>
>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>     .set("spark.kryo.registrator",
>>       "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
>>     .set("spark.kryo.referenceTracking", "false")
>>     .set("spark.kryoserializer.buffer.mb", "200")
>>     // <- the next line is supposed to set default parallelism to some high number
>>     .set("spark.default.parallelism", "400")
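>>
>> [A quick way to verify the setting took effect, not in the original
>> message: after the SparkContext is created from this config,
>>
>>     println(sc.defaultParallelism)  // should print 400 if picked up
>>
>> since SparkContext.defaultParallelism reflects spark.default.parallelism
>> when that property is set.]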
>>
>> Thank you for your help
>> reinis
>>
>>
>
