spark-user mailing list archives

From Sriram Ramachandrasekaran <sri.ram...@gmail.com>
Subject Re: Data processing conventions with Spark.
Date Wed, 30 Oct 2013 13:55:40 GMT
Hey folks - I was reading up some documentation on RDDs, and I see that
people have clarified that they do indeed get spilled to disk. Unfortunately
for me, that doesn't seem to be the case. It makes me wonder whether I need
to set some configuration correctly for this to happen.

Any ideas?
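
To make the question concrete: is it something along these lines that I should
be setting? (The property names below are just my reading of the configuration
docs, so they may well be the wrong knobs; the master URL and app name are
placeholders.)

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// memory-related properties have to be set before the SparkContext is created
System.setProperty("spark.executor.memory", "2g")          // heap per executor
System.setProperty("spark.storage.memoryFraction", "0.5")  // share of the heap kept for cached RDDs

val sc = new SparkContext("spark://master:7077", "context-vectors")

// and/or persisting with a level that is allowed to fall back to disk:
// docs.persist(StorageLevel.MEMORY_AND_DISK)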


On Mon, Oct 28, 2013 at 11:47 PM, Sriram Ramachandrasekaran <
sri.rams85@gmail.com> wrote:

> Hello,
>
> I'm trying to process text documents and build a context vector for each
> term/feature in the corpus. A context vector is the vector of features that
> appear "around" a term in the corpus, within a distance of x.
>
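> As a toy illustration of what I mean by "around" (window size of 2, made-up
> tokens):
>
>   val terms = List("the", "quick", "brown", "fox", "jumps")
>   // context of "brown" (index 2): up to two terms on either side
>   terms.slice(0, 2) ++ terms.slice(3, 5)  // List(the, quick, fox, jumps)
>
> The context vector for a term is then the frequency map of such context terms,
> accumulated over the whole corpus.
>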
> Now, when I run this code, the process hits a GC overhead limit error and I
> get an OOM while it builds the featureContextVector (snippet below).
>
> Some configuration details for perusal:
> Cluster type: standalone cluster
> Number of nodes: 1 (master and slave on the same node)
> Job memory: tried the default (512m), 1G and 2G as well.
> Data size: 12.8M documents, each 50-100 chars long - roughly 1.2G of raw
> data, not counting any JVM overhead.
>
> So, while it seems reasonable that the job simply needs more memory, I was
> wondering why the records in the RDD don't spill over to disk to reduce the
> memory pressure. I also tried changing the StorageLevel of the parent
> RDD (in this case the docs RDD - see below) to DISK_ONLY, but to no effect;
> roughly, what I tried is sketched below.
>
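> The attempt was essentially this (paraphrasing, the exact call site may differ):
>
>   import org.apache.spark.storage.StorageLevel
>   // docs is the RDD[Document] described below
>   docs.persist(StorageLevel.DISK_ONLY)
>
> and it made no visible difference to the memory behavior.
>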
> 1. I want to know if this is expected behavior and, if so, why.
> 2. When does the RDD's spill-to-disk behavior kick in? Is it something we
> should enable while constructing the RDD or the SparkContext? Kindly clarify.
>
> 3. On a slightly unrelated note, I also want to know if there's an elegant
> way to create incremental document IDs when processing documents with Spark.
> The problem I face is that when I iterate over an RDD, the processing may be
> distributed across nodes, so I can't have an ID that is unique and
> auto-incrementing across those nodes. I tried .collect().zipWithIndex(), but
> that has the limitation of pulling all the data into driver memory, which is
> not desirable for large-scale processing. Am I missing something? (A rough
> sketch of the kind of workaround I have in mind follows.)
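>
> The closest workaround I can think of is a two-pass trick with
> mapPartitionsWithIndex (an untested sketch, and I'm not sure it's idiomatic):
>
>   // pass 1: count the records in each partition
>   val counts = docs.mapPartitionsWithIndex { (i, it) => Iterator((i, it.size)) }
>                    .collect().sortBy(_._1).map(_._2)
>   // running offsets: where each partition's IDs should start
>   val offsets = counts.scanLeft(0L)(_ + _)
>   // pass 2: partition offset + local index gives a globally unique, increasing ID
>   val docsWithIds = docs.mapPartitionsWithIndex { (i, it) =>
>     it.zipWithIndex.map { case (doc, j) => (offsets(i) + j, doc) }
>   }
>
> Only the per-partition counts are collected to the driver, so the documents
> themselves never have to fit in memory there.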
>
>
> Below is the snippet that does the job...
>
> Some types/glossary:
> docs : RDD[Document], where Document is a case class holding a document plus
> some metadata
> list2FreqMap : transforms a List[T] into a Map[T, Int], where the Int value
> is the number of times the key occurs in the list
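>
> (In other words, list2FreqMap is essentially:
>
>   def list2FreqMap[T](xs: List[T]): Map[T, Int] =
>     xs.groupBy(identity).map { case (k, vs) => k -> vs.size }
>
> i.e. a word count over the list.)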
>
>
> === Snippet ===
> val featureContextVector = docs.flatMap { doc =>
>     val terms = doc.tokenize.toList
>     val context = terms.zipWithIndex.map { case (term, index) =>
>       term -> (terms, index)
>     }
>     context
>   }.groupBy(_._1)
>   .map { kv =>
>     (kv._1, kv._2.map(_._2))
>   }.map { kv =>
>     // contexts are generated from the term, its index in the document and the
>     // window size: we pick "window" items "around" the current position.
>     val (term, termsIndexPairs) = kv
>     val contextList = termsIndexPairs.flatMap { case (terms, index) =>
>       terms.slice(indexWithinBounds(index - windowSize, terms.size), index) ++
>         terms.slice(index + 1, indexWithinBounds(index + windowSize, terms.size) + 1)
>     }
>     val contextVectorForTerm = list2FreqMap(contextList).toIterable
>     term -> contextVectorForTerm
>   }
>
>
>
> --
> It's just about how deep your longing is!
>



-- 
It's just about how deep your longing is!
