spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (SPARK-23844) Socket Stream recovering from checkpoint will throw exception
Date Mon, 02 Apr 2018 09:10:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23844:
------------------------------------

    Assignee: Apache Spark

> Socket Stream recovering from checkpoint will throw exception
> -------------------------------------------------------------
>
>                 Key: SPARK-23844
>                 URL: https://issues.apache.org/jira/browse/SPARK-23844
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Saisai Shao
>            Assignee: Apache Spark
>            Priority: Major
>
> When a checkpoint location is specified for a query that reads from the socket source,
> the query throws an exception when it is rerun:
> {noformat}
> 18/04/02 14:11:28 ERROR MicroBatchExecution: Query test [id = c5ca82b2-550b-4c3d-9127-869f1aeae477, runId = 552d5bd4-a7e7-44e5-a85a-2f04f666ff6a] terminated with error
> java.lang.RuntimeException: Offsets committed out of order: 0 followed by -1
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:196)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:373)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:370)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:370)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:142)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){noformat}
> In other words, {{TextSocketMicroBatchReader}} honors the offsets recovered from the
> checkpoint. This is incorrect for the socket source, which does not support recovery from
> a checkpoint: even when the offset is recovered, the data actually available from the
> socket no longer corresponds to that offset.
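> To reproduce this issue:
> {code:java}
> // Read lines from a local socket (a server must be listening on port 9999).
> val socket = spark.readStream.format("socket")
>   .options(Map("host" -> "localhost", "port" -> "9999")).load()
> // Persist offsets across runs so that the second run recovers from the checkpoint.
> spark.conf.set("spark.sql.streaming.checkpointLocation", "./checkpoint")
> socket.writeStream.format("parquet").option("path", "./result").queryName("test").start()
> {code}
> The second run fails with the exception shown above.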
> Although this source is not supported in production environments, we should still make
> sure it works in a test environment without throwing a runtime exception.
>  
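
The failure described above can be modeled without Spark. The following is a minimal sketch, not Spark's actual code: `ToySocketReader` and `CheckpointMismatchDemo` are hypothetical names. The assumed sequence (the engine first commits the checkpointed offset, then commits whatever the freshly started reader reports as its latest offset) is one plausible reading of the stack trace rather than a confirmed description of `TextSocketMicroBatchReader`.

```scala
// Toy model only: a hypothetical stand-in, not Spark's TextSocketMicroBatchReader.
final class ToySocketReader {
  // Both counters start at -1 on every new instance; nothing is restored from disk.
  private var currentOffset: Long = -1L
  private var lastCommitted: Long = -1L

  // Simulates one line arriving on the socket.
  def receive(): Unit = currentOffset += 1

  // Latest offset this reader instance can actually serve.
  def latestOffset: Long = currentOffset

  // Rejects any commit that moves backwards, using the wording from the report.
  def commit(end: Long): Unit = {
    if (end < lastCommitted) {
      throw new RuntimeException(
        s"Offsets committed out of order: $lastCommitted followed by $end")
    }
    lastCommitted = end
  }
}

object CheckpointMismatchDemo {
  // Drives a fresh reader with an offset recovered from a checkpoint (assumed sequence).
  // Returns the error message if a commit is rejected, or None if both commits succeed.
  def restartWithCheckpoint(checkpointedOffset: Long, newLines: Int = 0): Option[String] = {
    val reader = new ToySocketReader // fresh instance after restart: offsets are back at -1
    (1 to newLines).foreach(_ => reader.receive())
    try {
      reader.commit(checkpointedOffset)  // engine replays the offset it recovered
      reader.commit(reader.latestOffset) // engine commits what the reader now reports
      None
    } catch {
      case e: RuntimeException => Some(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    println(restartWithCheckpoint(0L))
    println(restartWithCheckpoint(0L, newLines = 1))
  }
}
```

With no new lines received after the restart, `restartWithCheckpoint(0L)` yields the same message as in the report. If one line arrives first (`newLines = 1`), both commits succeed, but offset 0 now refers to different data than it did before the restart, which is the mismatch the description points out.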



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

