spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qinwei <wei....@dewmobile.net>
Subject problem with patitioning
Date Sun, 28 Sep 2014 06:36:19 GMT






Hi, everyone    I come across a problem with changing the patition number of the rdd,  my
code is as below:    val rdd1 = sc.textFile(path1)     val rdd2 = sc.textFile(path2)

    val rdd3 = sc.textFile(path3)



    val imeiList = parseParam(job.jobParams)

    val broadcastVar = sc.broadcast(imeiList)

    val structuredRDD1 = rdd1.map(line => {           val trunks = line.split("\t")

                                                                                    if(trunks.length
== 35){

                                                                                         
  (trunks(6).trim, trunks(7).trim, trunks(3).trim, trunks(5).trim, trunks(12).trim, trunks(13).trim.toLong)

                                                                                         
  }

                                                                                })

    val structuredRDD2 = rdd2.map(line => {           val trunks = line.split("\t")

                                                                                    if(trunks.length
== 33){

                                                                                         
  (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)

                                                                                         
      }

                                                                                })

    val structuredRDD3 = rdd3.map(line => {          val trunks = line.split("\t")

                                                                                    if(trunks.length
== 33){

                                                                                         
  (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)

                                                                                            
  }

                                                                        })

    val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)

    val resRDD = unionedRDD.filter(arg => arg != null && arg != ())
.map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
.filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) || imeiFilter(arg._2, broadcastVar.value,
0))
    val jsonStrRDD = resRDD.map(arg => "{\"f_imei\" : \"" + arg._1 + "\", \"t_imei\" :
\"" + arg._2 + "\", \"dgst\" : \"" + arg._3 + "\", \"n\" : \"" + arg._4 + "\", \"s\" : " +
arg._5.toString() + ", \"ts\" : " + arg._6.toString() + "}")

    val jsonArray = jsonStrRDD.collect
    I noticed that there are 3834 tasks by default,  and 3834 is the number of files in path1
and path2 and path3,  i want to change the number of patition by the code below:    val rdd1
= sc.textFile(path1, 1920) 
    val rdd2 = sc.textFile(path2, 1920) 
    val rdd3 = sc.textFile(path3, 1920)
    by doing this, i expect there are 1920 tasks totally, but i found the number of tasks
becomes 8920, any idea what's going on here?
    Thanks!



qinwei

Mime
View raw message