spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hien Luu <hien...@gmail.com>
Subject Re: Writing custom Structured Streaming receiver
Date Sun, 04 Mar 2018 01:31:49 GMT
I finally got around to implement a custom structured streaming receiver
(source) to read Wikipedia edit events from the IRC server.

It works fines locally as well as in spark-shell on my laptop.  However, it
failed with the following exception when running in Databricks community
edition.

It seems like there is no way to create DataFrame with isStreaming as true
if a class resides outside of spark.sql package.  

I was looking at RateSourceProvider class that comes with Apache Spark and
it is using an internal function to create a DataFrame with isStreaming as
true - sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

Does anyone have any suggestion? (other than waiting for DataSourceV2)

assertion failed: DataFrame returned by getBatch from <source class name>
did not have isStreaming=true

org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$11.apply(StreamExecution.scala:674)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$11.apply(StreamExecution.scala:669)
	at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
	at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at
org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:669)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:669)
	at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:62)
	at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:668)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:328)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:316)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:316)
	at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:62)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:316)
	at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:312)
	at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Mime
View raw message