spark-user mailing list archives

From mvsundaresan <mvsundare...@yahoo.com>
Subject KMeans with large clusters Java Heap Space
Date Fri, 30 Jan 2015 03:57:36 GMT
Trying to cluster short text messages, using HashingTF and IDF with L2
normalization. The data looks like this:

id, msg
1, some text1
2, some more text2
3, sample text 3

The input file is 1.7 MB with 10 K rows. It runs (very slowly, about 3
hours) for up to 20 clusters, but when I ask for 200 clusters I get a Java
Heap Space error. I'm working on a 3-node cluster, each node with 8 GB of
memory and 2 cores. I've played with different configurations, but no luck.

What am I missing? Any suggestions?
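For scale, here is a rough back-of-envelope on what the k-means model alone needs on the heap, assuming HashingTF's default feature dimension of 2^20 and dense cluster centres (both are assumptions, not something the code below confirms):

```python
# Back-of-envelope: k-means keeps k dense centre vectors of dimension d.
# Assumption: HashingTF's default dimension of 2**20 was not overridden.
d = 2 ** 20          # feature dimension
bytes_per_double = 8

def centre_bytes(k: int) -> int:
    """Approximate heap needed just for k dense centre vectors."""
    return k * d * bytes_per_double

for k in (20, 200):
    gib = centre_bytes(k) / 2 ** 30
    print(f"k={k:>3}: ~{gib:.2f} GiB for centres alone")
```

Going from 20 to 200 clusters multiplies that figure tenfold, which may be why 20 clusters fits and 200 does not.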

Here is my code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.{HashingTF, IDF, Normalizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

val sparkConf = new SparkConf().setMaster("spark://master:7077")
  .setAppName("SparkKMeans")
  .set("spark.executor.memory", "4192m")
  .set("spark.storageLevel", "MEMORY_AND_DISK")
  .set("spark.driver.memory", "4192m")
  .set("spark.default.parallelism", "200")
  .set("spark.storage.blockManagerHeartBeatMs", "60000")
  .set("spark.akka.frameSize", "1000")

implicit val sc = new SparkContext(sparkConf)

val numClusters = 200
val numIterations = 20

// Fields 1 and 15 of each \001-delimited record: (id, msg)
val file = sc.textFile(".../file10k")
val lines = file.map(_.split("\001")).map(x => (x(1), x(15)))

// Tokenize: strip non-alphanumerics, lowercase, split on whitespace
val msgs = lines.map { case (_, msg) =>
  msg.replaceAll("[^a-zA-Z0-9]", " ").toLowerCase.split(" ").toSeq
}

// TF-IDF with L2 normalization
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(msgs)
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
val l2normalizer = new Normalizer()
val data = tfidf.map(x => l2normalizer.transform(x))

val clusters = KMeans.train(data, numClusters, numIterations)

val WSSSE = clusters.computeCost(data)
val centroids = clusters.clusterCenters.map(_.toArray)

// Join each cluster assignment back to its (id, msg) pair by row index
val result = clusters.predict(data)
val srcidx = result.zipWithIndex().map { case (cluster, idx) => (idx, cluster) }
val tktidx = lines.zipWithIndex().map { case ((id, msg), idx) => (idx, (id, msg)) }
val joined = srcidx.join(tktidx).map { case (idx, (cluster, (id, msg))) =>
  (idx, cluster, id, msg)
}
joined.saveAsTextFile(".../clustersoutput.txt")
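For anyone unfamiliar with the preprocessing above, here is a minimal pure-Python sketch of the hashed-TF + L2-normalization steps on one message. It is not Spark's implementation (Spark's HashingTF uses a different hash function, and the IDF step is omitted for brevity); the bucket size is an illustrative assumption:

```python
import math
import re

NUM_FEATURES = 2 ** 10  # small for illustration; Spark's HashingTF defaults to 2**20

def hashing_tf(tokens):
    """Term frequencies via the hashing trick: each token is mapped to a
    bucket index, so the vector has a fixed dimension regardless of
    vocabulary size (Python's hash() stands in for Spark's hash)."""
    vec = {}
    for t in tokens:
        idx = hash(t) % NUM_FEATURES
        vec[idx] = vec.get(idx, 0.0) + 1.0
    return vec

def l2_normalize(vec):
    """Scale a sparse vector to unit Euclidean length."""
    norm = math.sqrt(sum(v * v for v in vec.values()))
    return {i: v / norm for i, v in vec.items()} if norm else vec

# Same tokenization as the Scala code: strip non-alphanumerics, lowercase, split
msg = "Some More Text2!"
tokens = re.sub(r"[^a-zA-Z0-9]", " ", msg).lower().split()
tf = hashing_tf(tokens)
unit = l2_normalize(tf)
print(unit)
```

The point of the hashing trick is that distinct tokens can collide into the same bucket, which is the trade-off for a fixed-dimension vector.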



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-with-large-clusters-Java-Heap-Space-tp21432.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


