spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From varun sharma <varunsharman...@gmail.com>
Subject Re: Issue in spark batches
Date Wed, 21 Oct 2015 06:28:42 GMT
Hi TD,
Is there any way in spark  I can fail/retry batch in case of any exceptions
or do I have to write code to explicitly keep on retrying?
Also If some batch fail, I want to block further batches to be processed as
it would create inconsistency in updation of zookeeper offsets and maybe
kill the job itself after lets say 3 retries.

Any pointers to achieve same are appreciated.

On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das <tdas@databricks.com> wrote:

> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
> actually completing with exception, the UI does not update correctly.
>
> On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <varunsharmansit@gmail.com>
> wrote:
>
>> Also, As you can see the timestamps in attached image. batches coming
>> after the Cassandra server comes up(21:04) are processed and batches which
>> are in hung state(21:03) never get processed.
>> So, How do I fail those batches so that those can be processed again.
>>
>> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <varunsharmansit@gmail.com>
>> wrote:
>>
>>> Hi TD,
>>> Yes saveToCassandra throws exception. How do I fail that task explicitly
>>> if i catch any exceptions?.
>>> Right now that batch doesn't fail and remain in hung state. Is there any
>>> way I fail that batch so that it can be tried again.
>>>
>>> Thanks
>>> Varun
>>>
>>> On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <tdas@databricks.com>
>>> wrote:
>>>
>>>> If cassandra is down, does saveToCassandra throw an exception? If it
>>>> does, you can catch that exception and write your own logic to retry and/or
>>>> no update. Once the foreachRDD function completes, that batch will be
>>>> internally marked as completed.
>>>>
>>>> TD
>>>>
>>>> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <
>>>> varunsharmansit@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am facing this issue consistently in spark-cassandra-kafka *streaming
>>>>> job.*
>>>>> *Spark 1.4.0*
>>>>> *cassandra connector 1.4.0-M3*
>>>>> *Issue is:*
>>>>>
>>>>> I am reading data from *Kafka* using DirectStream, writing to
>>>>> *Cassandra* after parsing the json and the subsequently updating the
>>>>> offsets in *zookeeper*.
>>>>> If Cassandra cluster is down, it throws exception but the batch which
>>>>> arrives in that time window is not processed ever though the offsets
are
>>>>> updated in zookeeper.
>>>>> It is resulting data loss.
>>>>> Once the Cassandra cluster is up, this job process the data normally.
>>>>> PFA the screenshots of hung batches and code.
>>>>>
>>>>> *Code:*
>>>>>
>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>   val stream = rdd
>>>>>     .map(x =>JsonUtility.deserialize(x))
>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
>>>>>
>>>>>
>>>>>   //commit the offsets once everything is done
>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>> })
>>>>>
>>>>> *I have even tried this variant:*
>>>>>
>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>   val stream = rdd
>>>>>     .map(x =>JsonUtility.deserialize(x))
>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
>>>>> })
>>>>>
>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>
>>>>>   //commit the offsets once everything is done
>>>>>
>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>>
>>>>> }
>>>>>
>>>>> Exception when cassandra cluster is down:
>>>>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>>>>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>>>>> streaming job 1445239140000 ms.3
>>>>> java.io.IOException: Failed to open native connection to Cassandra at
>>>>> {......}
>>>>>
>>>>> --
>>>>> *VARUN SHARMA*
>>>>> *Flipkart*
>>>>> *Bangalore*
>>>>>
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*

Mime
View raw message