spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ruslan Dautkhanov (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-23074) Dataframe-ified zipwithindex
Date Tue, 01 May 2018 20:57:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460137#comment-16460137
] 

Ruslan Dautkhanov edited comment on SPARK-23074 at 5/1/18 8:56 PM:
-------------------------------------------------------------------

That's . When we have a "record" that spans multiple text lines, and it happens that some
lines are in one partitions, and rest of lines are in another partition.. what would monotonically_increasing_id()
return? -It wouldn't be consequential, right?- I just fond answer to this question in the
[code |https://github.com/apache/spark/blob/7c1654e2159662e7e663ba141719d755002f770a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala#L27]. 

See [https://stackoverflow.com/a/48454000/470583] - people are creating quite expensive
workaround 
{code}
def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id",
monotonically_increasing_id())

    val partitionOffsets = dfWithPartitionId
        .groupBy("partition_id")
        .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
        .orderBy("partition_id")
        .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id")
+ lit(offset) as "cnt" )
        .collect()
        .map(_.getLong(0))
        .toArray

     dfWithPartitionId
        .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId),
LongType)(col("partition_id")))
        .withColumn(indexName, col("partition_offset") + col("inc_id"))
        .drop("partition_id", "partition_offset", "inc_id")
}
{code}
Do you see an easier way to do this? Thanks!
  


was (Author: tagar):
That's . When we have a "record" that spans multiple text lines, and it happens that some
lines are in one partitions, and rest of lines are in another partition.. what would monotonically_increasing_id()
return? It wouldn't be consequential, right? 

See [https://stackoverflow.com/a/48454000/470583] - people are creating quite expensive
workaround 

{code:scala}
def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id",
monotonically_increasing_id())

    val partitionOffsets = dfWithPartitionId
        .groupBy("partition_id")
        .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
        .orderBy("partition_id")
        .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id")
+ lit(offset) as "cnt" )
        .collect()
        .map(_.getLong(0))
        .toArray

     dfWithPartitionId
        .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId),
LongType)(col("partition_id")))
        .withColumn(indexName, col("partition_offset") + col("inc_id"))
        .drop("partition_id", "partition_offset", "inc_id")
}
{code}

Do you see an easier way to do this? Thanks!
 

> Dataframe-ified zipwithindex
> ----------------------------
>
>                 Key: SPARK-23074
>                 URL: https://issues.apache.org/jira/browse/SPARK-23074
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Ruslan Dautkhanov
>            Priority: Minor
>              Labels: dataframe, rdd
>
> Would be great to have a daraframe-friendly equivalent of rdd.zipWithIndex():
> {code:java}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.types.{LongType, StructField, StructType}
> import org.apache.spark.sql.Row
> def dfZipWithIndex(
>   df: DataFrame,
>   offset: Int = 1,
>   colName: String = "id",
>   inFront: Boolean = true
> ) : DataFrame = {
>   df.sqlContext.createDataFrame(
>     df.rdd.zipWithIndex.map(ln =>
>       Row.fromSeq(
>         (if (inFront) Seq(ln._2 + offset) else Seq())
>           ++ ln._1.toSeq ++
>         (if (inFront) Seq() else Seq(ln._2 + offset))
>       )
>     ),
>     StructType(
>       (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())

>         ++ df.schema.fields ++ 
>       (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
>     )
>   ) 
> }
> {code}
> credits: [https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message