At a quick glance, I think you're misunderstanding some basic features.

http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

Map is a transformation, it is lazy.  You're not calling any action on the result of map.

Also, closing over a mutable variable (like idx or featArray here) won't work; that closure is being run on executors, not the driver where your main code is running.

On Mon, Jan 12, 2015 at 9:49 AM, rkgurram <rkgurram@gmail.com> wrote:
Hi,
   I am observing some weird behavior with spark, it might be my
mis-interpretation of some fundamental concepts but I have look at it for 3
days and have not been able to solve it.

The source code is pretty long and complex so instead of posting it, I will
try to articulate the problem.
I am building a "Sentiment Analyser" using the Naive Bayes model in Spark.

1) I have taken text files in RAW format and created a RDD of
words->Array(files the word is found in).

 2) From this I have derived the "features" array for each file which is an
Array[Double], a 0.0 if the file does not contain the word and 1.0 if the
word is found in the file

3) I have then created an RDD[LabeledPoints]

from this I have created the Naive Baiyes model using the following code

    val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0)
   // training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
    val test = splits(1)
    Logger.info("Training count: " + training.count() + " Testing count:" +
test.count())
    model = NaiveBayes.train(training, lambda = 1.0)

    val predictionAndLabel = test.map(p => (model.predict(p.features),
p.label))
    val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 ==
x._2).count() / test.count()
    Logger.info("Fold:[" + fold + "] accuracy: [" + accuracy +"]")

4) The model seems to be fine and the accuracy is about 75% to 82% depending
on which set of input fles I provide.

5) Now I am using this model to "predict()",  here I am creating the same
feature array from the input text file and I have code as follows,
   /*
    * Print all the features (words) in the feature array
    */
   allFeaturesRDD.foreach((x) => print(x + ", "))

 /*
  * Build the feature array
  */

    val features = buildFeatureArray(reviewID,wordSeqRdd) <---- Fails here,
have show this code below
    logFeatureArray(features)

    val prediction = model.predict(Vectors.dense(features))
    Logger.info ("Prediction:" + prediction)

==================================
reviewID ----> filename
wordReviewSeqRDD -> RDD[(word, Array(filename)]

  def buildFeatureArray(reviewID:String,
                        wordReviewSeqRDD:RDD[(String,Seq[String])]):
Array[Double] = {

    val numWords = allFeaturesRDD.count <--- number of all words in the
feature
    val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()

    var featArray:Array[Double] = new Array(numWords.toInt) <--- create an
empty features array
    var idx = 0
    if (trainingDone) Logger.info("Feature len:" + numWords)

    allFeaturesRDD.map{ *<-- This is where it is failing, *
      case(eachword) => { *<-- for some reason the code does not enter here
????*
        val reviewList = wordReviewSeqMap.get(eachword).get

        if (trainingDone == true) {
          println("1. eachword:" + eachword + "reviewList:" + reviewList)
          println("2. reviewList.size:" + reviewList.length)
          println("3. reviewList(0):" + reviewList(0))

        }

        featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else
0.toDouble
        idx += 1
      }
    }
    featArray
  }



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-does-not-loop-through-a-RDD-map-tp21102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org