spark-user mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: Can't transform RDD for the second time
Date Wed, 01 Mar 2017 05:53:15 GMT
foreachPartition is not a transformation; it is an action. Also note that the
argument passed to your function inside foreachPartition is a plain Scala
Iterator, which can only be traversed once — after your first map chain
consumes it, the second map sees no elements. If you want to transform an RDD
using an iterator in each partition, then use mapPartitions.
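
The empty second pass can be reproduced without Spark at all, since it is a property of Scala iterators rather than of RDDs. A minimal sketch (plain Scala, no Spark required):

```scala
object IteratorExhaustion {
  def main(args: Array[String]): Unit = {
    // An Iterator is single-pass: once consumed, it yields no more elements.
    val credentials = Iterator(1L, 2L, 3L)

    // First traversal works and sees all elements.
    val first = credentials.map(_ * 10).toList
    println(first)   // List(10, 20, 30)

    // Second traversal runs over an exhausted iterator: nothing is left.
    val second = credentials.map(_ * 10).toList
    println(second)  // List()
  }
}
```

If both passes really must happen inside one partition function, buffer the partition first (e.g. `val buffered = credentials.toList`) and iterate over the buffered collection; otherwise, use mapPartitions so the transformed data comes back as a new RDD.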

On Tue, Feb 28, 2017 at 8:17 PM, jeremycod <zoran.jeremic@gmail.com> wrote:

> Hi,
>
> I'm trying to transform one RDD two times. I'm using foreachPartition, and
> inside it I have two map transformations on the partition iterator. The
> first time it works fine and I get results, but the second time I call map
> on it, it behaves as if the RDD has no elements.
> This is my code:
>
>     val credentialsIdsScala: Seq[java.lang.Long] = credentialsIds.asScala.toSeq
>     println("ALL CREDENTIALS:" + credentialsIdsScala.mkString(","))
>
>     val credentialsRDD: RDD[Long] = sc.parallelize(credentialsIdsScala.map { Long2long })
>     val connector = CassandraConnector(sc.getConf)
>     credentialsRDD.foreachPartition { credentials =>
>       val userCourseKMeansProfiles: Iterator[Iterable[(Long, String, Long, Long, String)]] =
>         credentials.map { credentialid =>
>           println("RUNNING USER PROFILE CLUSTERING FOR CREDENTIAL:" + credentialid)
>           val userCourseProfile: Iterable[(Long, String, Long, Long, String)] =
>             runPeriodicalKMeansClustering(dbName, days, numClusters, numFeatures, credentialid)
>           userCourseProfile
>         }
>       userCourseKMeansProfiles.foreach { userProfile =>
>         val query = "INSERT INTO " + dbName + "." +
>           TablesNames.PROFILE_USERQUARTILE_FEATURES_BYPROFILE +
>           "(course, profile, date, userid, sequence) VALUES (?, ?, ?, ?, ?)"
>         connector.withSessionDo { session =>
>           userProfile.foreach { record =>
>             println("USER PROFILE RECORD:" + record._1 + " " + record._2 + " " +
>               record._3 + " " + record._4 + " " + record._5)
>             session.execute(query,
>               record._1.asInstanceOf[java.lang.Long],
>               record._2.asInstanceOf[String],
>               record._3.asInstanceOf[java.lang.Long],
>               record._4.asInstanceOf[java.lang.Long],
>               record._5.asInstanceOf[String])
>           }
>         }
>       }
>       val secondMapping = credentials.map { credentialid =>
>         println("credential id:" + credentialid)
>         credentialid
>       }
>       secondMapping.foreach(cid => println("credentialid:" + cid))
>       println("Second mapping:" + secondMapping.length)
>     }
>
> Could someone explain to me what is wrong with my code and how to fix it?
>
> Thanks,
> Zoran
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-transform-RDD-for-the-second-time-tp28441.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>
