spark-user mailing list archives

From "Sing, Jasbir" <jasbir.s...@accenture.com>
Subject RE: [External] Re: Sorting in Spark on multiple partitions
Date Wed, 06 Jun 2018 17:27:44 GMT
Hi Jorn,

We are using Spark 2.2.0 for our development.
Below is the code snippet for your reference:

var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)
newDf.write.format("parquet").saveAsTable("tempData")
newDf.coalesce(1).write.format(outputFormat).option("header", "true").save(hdfsUri + destFilePath)

var groupedData = newDf.rdd.map { x => (x.get(0),x)}.groupByKey();

     //Get schema fields of dataframe
     var structFieldArray:Array[StructField] = newDf.schema.fields
     //Create Map for storing Dataframe's columnName,ColumnNumber and their dataType
     var i=0
     val cache = collection.mutable.Map[String, DataFrameBO]()
     for(structField<-structFieldArray)
     {
          val dataFrameBO = new DataFrameBO(i,structField.name,structField.dataType.typeName)
          cache.put(structField.name, dataFrameBO)
          i = i + 1
     }
     var dfWithoutDuplicateRows = groupedData.mapValues { x => {
              var ls:List[Row]=List()
              var linkedMap = collection.mutable.Map[String, String]()
              val linkedSid =ArrayBuffer.empty[String]


              x.foreach { y => {
                             var subpathId = y(cache(sid).columnNumber)
                             var salesTimeColumn = y(cache(time).columnNumber)
                             var orderId = y(cache(orderIdColumnName).columnNumber)

                             var seq = ArrayBuffer[Any]()
                             for(i <- 0 to (y.size - 2)){
                                  seq += y(i)
                             }

                              if(!linkedSid.contains(y(cache(sid).columnNumber)))
                              {
                                  if(linkedMap.exists(x => x._1.equals(y(cache(time).columnNumber)) && x._2.equals(y(cache(orderIdColumnName).columnNumber))))
                                  {
                                     seq += 0 // Appends 0 to rows which needs to be deleted.
                                  }
                                  else
                                  {
                                      linkedSid += y(cache(sid).columnNumber).toString()
                                      linkedMap.put(y(cache(time).columnNumber).toString(),y(cache(orderIdColumnName).columnNumber).toString())
                                      seq += 1  // Appends 1 to rows which need not be deleted.
                                  }
                               }
                              else
                              {
                                   seq += 0 // Appends 0 to rows which needs to be deleted.
                              }

                 ls::= Row.fromSeq(seq)
              }}
         ls
      }}

      var flatDataframe = dfWithoutDuplicateRows.values.flatMap { x => {x} }
      var finalDF = data.sqlContext.createDataFrame(flatDataframe, newDf.schema)

finalDF should have picked up the data on a first-come, first-served basis and updated the flag accordingly.
Please let me know if you need any further information.
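
One possible reason the flags come out wrong: groupByKey shuffles the rows again, and Spark does not guarantee the order of the values within each key, so the order produced by sortWithinPartitions may not survive into x.foreach. Below is a minimal sketch of a safer pattern, sorting explicitly inside each group. It uses plain Scala collections in place of an RDD, and the Event case class and its sample values are hypothetical, not taken from the original job.

```scala
// Hypothetical row type standing in for the DataFrame rows in this thread.
final case class Event(userId: String, sid: Int, time: Long)

object SortInsideGroups {
  // Group by user, then sort each group explicitly by (sid, time).
  // The result does not depend on the order in which rows arrive.
  def sortedPerUser(events: Seq[Event]): Map[String, Seq[Event]] =
    events.groupBy(_.userId).map { case (user, rows) =>
      user -> rows.sortBy(e => (e.sid, e.time))
    }

  // Deliberately unordered input, as rows might arrive after a shuffle.
  val sample: Seq[Event] = Seq(
    Event("u1", 2, 30L),
    Event("u2", 1, 10L),
    Event("u1", 1, 20L),
    Event("u1", 1, 5L)
  )
}
```

On the RDD in the snippet above, the analogous step would be to sort the values of each key (for example via mapValues with toSeq and sortBy) before iterating over them; treat that as an untested suggestion rather than a confirmed fix.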

From: Jörn Franke [mailto:jornfranke@gmail.com]
Sent: Monday, June 4, 2018 10:59 PM
To: Jain, Neha T. <neha.t.jain@accenture.com>
Cc: user@spark.apache.org; Patel, Payal <payal.patel@accenture.com>;
Sing, Jasbir <jasbir.sing@accenture.com>
Subject: Re: [External] Re: Sorting in Spark on multiple partitions

I think there is also a misunderstanding of how repartition works. It keeps the existing number
of partitions, but hash-partitions the rows by userid. This means each partition is likely
to contain several different user ids.
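
To illustrate the point about hash partitioning, here is a small plain-Scala sketch. It does not call Spark: partitionOf is a hypothetical stand-in that takes hashCode modulo the partition count, whereas Spark applies its own hash function to the column value. The effect is the same in kind: with more distinct user ids than partitions, at least one partition must hold several ids.

```scala
object HashPartitionSketch {
  // Hypothetical stand-in for hash partitioning: map a key to one of n buckets.
  def partitionOf(userId: String, numPartitions: Int): Int =
    Math.floorMod(userId.hashCode, numPartitions)

  // Assign each distinct user id to a partition and list the ids that land together.
  def layout(userIds: Seq[String], numPartitions: Int): Map[Int, Seq[String]] =
    userIds.distinct.groupBy(id => partitionOf(id, numPartitions))

  // Five made-up user ids spread over two partitions.
  val userIds: Seq[String] = Seq("u1", "u2", "u3", "u4", "u5")
  val byPartition: Map[Int, Seq[String]] = layout(userIds, 2)
}
```

If this is what happens in the job, sortWithinPartitions(sid, time) orders each partition as a whole, so rows of different users are interleaved unless the user id is the leading sort column.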

That would also explain your observed behavior. However, without the full source code,
these are just assumptions.

On 4. Jun 2018, at 17:33, Jain, Neha T. <neha.t.jain@accenture.com> wrote:
Hi Jorn,

I tried removing userid from my sort clause, but the issue persists: the data is still not sorted.

var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)

I am checking the sort results by temporarily writing the output to Hive as well as HDFS.
When I look at the data user by user, it is not sorted.
I am attaching the output file for your reference.

Based on the sort within userid partitions, I want to add a flag that marks the first item
in each partition as true and all other items in that partition as false.
If the sort order is disturbed, the flag is set incorrectly.
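
A flag of this kind can usually be computed without relying on the physical order of rows, by using a window function. The following is a minimal sketch under stated assumptions, not the job from this thread: the column names are passed in as parameters, "first" is taken to mean the smallest (sid, time) per user, and isFirst is a hypothetical name for the output column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object FirstRowFlag {
  // Adds a boolean column that is true for exactly one row per user:
  // the row that comes first when that user's rows are ordered by
  // ascending (sidCol, timeCol). "isFirst" is a hypothetical column name.
  def withFirstFlag(df: DataFrame, userCol: String, sidCol: String, timeCol: String): DataFrame = {
    // One logical window per user id, ordered inside the window.
    val perUser = Window.partitionBy(col(userCol)).orderBy(col(sidCol), col(timeCol))
    df.withColumn("isFirst", row_number().over(perUser) === 1)
  }
}
```

Because the window is defined per user id rather than per physical partition, it should not matter that several users share one partition. If two rows of a user tie on both sort columns, which of them gets the flag is not determined by this sketch.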

Please suggest what else could be done to fix this very basic scenario of sorting in Spark
across multiple partitions across multiple nodes.

Thanks & Regards,
Neha Jain

From: Jörn Franke [mailto:jornfranke@gmail.com]
Sent: Monday, June 4, 2018 10:48 AM
To: Sing, Jasbir <jasbir.sing@accenture.com>
Cc: user@spark.apache.org; Patel, Payal <payal.patel@accenture.com>;
Jain, Neha T. <neha.t.jain@accenture.com>
Subject: [External] Re: Sorting in Spark on multiple partitions

You partition by userid, why do you then sort again by userid in the partition? Can you try
to remove userid from the sort?

How do you check if the sort is correct or not?

What is the underlying objective of the sort? Do you have more information on schema and data?
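
As one way to answer the question of how to check the sort, here is a small plain-Scala helper that tests whether a sequence is ordered by a given key. It is only a sketch: isSortedBy is a hypothetical helper, not a Spark API, and it would have to be applied to the rows of each partition separately (for instance on a small sample collected per partition).

```scala
object SortCheck {
  // True when every adjacent pair of keys is in non-decreasing order.
  // An empty or single-element sequence counts as sorted.
  def isSortedBy[A, K](rows: Seq[A])(key: A => K)(implicit ord: Ordering[K]): Boolean =
    rows.zip(rows.drop(1)).forall { case (a, b) => ord.lteq(key(a), key(b)) }
}
```

Checking each partition on its own matters here, because a file written from several partitions, or read back through a query, need not preserve a single overall order.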

On 4. Jun 2018, at 05:47, Sing, Jasbir <jasbir.sing@accenture.com> wrote:
Hi Team,

We are currently using Spark 2.2.0 and are having trouble sorting data across multiple
partitions.
We have tried the approaches below:


  1.  Spark SQL approach:
      a.  var query = "select * from data distribute by " + userid + " sort by " + userid + ", " + time

      This query returns correct results in Hive but not in Spark SQL.

  2.  var newDf = data.repartition(col(userid)).orderBy(userid, time)
  3.  var newDf = data.repartition(col(userid)).sortWithinPartitions(userid,time)


None of the above approaches gives correctly sorted data.
Please suggest what could be done.

Thanks & Regards,
Neha Jain

<test.csv>