Does this stack trace look like a bug to you? It definitely seems like one to me.

Caused by: java.lang.StackOverflowError
	at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
	at scala.collection.immutable.List.foreach(List.scala:381)
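
For what it's worth, the repeated frames are SparkPlan.prepare recursing over the children of the physical plan, so a deep enough plan can exhaust the default JVM thread stack before anything actually executes. One thing I plan to try is giving the driver a bigger thread stack; a rough, untested sketch is below (the -Xss value is a guess, and the option normally has to be in place before the driver JVM starts, e.g. passed to spark-submit or set in spark-defaults.conf):

from pyspark.sql import SparkSession

# Untested mitigation sketch: raise the driver's thread stack size so the
# recursive plan walk has more headroom. 16m is an arbitrary guess. In client
# mode the driver JVM is already running by the time this Python code executes,
# so the equivalent spark-submit flag would be:
#   --conf spark.driver.extraJavaOptions=-Xss16m
spark = (SparkSession.builder
         .config("spark.driver.extraJavaOptions", "-Xss16m")
         .getOrCreate())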

On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <raviteja.lokineni@gmail.com> wrote:
Hi all,

I am not sure whether this is a bug. Basically, I am generating weekly aggregates of every column in the data.

Adding source code here (also attached):
from pyspark.sql.window import Window
from pyspark.sql.functions import *

timeSeries = sqlContext.read.option("header", "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")

# Hive timestamp is interpreted as UNIX timestamp in seconds
days = lambda i: i * 86400

w = (Window()
     .partitionBy("id")
     .orderBy(col("dt").cast("timestamp").cast("long"))
     .rangeBetween(-days(6), 0))
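
# Quick sanity check of the frame on toy data (hypothetical rows, not part of the
# real job): rangeBetween(-days(6), 0) should give a trailing window covering the
# current day plus the previous 6 days, compared in epoch seconds.
toy = sqlContext.createDataFrame(
    [(1, "2016-11-01", 10.0), (1, "2016-11-05", 20.0), (1, "2016-11-09", 30.0)],
    ["id", "dt", "x"])
toy.select("id", "dt", mean("x").over(w).alias("mean_7_x")).show()
# For the 2016-11-09 row, 2016-11-01 falls outside the frame, so mean_7_x
# should be (20.0 + 30.0) / 2 = 25.0.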

cols = ["id", "dt"]
skipCols = ["id", "dt"]

for col in timeSeries.columns:
    if col in skipCols:
        continue
    cols.append(mean(col).over(w).alias("mean_7_" + col))
    cols.append(count(col).over(w).alias("count_7_" + col))
    cols.append(sum(col).over(w).alias("sum_7_" + col))
    cols.append(min(col).over(w).alias("min_7_" + col))
    cols.append(max(col).over(w).alias("max_7_" + col))

df = timeSeries.select(cols)
df.orderBy('id', 'dt').write\
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
    .save("file:///tmp/spark-bug-out.csv")

Thanks,
--
Raviteja Lokineni | Business Intelligence Developer
TD Ameritrade




