At a quick glance, I think you're misunderstanding a couple of basic Spark concepts.

Map is a transformation, so it is lazy: nothing actually runs until you call an action (collect, count, foreach, ...) on the result of the map. You're not calling any action on the result of your map.

Also, closing over a mutable variable (like idx or featArray here) won't work; that closure is serialized and run on the executors, not on the driver where your main code is running, so any mutations never make it back to the driver's copy.
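
Here's a minimal sketch of both pitfalls (all names are hypothetical, assuming a local SparkContext and the Spark 1.x accumulator API):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyClosureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
    val words = sc.parallelize(Seq("good", "bad", "great"))

    var counter = 0

    // 1) Laziness: this map never executes, because no action is
    //    ever called on `mapped`.
    val mapped = { w => counter += 1; w.length }

    // 2) Closures: even after forcing an action, each task got its own
    //    serialized copy of `counter`; the driver's copy is untouched.
    mapped.count()
    println("driver-side counter: " + counter)  // stays 0 on a real cluster

    // Idiomatic alternatives: return the data instead of mutating state,
    val lengths = => w.length).collect()
    // or use an accumulator when you genuinely need side-effect counting.
    val acc = sc.accumulator(0)
    words.foreach(_ => acc += 1)
    println("accumulator: " + acc.value)

    sc.stop()
  }
}
```

Note that in local mode the captured `counter` can sometimes appear to update because everything runs in one JVM; on a cluster it never will, which is why accumulators exist.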

On Mon, Jan 12, 2015 at 9:49 AM, rkgurram <> wrote:
   I am observing some weird behavior with Spark; it might be my
misinterpretation of some fundamental concepts, but I have looked at it for 3
days and have not been able to solve it.

The source code is pretty long and complex, so instead of posting it I will
try to articulate the problem.
I am building a "Sentiment Analyser" using the Naive Bayes model in Spark.

1) I have taken text files in RAW format and created an RDD of
word -> Array(files the word is found in).

 2) From this I have derived the "features" array for each file which is an
Array[Double], a 0.0 if the file does not contain the word and 1.0 if the
word is found in the file

3) I have then created an RDD[LabeledPoints]
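
Steps 2 and 3 above amount to a pure lookup per word; a driver-side sketch of that logic (all names here are illustrative, not from my actual code) would be:

```scala
// Build the 0.0/1.0 feature array for one file from a vocabulary and a
// word -> files index, as described in steps 1 and 2.
def buildFeatures(vocabulary: Array[String],
                  wordToFiles: Map[String, Array[String]],
                  fileName: String): Array[Double] = { { word =>
    // 1.0 if this file contains the word, 0.0 otherwise
    val files = wordToFiles.getOrElse(word, Array.empty[String])
    if (files.contains(fileName)) 1.0 else 0.0
  }
}

val vocab = Array("good", "bad", "great")
val index = Map(
  "good"  -> Array("review1.txt", "review2.txt"),
  "bad"   -> Array("review2.txt"),
  "great" -> Array("review1.txt")
)
val feats = buildFeatures(vocab, index, "review1.txt")  // Array(1.0, 0.0, 1.0)
```

Each such array, paired with its sentiment label, then becomes one LabeledPoint (via `LabeledPoint(label, Vectors.dense(feats))` in MLlib) for step 3.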

From this I have created the Naive Bayes model using the following code:

    val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0)
    // training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
    val test = splits(1)"Training count: " + training.count() + " Testing count: " + test.count())
    model = NaiveBayes.train(training, lambda = 1.0)

    val predictionAndLabel = => (model.predict(p.features), p.label))
    val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()"Fold:[" + fold + "] accuracy: [" + accuracy + "]")

4) The model seems to be fine and the accuracy is about 75% to 82%, depending
on which set of input files I provide.

5) Now I am using this model to predict(); here I am creating the same
feature array from the input text file, and I have code as follows:

    * Print all the features (words) in the feature array
    allFeaturesRDD.foreach((x) => print(x + ", "))

    * Build the feature array
    val features = buildFeatureArray(reviewID, wordSeqRdd) <---- Fails here,
have shown this code below

    val prediction = model.predict(Vectors.dense(features))"Prediction:" + prediction)

reviewID ----> filename
wordReviewSeqRDD -> RDD[(word, Array(filename))]

  def buildFeatureArray(reviewID: String,
                        wordReviewSeqRDD: RDD[(String, Array[String])]): Array[Double] = {

    val numWords = allFeaturesRDD.count    <--- number of all words in the vocabulary
    val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()

    var featArray: Array[Double] = new Array(numWords.toInt)    <--- create an empty features array
    var idx = 0
    if (trainingDone)"Feature len:" + numWords) {    *<-- This is where it is failing*
      case (eachword) => {    *<-- for some reason the code does not enter here*
        val reviewList = wordReviewSeqMap.get(eachword).get

        if (trainingDone == true) {
          println("1. eachword:" + eachword + " reviewList:" + reviewList)
          println("2. reviewList.size:" + reviewList.length)
          println("3. reviewList(0):" + reviewList(0))
        }

        featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else 0.toDouble
        idx += 1
      }
    }
    featArray
  }
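
As an illustration of the fix suggested at the top of this thread, here is one hedged sketch of how buildFeatureArray could be restructured (the RDD parameter types are my assumption from the descriptions above): collect the small lookup data to the driver with an action, then build the array locally, so no closure ever captures the mutable idx or featArray.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical reworking of buildFeatureArray. collect() is an action,
// so the work actually runs; the result is a plain local array the
// driver can index, and no executor-side closure mutates driver state.
def buildFeatureArray(reviewID: String,
                      allFeaturesRDD: RDD[String],
                      wordReviewSeqRDD: RDD[(String, Array[String])]): Array[Double] = {
  val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()
  val allWords = allFeaturesRDD.collect()

  // Plain local map: returns the values instead of mutating featArray/idx. { word =>
    val reviews = wordReviewSeqMap.getOrElse(word, Array.empty[String])
    if (reviews.contains(reviewID)) 1.0 else 0.0
  }
}
```

The original version never ran at all because `` alone schedules nothing, and even with an action the writes to `featArray(idx)` would have happened only in the executors' copies of the array.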

Sent from the Apache Spark User List mailing list archive.