spark-user mailing list archives

From Tim Smith <>
Subject Re: Low Level Kafka Consumer for Spark
Date Mon, 08 Sep 2014 23:32:42 GMT
Thanks TD. Someone already pointed out to me that calling /repartition(...)/ on
its own isn't the right way; you have to keep the returned stream, as in
/val partedStream = repartition(...)/. It would be nice to have this fixed in
the docs.

On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das <> wrote:

> Some thoughts on this thread to clarify the doubts.
> 1. Driver recovery: The current version (1.1, to be released) does not
> recover raw data that has been received but not yet processed. This is
> because when the driver dies, the executors die too, and so does the raw
> data stored in them. Only with HDFS is the data not lost on driver
> recovery, as it is already stored reliably in HDFS. This is something we
> want to fix by Spark 1.2 (3 months from now). Recovery by replaying the data
> from Kafka is possible but tricky. Our goal is to provide strong guarantees,
> namely exactly-once semantics, for all transformations. To guarantee this
> for all kinds of streaming computations, stateful and stateless, the data
> must be replayed through Kafka in exactly the same order, and the underlying
> blocks of data in Spark must be regenerated exactly as they would have been
> if there had been no driver failure. This is quite tricky to implement: it
> requires manipulating ZooKeeper offsets, etc., which is hard to do with the
> high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka
> receiver may enable such approaches in the future. For now, we definitely
> plan to solve the first problem very, very soon.
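TD's requirement that blocks be regenerated "in the exact way" can be illustrated with a toy model. The sketch below is hypothetical and uses no Spark or Kafka API: it treats one Kafka partition as an in-memory `Vector[String]`, assumes each batch is defined by an offset range that is recorded (for example, checkpointed) before the batch is processed, and shows that re-reading the recorded ranges after a failure rebuilds identical blocks. `BatchRange`, `planBatches`, `readBlock`, `replay` and `maxPerBatch` are illustrative names only.

```scala
// Toy model: a single Kafka partition represented as an in-memory log.
// BatchRange, planBatches, readBlock and replay are hypothetical names,
// not part of Spark or Kafka.
final case class BatchRange(from: Int, until: Int)

object DeterministicReplay {
  // Cut the log into fixed offset ranges; maxPerBatch is an assumed rate limit.
  // These ranges are what would have to be saved durably before processing.
  def planBatches(logEnd: Int, maxPerBatch: Int): Vector[BatchRange] = {
    require(maxPerBatch > 0, "maxPerBatch must be positive")
    (0 until logEnd by maxPerBatch).map { start =>
      BatchRange(start, math.min(start + maxPerBatch, logEnd))
    }.toVector
  }

  // Rebuild one block by reading exactly the recorded range from the log.
  def readBlock(log: Vector[String], r: BatchRange): Vector[String] =
    log.slice(r.from, r.until)

  // Recovery: replay every recorded range in its original order.
  def replay(log: Vector[String], recorded: Vector[BatchRange]): Vector[Vector[String]] =
    recorded.map(r => readBlock(log, r))
}
```

For a 10-message log and `maxPerBatch = 4`, `planBatches` yields the ranges 0-4, 4-8 and 8-10. Because each block depends only on its recorded range, replaying those ranges after a restart produces the same blocks in the same order, which is the property TD describes as hard to obtain with the high-level consumer.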
> 3. Repartitioning: I am trying to understand the repartition issue. One
> common mistake I have seen is that developers repartition a stream but do
> not use the repartitioned stream:
> inputDStream.repartition(100)  // wrong: the returned DStream is discarded
> val repartitionedDStream = inputDStream.repartition(100)  // right: use repartitionedDStream from here on
> Not sure if this helps solve the problem that you are all facing. I am
> going to add this to the streaming programming guide to make sure this
> common mistake is avoided.
> TD
> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <> wrote:
>> Hi,
>> Sorry for the slight delay. As discussed in this thread, I have modified
>> the Kafka-Spark-Consumer code to have a dedicated Receiver for every topic
>> partition. You can see an example of how to create a union of these
>> receivers in .
>> Thanks to Chris for suggesting this change.
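Dibyendu's change (one dedicated receiver per topic partition, combined with a union) can be pictured with the plain-Scala model below. It is a hypothetical sketch, not the Kafka-Spark-Consumer code: `PartitionReceiver`, `PerPartitionUnion`, `unionBatch` and `forPartition` are illustrative names, and each "receiver" simply slices an in-memory log for its own partition. In Spark itself, per-receiver streams are combined with `StreamingContext.union`.

```scala
// Hypothetical model of one receiver per Kafka topic partition.
// Each receiver only ever reads its own partition's log.
final case class PartitionReceiver(partition: Int, log: Vector[String]) {
  // Messages for one batch window, tagged with the partition they came from.
  def receive(from: Int, until: Int): Vector[(Int, String)] =
    log.slice(from, until).map(msg => (partition, msg))
}

object PerPartitionUnion {
  // Union of all receivers' output for the same batch window.
  def unionBatch(receivers: Seq[PartitionReceiver], from: Int, until: Int): Vector[(Int, String)] =
    receivers.toVector.flatMap(_.receive(from, until))

  // Per-partition order survives the union: filtering by partition
  // returns that partition's messages in log order.
  def forPartition(batch: Vector[(Int, String)], p: Int): Vector[String] =
    batch.filter(_._1 == p).map(_._2)
}
```

With two partitions holding `a0, a1, a2` and `b0, b1, b2`, a batch window of offsets 0-2 yields four tagged messages, and filtering by partition gives back `a0, a1` and `b0, b1` in their original order.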
>> Regards,
>> Dibyendu
>> On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB <> wrote:
>>> Just a comment on the recovery part.
>>> Is it correct to say that the current Spark Streaming recovery design
>>> does not consider re-computations (upon metadata lineage recovery) that
>>> depend on blocks of data from the received stream?
>>> Just to illustrate a real use case (mine):
>>> - We have object states, each with a Duration field that is incremented
>>> on every batch interval. The object state is also reset to 0 upon
>>> incoming state-changing events. Let's suppose there has been at least
>>> one such event since the last data checkpoint. This will lead to an
>>> inconsistency upon driver recovery: the Duration field will be
>>> incremented from the data-checkpoint version up to the moment of
>>> recovery, but the state change event will never be recovered, so in the
>>> end we have the old state with the wrong Duration value.
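Rod's Duration scenario can be reproduced in a few lines of plain Scala. This is a hypothetical model, not his code: each batch is reduced to a single `Boolean` saying whether a state-changing event arrived, `step` resets the Duration to 0 on an event and otherwise increments it, and recovery is modeled as restarting from the checkpointed Duration while every batch after the checkpoint is replayed as empty, because its received data was lost.

```scala
object DurationRecovery {
  // One batch interval: reset on a state-changing event, otherwise count up.
  def step(duration: Int, eventArrived: Boolean): Int =
    if (eventArrived) 0 else duration + 1

  // Failure-free processing of a sequence of batches.
  def run(initial: Int, batches: Seq[Boolean]): Int =
    batches.foldLeft(initial)(step)

  // Hypothetical recovery: restart from the checkpointed Duration, but the
  // raw events received after the checkpoint are gone, so each lost batch
  // is recomputed as if no event had arrived.
  def recoverWithLostData(checkpointState: Int, lostBatches: Int): Int =
    run(checkpointState, Seq.fill(lostBatches)(false))
}
```

With batches `Seq(false, false, true, false, false)` and a checkpoint after the second batch (checkpointed Duration 2), the failure-free result is 2, while recovery over the three lost batches gives 5: the reset event is never seen, so the Duration is wrong.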
>>> To make things worse, let's imagine we're dumping the Duration increases
>>> somewhere... which means we're spreading the problem across our system.
>>> Re-computation awareness is something I've commented on in another
>>> thread, and I'd rather treat it separately.
>>> Re-computations do occur, but the only RDDs that are recovered are the
>>> ones from the data checkpoint. This is what we've seen. That is not
>>> enough by itself to ensure recovery of computed data, and this partial
>>> recovery leads to inconsistency in some cases.
>>> Roger - I share the same question with you - I'm just not sure if the
>>> replicated data really gets persisted on every batch. The execution
>>> lineage is checkpointed, but if big chunks of data are being consumed by
>>> the Receiver node on, let's say, a per-second basis, then persisting
>>> them to HDFS every second could be a big challenge for JVM performance.
>>> Maybe that is the reason it's not really implemented... assuming it
>>> isn't.
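The trade-off Rod raises (persisting every received block so it survives a driver failure, at the price of an extra write per block) can be sketched as follows. Everything here is hypothetical and is not how Spark implements it: `DurableLog` is an in-memory stand-in for reliable storage such as HDFS, `LoggingReceiver` writes each block to it before keeping its in-memory copy, and `WriteAheadRecovery.recover` rebuilds the blocks after that in-memory copy has been lost.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for reliable storage (e.g. HDFS); it survives a simulated crash.
final class DurableLog {
  private val entries = ArrayBuffer.empty[(Long, Vector[String])]
  private var count = 0
  def append(batchTime: Long, block: Vector[String]): Unit = {
    entries += ((batchTime, block))
    count += 1
  }
  // Number of durable writes performed: the extra cost paid per block.
  def writes: Int = count
  def readAll: Vector[(Long, Vector[String])] = entries.toVector
}

// Receiver-side storage: memory is fast but is lost when the process dies.
final class LoggingReceiver(durable: DurableLog) {
  private val memory = ArrayBuffer.empty[(Long, Vector[String])]
  def store(batchTime: Long, block: Vector[String]): Unit = {
    durable.append(batchTime, block) // 1. persist first, so the block can be replayed
    memory += ((batchTime, block))   // 2. then keep the in-memory copy for processing
  }
  def blocks: Vector[(Long, Vector[String])] = memory.toVector
}

object WriteAheadRecovery {
  // After a crash only the durable log remains; replay blocks in batch-time order.
  def recover(durable: DurableLog): Vector[(Long, Vector[String])] =
    durable.readAll.sortBy(_._1)
}
```

A restarted `LoggingReceiver` starts with no blocks in memory, yet `recover` returns exactly the blocks stored before the crash. The cost Rod worries about shows up in `writes`: every block is written durably once on top of being kept in memory, so durable write volume grows with the ingest rate.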
>>> Dibyendu made a great effort with the offset-controlling code, but
>>> general state-consistent recovery feels to me like another big issue to
>>> address.
>>> I plan to dive into the Streaming code and try to at least contribute
>>> some ideas. Any more insight from the dev team would be much
>>> appreciated.
>>> tnks,
>>> Rod
