spark-user mailing list archives

From dpapathanasiou <denis.papathanas...@gmail.com>
Subject How to use an anonymous function with DataFrame.explode() ?
Date Sat, 07 Jan 2017 18:39:54 GMT
I need to take a DataFrame of events, and explode them row-wise so that
there's at least one representation per time interval (usually day) in
between events.

Here's a simplified version of the problem, which I have gotten to work in
spark-shell:

import org.apache.spark.sql.Row

case class Meal (food: String, calories: Double, date: Int)
case class AsOfDate (dt: Int)

val m1 = new Meal("steak", 33, 20170101)
val m2 = new Meal("peach", 25, 20170105)

val mDf = sc.parallelize(Seq(m1, m2)).toDF

scala> mDf.show

+-----+--------+--------+
| food|calories|    date| 
+-----+--------+--------+
|steak|      33|20170101| 
|peach|      25|20170105| 
+-----+--------+--------+

Now, I want to explode the DataFrame so that there are no gaps in days:

mDf.where($"date" < 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
}

scala> res0.show
 
+-----+--------+--------+
| food|calories|    date| 
+-----+--------+--------+
|steak|      33|20170101| 
|steak|      33|20170102| 
|steak|      33|20170103| 
|steak|      33|20170104| 
+-----+--------+--------+
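To make the mechanics of that call concrete: `explode` takes its input columns in the first parameter list and a `Row => TraversableOnce[A]` function in the second. Per the Spark 2.x Scaladoc, the columns of each input row are implicitly joined with every row the function returns, so the real result should also carry a `dt` column from `AsOfDate` alongside `date` (and `Dataset.explode` is marked deprecated as of Spark 2.0.0 in favour of `flatMap`, or `select` with `functions.explode`). The sketch below models that joining behaviour on a plain Scala `Seq` without Spark; `explodeLocally` is a hypothetical helper, not a Spark API, and `Iterable` stands in for `TraversableOnce`.

```scala
// Hypothetical local model of Dataset.explode's documented behaviour,
// using plain Scala collections instead of Spark.
case class Meal(food: String, calories: Double, date: Int)
case class AsOfDate(dt: Int)

// Each input element is paired with every element `f` returns, so the
// input "columns" repeat and the new fields (here `dt`) are appended.
def explodeLocally[In, A <: Product](rows: Seq[In])(f: In => Iterable[A]): Seq[(In, A)] =
  rows.flatMap(row => f(row).map(out => (row, out)))

val meals = Seq(Meal("steak", 33, 20170101), Meal("peach", 25, 20170105))

// Same shape as the first explode call: rows before 20170105,
// expanded to one AsOfDate per day through 20170104.
val beforePeach = explodeLocally(meals.filter(_.date < 20170105)) {
  case Meal(_, _, date) => (date to 20170104).map(AsOfDate(_))
}

beforePeach.foreach { case (m, d) => println(s"${m.food} ${m.calories} ${m.date} ${d.dt}") }
```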

mDf.where($"date" >= 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170105).map(AsOfDate(_))
}

scala> res1.show
 
+-----+--------+--------+
| food|calories|    date| 
+-----+--------+--------+
|peach|      25|20170105| 
+-----+--------+--------+

val exploded = res0.union(res1)

scala> exploded.show

+-----+--------+--------+
| food|calories|    date| 
+-----+--------+--------+
|steak|      33|20170101| 
|steak|      33|20170102| 
|steak|      33|20170103| 
|steak|      33|20170104| 
|peach|      25|20170105| 
+-----+--------+--------+
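The two calls above hard-code the boundary dates. A minimal sketch of the same gap-filling rule as a single function, assuming events are repeated once per day until the day before the next event, the last event is emitted once, and there is at most one event per day. It runs on a local `Seq`, not a DataFrame; `fillGaps`, `toDate` and `toInt` are hypothetical names. In Spark the next event's date would have to come from something like a window function (for example `functions.lead`), which is not shown here.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

case class Meal(food: String, calories: Double, date: Int)

// Dates are yyyyMMdd integers, as in the example DataFrame; going through
// LocalDate makes month and year boundaries work (20170131 -> 20170201).
val fmt = DateTimeFormatter.BASIC_ISO_DATE
def toDate(d: Int): LocalDate = LocalDate.parse(d.toString, fmt)
def toInt(d: LocalDate): Int = d.format(fmt).toInt

// Hypothetical helper: repeat each event once per day up to the day before
// the next event; the last event is emitted once, matching the union above.
def fillGaps(events: Seq[Meal]): Seq[Meal] = {
  val sorted = events.sortBy(_.date)
  val nextDates: Seq[Option[Int]] = sorted.drop(1).map(m => Option(m.date)) :+ None
  sorted.zip(nextDates).flatMap {
    case (m, None) => Seq(m)
    case (m, Some(next)) =>
      val end = toDate(next) // exclusive upper bound
      Iterator.iterate(toDate(m.date))(_.plusDays(1))
        .takeWhile(_.isBefore(end))
        .map(d => m.copy(date = toInt(d)))
        .toList
  }
}

val filled = fillGaps(Seq(Meal("steak", 33, 20170101), Meal("peach", 25, 20170105)))
filled.foreach(println)
```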

So that gives me what I want, but I'd like to be able to define the function
that does the date iteration elsewhere, and pass it in to the call to
explode().

Part of the reason is that in the real DataFrame "date" is a
java.sql.Timestamp, which requires more manipulation to add the right number
of time intervals.
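A sketch of the kind of date-iteration helper that could be defined once and reused, assuming the real column is a `java.sql.Timestamp` and the interval is a fixed, positive `java.time.Duration`; `timestampsBetween` is a hypothetical name. Inside an explode function its results would still need wrapping in a case class (or other `Product`), e.g. something like `case Row(ts: Timestamp) => timestampsBetween(ts, end, step).map(...)`, which is untested here.

```scala
import java.sql.Timestamp
import java.time.Duration

// Hypothetical helper: timestamps from `start` (inclusive) to `end`
// (exclusive), `step` apart. Converting to java.time.LocalDateTime avoids
// manual millisecond arithmetic on java.sql.Timestamp.
def timestampsBetween(start: Timestamp, end: Timestamp, step: Duration): List[Timestamp] = {
  require(!step.isNegative && !step.isZero, "step must be positive")
  val stop = end.toLocalDateTime
  Iterator.iterate(start.toLocalDateTime)(_.plus(step))
    .takeWhile(_.isBefore(stop))
    .map(t => Timestamp.valueOf(t))
    .toList
}

val days = timestampsBetween(
  Timestamp.valueOf("2017-01-01 00:00:00"),
  Timestamp.valueOf("2017-01-05 00:00:00"),
  Duration.ofDays(1))
days.foreach(println)
```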

I've tried defining the function on its own like this, but I cannot get it to work:

val fn1 = (x: Row) => { case Row(date: Int) => (date to 20170104).map(AsOfDate(_)) }

but I get this:

error: missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: ?
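The error comes from the `{ case ... }` block: a pattern-matching anonymous function needs an expected function type to know its parameter type, and as the body of `(x: Row) => ...` it has none. Declaring the value's type supplies it; with Spark that would presumably read `val fn1: Row => Seq[AsOfDate] = { case Row(date: Int) => ... }`. The sketch below shows the same rule without Spark, using `FakeRow`, a hypothetical stand-in for `org.apache.spark.sql.Row`.

```scala
// Hypothetical stand-in for Spark's Row so this compiles without Spark;
// in spark-shell you would match on org.apache.spark.sql.Row instead.
case class FakeRow(values: Any*)
case class AsOfDate(dt: Int)

// Does not compile ("missing parameter type for expanded function"):
// the braces form a pattern-matching anonymous function, but nothing
// tells the compiler what its parameter type is.
// val bad = (x: FakeRow) => { case FakeRow(date: Int) => (date to 20170104).map(AsOfDate(_)) }

// Compiles: the declared type FakeRow => Seq[AsOfDate] is the expected type.
val fn1: FakeRow => Seq[AsOfDate] = {
  case FakeRow(date: Int) => (date to 20170104).map(AsOfDate(_))
}

println(fn1(FakeRow(20170101)))
```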

So I tried it this way instead:

val fn1 = (x: Row) => x match {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_)) 
}

which seemed to work, until I attempted to use it with explode():

mDf.where($"date" < 20170105).explode(mDf("date"))fn1

error: missing argument list for method explode in class Dataset

I don't see how I can pass fn1 to explode given that it expects a type Row
as its only input.
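`Dataset.explode` is declared with two parameter lists, `explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A])`, so a named function value goes in its own parentheses: `mDf.where($"date" < 20170105).explode(mDf("date"))(fn1)`. A `{ case ... }` block works in braces only because braces can stand in for the parentheses of a single-argument list; `explode(mDf("date"))fn1` leaves that second list unfilled, hence the "missing argument list" error. A sketch of the same calling convention without Spark, where `explodeLike` and `FakeRow` are hypothetical stand-ins and `Iterable` replaces `TraversableOnce`:

```scala
case class FakeRow(values: Any*)
case class AsOfDate(dt: Int)

// Hypothetical method with the same two-parameter-list shape as
// Dataset.explode(input: Column*)(f: Row => TraversableOnce[A]).
def explodeLike[A <: Product](rows: Seq[FakeRow])(f: FakeRow => Iterable[A]): Seq[(FakeRow, A)] =
  rows.flatMap(r => f(r).map(a => (r, a)))

// The working definition from above, with FakeRow in place of Row.
val fn1 = (x: FakeRow) => x match {
  case FakeRow(date: Int) => (date to 20170104).map(AsOfDate(_))
}

val rows = Seq(FakeRow(20170101))

// explodeLike(rows)fn1                  // second argument list never supplied
val viaName  = explodeLike(rows)(fn1)   // a named function goes in parentheses
val viaBlock = explodeLike(rows) {      // a { case ... } block can use braces
  case FakeRow(date: Int) => (date to 20170104).map(AsOfDate(_))
}
```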

Is what I want to do possible, or do I need to write the Row manipulation
functions with explode() as I've done in the working examples above?

As a side question, I also wonder why I need the AsOfDate case class to do
the explosion, since all I want to do is set the column value, where I
already know the type.

Before using AsOfDate, I tried one of the working examples like this:

mDf.where($"date" < 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170104).map(_.asInstanceOf[Int])
}

but that gave me this:

error: inferred type arguments [Int] do not conform to method explode's type
parameter bounds [A <: Product]

I'm not sure what that means, or how using the case class resolves it.
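The bound `A <: Product` on `explode` means the function must return case classes or tuples; `Int` does not extend `Product`, so inferring `A = Int` produces exactly the error above. As far as I can tell from the Spark 2.x source, `explode` derives the new columns' names and types from the `Product`'s fields, which is why a wrapper is needed. A `Tuple1` should satisfy the bound without declaring a case class, e.g. `(date to 20170104).map(Tuple1(_))`, though the new column would then presumably be named `_1` rather than `dt`. A plain-Scala sketch of the bound, using a hypothetical `fieldCounts` method:

```scala
// Hypothetical method with the same bound as Dataset.explode's type
// parameter A <: Product; it reads each element's field count.
def fieldCounts[A <: Product](xs: Seq[A]): Seq[Int] = xs.map(_.productArity)

case class AsOfDate(dt: Int)

val dates = (20170101 to 20170104).toList

// fieldCounts(dates)  // does not compile: inferred [Int] violates [A <: Product]
val viaCaseClass = fieldCounts(dates.map(AsOfDate(_))) // case classes extend Product
val viaTuple1    = fieldCounts(dates.map(Tuple1(_)))   // so does Tuple1
```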




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-an-anonymous-function-with-DataFrame-explode-tp28285.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


