spark-issues mailing list archives

From "Jayesh lalwani (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19738) Consider adding error handler to DataStreamWriter
Date Tue, 28 Feb 2017 03:33:45 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887183#comment-15887183 ]

Jayesh lalwani commented on SPARK-19738:
----------------------------------------

Thanks [~zsxwing], this will do. Do you know whether PERMISSIVE mode will also work when reading
through Kafka? We use the from_json method.

BTW, I am using a Spark release from Dec 15, and it doesn't support the PERMISSIVE option. I am
going to try again after upgrading to the latest. Is this a known issue?
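
For context, here is a minimal sketch (untested, with placeholder topic and server names) of the
kind of Kafka + from_json pipeline we have in mind; as I understand it, from_json yields null for
records it cannot parse rather than failing the query:

{code}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val userSchema = new StructType().add("name", "string").add("age", "integer")

// Kafka values arrive as binary, so cast to string before parsing
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
  .option("subscribe", "users")                      // placeholder topic
  .load()

// from_json returns null for malformed records instead of throwing,
// so bad rows can be filtered out or routed to a side output
val parsed = kafkaDF
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", userSchema).as("data"))
  .select("data.*")

parsed.writeStream.format("console").start()
{code}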


> Consider adding error handler to DataStreamWriter
> -------------------------------------------------
>
>                 Key: SPARK-19738
>                 URL: https://issues.apache.org/jira/browse/SPARK-19738
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Jayesh lalwani
>
> For Structured Streaming implementations, it is important that the application stays always
> on. However, right now, errors stop the driver. In some cases, this is not desirable behavior.
> For example, I have the following application:
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I sent the following input to it:
> {quote}
> 1,Iron man
> 2,Superman
> {quote}
> Obviously, the data is bad. This causes the executor to throw an exception that propagates
> to the driver, which promptly shuts down. The driver is running in supervised mode, so it
> gets restarted. The application reads the same bad input and shuts down again. This goes on
> ad infinitum. This behavior is desirable in cases where the error is recoverable. For example,
> if the executor cannot talk to the database, we want the application to keep trying the same
> input again and again until the database recovers. However, in some cases, this behavior is
> undesirable. We do not want this to happen when the input is bad. We want to put the bad
> record in some sort of dead letter queue. Or maybe we want to kill the driver only when the
> number of errors has crossed a certain threshold. Or maybe we want to email someone.
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should call the error
> handler and pass the exception to it. The error handler could eat the exception, or transform
> it, or update counts in an accumulator, etc.
>  {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}
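> A hypothetical sketch of what such a handler contract might look like (the names are
> illustrative only and are not an existing Spark API):
> {code}
> // Illustrative only: a user-supplied callback invoked when processing a record fails
> trait StreamErrorHandler extends Serializable {
>   // Return true to swallow the error and continue, false to rethrow and fail the query as today
>   def handle(exception: Throwable): Boolean
> }
>
> class DeadLetterErrorHandler extends StreamErrorHandler {
>   override def handle(exception: Throwable): Boolean = {
>     // e.g. write the bad record/exception to a dead letter queue or bump an accumulator
>     println(s"Skipping bad record: ${exception.getMessage}")
>     true
>   }
> }
> {code}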




