spark-user mailing list archives

From Shushant Arora <>
Subject Re: Upgrade of Spark-Streaming application
Date Thu, 06 Aug 2015 03:35:34 GMT

For checkpointing and using the fromOffsets argument: say that the first
time my app starts I don't have any previous state stored and I want to
start consuming from the largest offset.

1. Is it possible to specify that in the fromOffsets API? I don't want to
use the other API, which returns JavaPairInputDStream while the fromOffsets
API returns JavaDStream, since I want to keep the rest of my app's flow the
same in both cases.

2. To get the same flow in both cases, if I use a different API in each of
the 2 cases and convert the JavaPairInputDStream to a JavaDStream using a
map function, I am no longer able to cast the transformed stream's RDD to
HasOffsetRanges to get the offsets of the current run. It throws a
ClassCastException when I do
OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
on the transformed stream:

java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
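
The exception above is consistent with only the RDD that comes straight
from the direct stream implementing HasOffsetRanges; anything derived from
it by map is a different class. Below is a minimal plain-Java sketch of that
situation, not Spark code: SimpleRdd, KafkaLikeRdd and the nested
HasOffsetRanges interface are hypothetical stand-ins, and offset ranges are
modelled as long[] triples. It only illustrates the ordering: read the
offsets from the source RDD first, keep them in a holder, then transform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class OffsetCaptureSketch {

    // Hypothetical stand-in for Spark's HasOffsetRanges interface.
    // Each entry is modelled as {partition, fromOffset, untilOffset}.
    public interface HasOffsetRanges {
        long[][] offsetRanges();
    }

    // Hypothetical stand-in for a generic RDD. map() always returns a plain
    // SimpleRdd, so the result never carries offset information.
    public static class SimpleRdd<T> {
        public final List<T> data;

        public SimpleRdd(List<T> data) {
            this.data = data;
        }

        public <R> SimpleRdd<R> map(Function<T, R> f) {
            List<R> out = new ArrayList<>();
            for (T item : data) {
                out.add(f.apply(item));
            }
            return new SimpleRdd<>(out);
        }
    }

    // Hypothetical stand-in for the RDD produced by the direct stream:
    // the only type in this model that exposes offset ranges.
    public static class KafkaLikeRdd<T> extends SimpleRdd<T> implements HasOffsetRanges {
        private final long[][] ranges;

        public KafkaLikeRdd(List<T> data, long[][] ranges) {
            super(data);
            this.ranges = ranges;
        }

        @Override
        public long[][] offsetRanges() {
            return ranges;
        }
    }

    // Cast the source RDD first, store the ranges in the holder, then map.
    public static SimpleRdd<Integer> captureThenMap(
            SimpleRdd<String> source, AtomicReference<long[][]> holder) {
        holder.set(((HasOffsetRanges) source).offsetRanges());
        return source.map(String::length);
    }

    // Returns true if casting the mapped result throws ClassCastException.
    public static boolean castFailsAfterMap(SimpleRdd<String> source) {
        Object mapped = source.map(String::length);
        try {
            HasOffsetRanges unused = (HasOffsetRanges) mapped;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

In a real job the analogous step would presumably be to do the cast in the
first operation applied to the stream returned by createDirectStream, before
any map, and to carry the captured ranges along to wherever they are saved.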

On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger <> wrote:

> You can't use checkpoints across code upgrades.  That may or may not
> change in the future, but for now that's a limitation of spark checkpoints
> (regardless of whether you're using Kafka).
> Some options:
> - Start up the new job on a different cluster, then kill the old job once
> it's caught up to where the new job started.  If you care about duplicate
> work, you should be doing idempotent / transactional writes anyway, which
> should take care of the overlap between the two.  If you're doing batches,
> you may need to be a little more careful about handling batch boundaries
> - Store the offsets somewhere other than the checkpoint, and provide them
> on startup using the fromOffsets argument to createDirectStream
> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro <>
> wrote:
>> Hi,
>> I've read about the recent updates about spark-streaming integration with
>> Kafka (I refer to the new approach without receivers).
>> In the new approach, metadata are persisted in checkpoint folders on HDFS
>> so that the SparkStreaming context can be recreated in case of failures.
>> This means that the streaming application will restart from where it
>> exited and the message consuming process continues with new messages only.
>> Also, if I manually stop the streaming process and recreate the context
>> from checkpoint (using an approach similar to
>> the behavior would be the same.
>> Now, suppose I want to change something in the software and modify the
>> processing pipeline.
>> Can spark use the previous checkpoint to recreate the new application?
>> Will I ever be able to upgrade the software without processing all the
>> messages in Kafka again?
>> Regards,
>> Nicola
