spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sriram Ramachandrasekaran <sri.ram...@gmail.com>
Subject Data processing conventions with Spark.
Date Mon, 28 Oct 2013 18:17:27 GMT
Hello,

I'm trying to process text documents and build a context vector for each
term/feature in the corpus. Context vector is vector of features that are
"around" a term in the corpus within a distance of x.

Now, when I run this code, the process suffers a GC overhead error and I
get an OOM when it builds this featureContextVector.

Some configurations for perusal.
Cluster type: Stand alone cluster
Number of nodes: 1(master and slave same node)
Job memory: tried with default - 512m, 1G and 2G as well.
Data size: number of docs size is 12.8M, each document being 50-100 chars
long - 1.2G raw data size not considering any JVM overhead.

So, while it looks reasonable that, the job definitely needs more memory, I
was wondering why the records in RDD doesn't spill over to disk so as to
reduce the memory pressure. I also tried changing StorageLevel of the
parent RDD(in this case the docs RDD - see below) type to DISK_ONLY but to
no effect.

1. I want to know if this is an expected behavior and if so why.
2. And, when does RDD spill over to disk kick-in? Is it something that we
should enable while constructing the RDD or SparkContext? Kindly clarify.

3. On a slightly unrelated note, I also want to know if there's an elegant
way to create incremental document Ids when processing documents using
spark. The problem I face is, when I iterate over an RDD, the processing
might get distributed over nodes. So, I can't have an id that's unique and
auto incr across these nodes. I tried .collect().zipWithIndex(), but, that
has the limitation of storing data in memory, which is not desirable when
doing large scale processing. Am I missing something?


Below is the snippet that does the job...

Some type/glossary here:
*docs : RDD[Document], Document - case class to hold document with some
metadata*
*list2FreqMap - transforms a list[T] to Map[T, Int] where, Int value is the
number of times the key has occurred in the list.*


=== Snippet ===
val featureContextVector = docs.flatMap{ doc =>
      val terms = doc.tokenize.toList
      val context  = terms.zipWithIndex.map{ case (term, index) => term ->
(terms, index) }
      context
    }.groupBy(_._1)
    .map{ kv =>
      (kv._1, kv._2.map(_._2))
    }.map{ kv =>
      // contexts are generated based on term, its index in the document
and the window size. we pick "window" items "around" the current position.
      val (term, termsIndexPairs) = kv
      val contextList = termsIndexPairs.flatMap{ case (terms, index) =>
        terms.slice(indexWithinBounds(index-windowSize, terms.size), index)
++ terms.slice(index+1, indexWithinBounds(index+windowSize, terms.size)+1)
      }
      val contextVectorForTerm = list2FreqMap(contextList).toIterable
      term -> contextVectorForTerm
    }



-- 
It's just about how deep your longing is!

Mime
View raw message