Hi Frank,

Two suggestions:

1. I would recommend caching the corpus prior to running LDA (see the sketch below).

2. To decrease the sample processed per iteration, I would tweak the setMiniBatchFraction parameter; note that this parameter belongs to the online optimizer (OnlineLDAOptimizer) rather than EM, so you would need to switch optimizers to use it (also in the sketch below).
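A minimal sketch of both suggestions, reusing the sample_lda_data.txt path from your snippet; the miniBatchFraction value of 0.05 is only illustrative, so tune it for your corpus:

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vectors

// Suggestion 1: cache the corpus so each LDA iteration reuses it
val data = sc.textFile("/data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Suggestion 2: the online optimizer exposes miniBatchFraction, which
// controls the fraction of the corpus sampled on each iteration
// (0.05 here is an illustrative value, not a recommendation)
val ldaModel = new LDA()
  .setK(3)
  .setMaxIterations(500)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05))
  .run(corpus)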

-Richard

On Tue, Sep 20, 2016 at 10:27 AM, Frank Zhang <dataminingus@yahoo.com.invalid> wrote:
Hi Yuhao,

   Thank you so much for your great contributions to LDA and the other Spark modules!

    I use both Spark 1.6.2 and 2.0.0. The data I originally used is very large, with tens of millions of documents, but for test purposes the data set I mentioned earlier ("/data/mllib/sample_lda_data.txt") is good enough. Please change the path below to point to the data set in your Spark installation, then run these lines:

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data (please change the path for your installation)
val data = sc.textFile("/data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs and cache the corpus
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
 
   It should work. After that, please run:
val ldaModel = new LDA().setK(3).setMaxIterations(500).run(corpus)

   When I ran it, the iteration at job #90 took an extremely long time relative to the earlier ones, and then it stopped with an exception:

Active Jobs (1)

Job Id | Description                    | Submitted           | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
90     | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:30 | 22 s     | 0/269                   | 0/538

Completed Jobs (90)

Job Id | Description                    | Submitted           | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
89     | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:30 | 43 ms    | 4/4 (262 skipped)       | 8/8 (524 skipped)
88     | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:30 | 40 ms    | 4/4 (259 skipped)       | 8/8 (518 skipped)
87     | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:29 | 80 ms    | 4/4 (256 skipped)       | 8/8 (512 skipped)
86     | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:29 | 41 ms    | 4/4 (253 skipped)       | 8/8 (506 skipped)
   Part of the error message:
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
  at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1046)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.fold(RDD.scala:1040)
  at org.apache.spark.mllib.clustering.EMLDAOptimizer.computeGlobalTopicTotals(LDAOptimizer.scala:226)
  at org.apache.spark.mllib.clustering.EMLDAOptimizer.next(LDAOptimizer.scala:213)
  at org.apache.spark.mllib.clustering.EMLDAOptimizer.next(LDAOptimizer.scala:79)
  at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:334)
  ... 48 elided
Caused by: java.lang.StackOverflowError
  at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
  at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

   Thank you so much!

   Frank 




From: "Yang, Yuhao" <yuhao.yang@intel.com>
To: Frank Zhang <dataminingus@yahoo.com>; "user@spark.apache.org" <user@spark.apache.org>
Sent: Tuesday, September 20, 2016 9:49 AM
Subject: RE: LDA and Maximum Iterations

Hi Frank,
 
Which version of Spark are you using? Also, can you share more information about the exception?
 
If it’s not confidential, you can send the data sample to me (yuhao.yang@intel.com) and I can try to investigate.
 
Regards,
Yuhao
From: Frank Zhang [mailto:dataminingus@yahoo.com.INVALID]
Sent: Monday, September 19, 2016 9:20 PM
To: user@spark.apache.org
Subject: LDA and Maximum Iterations
 
Hi all,
 
   I have a question about parameter setting for the LDA model. When I try to set a large number like 500 for setMaxIterations, the program always fails. There is a very straightforward LDA tutorial using an example data set in the mllib package: http://stackoverflow.com/questions/36631991/latent-dirichlet-allocation-lda-algorithm-not-printing-results-in-spark-scala. The code is here:
 
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("/data/mllib/sample_lda_data.txt") // you might need to change the path for the data set
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
 
But if I change the last line to
val ldaModel = new LDA().setK(3).setMaxIterations(500).run(corpus)
the program fails.
 
    I greatly appreciate your help! 
 
Best,
 
    Frank
 
 
 





--

Richard L Garris

Solution Architect 

Databricks, Inc.

richard@databricks.com

Mobile: 650.200.0840

databricks.com