mahout-user mailing list archives

From Reinis Vicups <mah...@orbit-x.de>
Subject Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Date Mon, 13 Oct 2014 17:17:47 GMT

> Let me try this on my own cluster and see if I can reproduce. Are you
> using text files or have you gotten HBase working? How large is the input?

Using HBase as input (it's the TFIDF vector generated by seq2sparse). If I 
recall correctly, about 30k documents with a dimensionality of around 500k. 
I don't know how to get the table size from HBase, but the TFIDF-vector 
HFile is a bit over 20 MB.
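
(I suppose one could sum up the store files under the table's directory with
the Hadoop FileSystem API, roughly along these lines - the path is a guess and
depends on the HBase root dir and table name, so replace <table-name>:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// rough on-disk size: total length of all files under the table's directory
val fs = FileSystem.get(new Configuration())
val bytes = fs.getContentSummary(new Path("/hbase/<table-name>")).getLength
println("approx. bytes on disk: " + bytes)
)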


On 13.10.2014 19:11, Pat Ferrel wrote:
> But if it doesn’t work in the Spark config it probably won’t work here either. There
> may be something else causing only one task for non-graph ABt.
>
> Dmitriy? Any ideas?
>
> Let me try this on my own cluster and see if I can reproduce. Are you using text files
> or have you gotten HBase working? How large is the input?
>
>
> On Oct 13, 2014, at 10:03 AM, Patrick Ferrel <pat@occamsmachete.com> wrote:
>
> Best guess is, as I said, to put it in RowSimilarityDriver.start like this:
>     sparkConf.set("spark.kryo.referenceTracking", "false")
>       .set("spark.kryoserializer.buffer.mb", "200")
>       .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
>       .set("spark.default.parallelism", "400")
>
> The multiply is evaluated lazily and executed by the blas
> optimizer when the result is required. This means that many failures will
> seem to happen in the write, since this is the first place where the values
> are accessed in a non-lazy manner.
>
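> A minimal sketch of what that means at the call site (standard drm DSL; the
> output path here is made up):
>
>     // nothing is computed here: drmAAt is only a logical plan
>     val drmAAt = drmA %*% drmA.t
>
>     // the optimizer picks the physical ABt plan, and the Spark job only
>     // starts when the result is materialized, e.g. written out
>     drmAAt.dfsWrite("hdfs:///tmp/rowsimilarity-out")
>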
> Actually we should probably not be hard coding the
> spark.kryoserializer.buffer.mb either.
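>
> One way that might look (just a sketch - the option name "kryoBufferMb" is
> made up, it is not an existing driver flag):
>
>     // hypothetical CLI option with the current hard-coded value as default
>     val kryoBufferMb = parser.opts.getOrElse("kryoBufferMb", "200").asInstanceOf[String]
>     sparkConf.set("spark.kryoserializer.buffer.mb", kryoBufferMb)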
>
> On Mon, Oct 13, 2014 at 9:54 AM, Reinis Vicups <mahout@orbit-x.de> wrote:
>
>> Hello,
>>
>> When you set the Spark config as below do you still get one task?
>> Unfortunately yes.
>>
>> Currently I am looking for the very first shuffle stage in
>> SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of
>> mapping, wrapping and caching in SimilarityAnalysis#sampleDownAndBinarize and
>> I don't see where to look for the code behind "%*%" in:
>>
>> // Compute row similarity cooccurrence matrix AA'
>> val drmAAt = drmA %*% drmA.t
>>
>> I would like to hard-code the partition number in that first shuffle just
>> for the sake of experiment.
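>>
>> (One thing I might try, if the par() operator that belongs to Par.scala is
>> already exposed in the build I'm on - I haven't verified that - is to force
>> the partitioning on the input before the multiply, roughly:
>>
>> val drmAPar = drmA.par(exact = 400)
>> val drmAAt = drmAPar %*% drmAPar.t
>> )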
>>
>>
>> On 13.10.2014 18:29, Pat Ferrel wrote:
>>
>>> I see no place where spark.default.parallelism is set, so your config
>>> can set it to whatever you wish. When you set the Spark config as below
>>> do you still get one task? The test suite sets
>>> spark.default.parallelism to 10 before the context is initialized. To do
>>> this with SimilarityAnalysis.rowSimilarity (here I assume you are
>>> modifying the driver) put the .set("spark.default.parallelism", "400") in
>>> RowSimilarityDriver.start and see if that changes things.
>>>
>>> If this doesn’t work it may be that the blas optimizer is doing something
>>> with the value, but I’m lost in that code. There is only one place the value
>>> is read, which is in Par.scala:
>>>
>>>          // auto adjustment, try to scale up to either x1Size or x2Size.
>>>          val clusterSize = rdd.context.getConf.get("spark.default.parallelism",
>>> "1").toInt
>>>
>>>          val x1Size = (clusterSize * .95).ceil.toInt
>>>          val x2Size = (clusterSize * 1.9).ceil.toInt
>>>
>>>          if (rdd.partitions.size <= x1Size)
>>>            rdd.coalesce(numPartitions = x1Size, shuffle = true)
>>>          else if (rdd.partitions.size <= x2Size)
>>>            rdd.coalesce(numPartitions = x2Size, shuffle = true)
>>>          else
>>>            rdd.coalesce(numPartitions = rdd.partitions.size)
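>>>
>>>          (With spark.default.parallelism = 400 that works out to
>>>          x1Size = (400 * .95).ceil = 380 and x2Size = (400 * 1.9).ceil = 760,
>>>          so a single-partition input should get coalesced, with a shuffle,
>>>          up to 380 partitions.)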
>>>
>>>
>>> Dmitriy, can you shed any light on the use of spark.default.parallelism,
>>> how to increase it, or how to get more than one task created when performing
>>> ABt?
>>>
>>>
>>> On Oct 13, 2014, at 8:56 AM, Reinis Vicups <mahout@orbit-x.de> wrote:
>>>
>>> Hi,
>>>
>>> I am currently testing SimilarityAnalysis.rowSimilarity and I am
>>> wondering how I could increase the number of tasks used for the distributed
>>> shuffle.
>>>
>>> What I currently observe is that SimilarityAnalysis requires almost
>>> 20 minutes for my dataset in this stage alone:
>>>
>>> combineByKey at ABt.scala:126
>>>
>>> When I view the details for the stage I see that only one task is spawned,
>>> running on one node.
>>>
>>> I have my own implementation of SimilarityAnalysis, and by tuning the number
>>> of tasks I have achieved HUGE performance gains (rough sketch below).
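>>>
>>> (Rough idea of what I mean by tuning, from my own code - the names here
>>> are mine, not Mahout's; the number of tasks goes straight into the shuffle
>>> call:
>>>
>>> val similarityVectors = cooccurrencePairs.reduceByKey(_ + _, numPartitions = 400)
>>> )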
>>>
>>> Since I couldn't find how to pass the number of tasks to the shuffle
>>> operations directly, I have set the following in the Spark config:
>>>
>>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>         .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
>>>         .set("spark.kryo.referenceTracking", "false")
>>>         .set("spark.kryoserializer.buffer.mb", "200")
>>>         .set("spark.default.parallelism", "400") // <- this line is supposed to set default parallelism to some high number
>>>
>>> Thank you for your help
>>> reinis
>>>
>>>

