spark-user mailing list archives

From sergunok <ser...@gmail.com>
Subject calculating TF-IDF for large 100GB dataset problems
Date Thu, 19 Mar 2015 12:16:58 GMT
Hi,

I am trying to vectorize a corpus of texts (about 500K texts in 13 files,
100GB in total) located in HDFS, on a YARN cluster.

The job has already taken about 20 hours on a 3-node cluster with 6 cores and
20GB RAM on each node.

In my opinion that's too long :-)

I started the task with the following command:
spark-submit --master yarn --num-executors 9 --executor-memory 5GB
--executor-cores 2 --driver-memory 5GB weight.py

weight.py:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import Normalizer

conf = SparkConf() \
        .set("spark.hadoop.validateOutputSpecs", "false") \
        .set("spark.yarn.executor.memoryOverhead", "900")
sc = SparkContext(conf=conf)


# reading files from directory 'in/texts.txt' in HDFS
texts=sc.textFile('in/texts.txt') \
.map(lambda line: line.split())

hashingTF = HashingTF()
tf = hashingTF.transform(texts)

tf.cache()
idf = IDF(minDocFreq=100).fit(tf)
tfidf = idf.transform(tf)

n=Normalizer()

normalized=n.transform(tfidf)

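def x2(pair):
    # pair is (sparse vector, document number) as produced by zipWithIndex;
    # unpacked in the body because Python 3 dropped tuple parameters
    vec, num = pair
    triples = []
    for id, weight in zip(vec.indices, vec.values):
        triples.append((num, id, weight))
    return triples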

# I use zipWithIndex to enumerate documents
normalized.zipWithIndex() \
.flatMap(x2) \
.map(lambda t: '{}\t{}\t{}'.format(t[0],t[1],t[2])) \
.saveAsTextFile('out/weights.txt')
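
For reference, a minimal sketch of what the flattening and formatting steps at the end of weight.py produce for one document. It runs without Spark: FakeSparseVector is a hypothetical stand-in that models only the indices and values attributes read by x2, and the sample weights are made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for the sparse vectors returned by the MLlib
# transformers; only the two attributes used by x2 are modelled.
FakeSparseVector = namedtuple("FakeSparseVector", ["indices", "values"])


def to_triples(pair):
    """Flatten (vector, doc_number) into (doc_number, term_id, weight) triples."""
    vec, num = pair
    return [(num, term_id, weight)
            for term_id, weight in zip(vec.indices, vec.values)]


def format_triple(t):
    """Same tab-separated format as the lambda passed to map() in weight.py."""
    return '{}\t{}\t{}'.format(t[0], t[1], t[2])


# One made-up document with two non-zero terms, numbered 0 by zipWithIndex.
doc = (FakeSparseVector(indices=[3, 17], values=[0.6, 0.8]), 0)
lines = [format_triple(t) for t in to_triples(doc)]
print("\n".join(lines))
```

Each non-zero term of each document becomes one output line, so the number of lines written grows with the total number of non-zero entries across the corpus.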

1) What could be the bottleneck?
Unfortunately I don't have access to the web UI.
In the log file I see stages 0, 1, 2 and 3:
Stage 0 "MapPartitionsRDD[6] at mapPartitionsWithIndex at
RDDFunctions.scala:108" with 584 tasks completed very quickly
Stage 1 "MappedRDD[8] at values at RDDFunctions.scala:110" (23 tasks) was
quick too
Stage 2 "zipWithIndex" (584 tasks) was long (17 hours)
Stage 3 "saveAsTextFile" (584 tasks) is long too (still executing after about
2 hours)

I don't understand where the boundaries of stages 0 and 1 come from,
and I don't understand why I see numbers like 584 or 23 tasks for the stages.
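
A back-of-the-envelope sketch of what these task counts imply. Spark schedules one task per partition of a stage, so 584 is presumably the number of input splits of the text files; where the 23 tasks of stage 1 come from cannot be derived from the figures above. The sketch assumes that all 9 requested executors (2 cores each) were granted and stayed alive, which the "executor lost" messages suggest may not always have been the case.

```python
import math

# Figures quoted in the post; the 100 GB and 500K values are approximate.
TOTAL_INPUT_MB = 100 * 1024
NUM_DOCS = 500000
NUM_TASKS = 584            # tasks in stages 0, 2 and 3
STAGE2_HOURS = 17

# Assumption: 9 executors x 2 cores were all available, giving 18 task slots.
TASK_SLOTS = 9 * 2

mb_per_task = TOTAL_INPUT_MB / float(NUM_TASKS)
docs_per_task = NUM_DOCS / float(NUM_TASKS)

# Tasks run in "waves" of at most TASK_SLOTS at a time.
waves = int(math.ceil(NUM_TASKS / float(TASK_SLOTS)))
minutes_per_wave = STAGE2_HOURS * 60.0 / waves

print("input per task: %.0f MB (~%.0f documents)" % (mb_per_task, docs_per_task))
print("waves of tasks: %d, about %.0f minutes each" % (waves, minutes_per_wave))
```

Under these assumptions each task in stage 2 would spend roughly half an hour on well under 200 MB of text, which points at the per-record work rather than at the number of tasks.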


2) On a previous run of this job I saw a lot of "executor lost" errors from the
YARN scheduler. Later I added the .set("spark.yarn.executor.memoryOverhead",
"900") setting in the code and now I see only a few such messages. Could this
be the reason for the poor performance?
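
A rough sketch of the memory arithmetic behind that setting, using only the numbers from the command line and the configuration above. It assumes the 9 executors are spread evenly over the 3 nodes and ignores the driver. How much of each node's 20GB YARN is actually allowed to hand out (yarn.nodemanager.resource.memory-mb) is not known here, and YARN may round each request up to its allocation increment.

```python
# Values taken from the spark-submit command and the SparkConf in weight.py.
EXECUTOR_MEMORY_MB = 5 * 1024      # --executor-memory 5GB
MEMORY_OVERHEAD_MB = 900           # spark.yarn.executor.memoryOverhead
NUM_EXECUTORS = 9
NUM_NODES = 3
NODE_RAM_MB = 20 * 1024            # 20GB RAM on each node

# Each executor container asks YARN for heap plus overhead.
container_mb = EXECUTOR_MEMORY_MB + MEMORY_OVERHEAD_MB

# Assumption: executors are distributed evenly, 3 per node.
executors_per_node = NUM_EXECUTORS // NUM_NODES
requested_per_node_mb = executors_per_node * container_mb
headroom_mb = NODE_RAM_MB - requested_per_node_mb

print("container size: %d MB" % container_mb)
print("requested per node: %d MB of %d MB" % (requested_per_node_mb, NODE_RAM_MB))
print("left for OS, HDFS and YARN daemons: %d MB" % headroom_mb)
```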

Please advise!

Any explanations appreciated!

Serg.







