spark-user mailing list archives

From Tim Smith <>
Subject Re: Low Level Kafka Consumer for Spark
Date Sun, 31 Aug 2014 04:25:07 GMT
I'd be interested to understand this mechanism as well. But this is only the
error-recovery part of the equation. Consuming from Kafka has two aspects,
parallelism and error recovery, and I am not sure how either works. For
error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite setting the failed-task
threshold to 64, my job aborts after 4 receiver task failures.
- Data lost due to a failed receiver task/executor gets recovered.

> For parallelism, I would expect a single createStream() to intelligently
map one receiver thread per Kafka partition, placed in different JVMs. Also,
repartition() does not seem to work as advertised: a repartition(512) should
get nodes other than the receiver nodes some RDD partitions to process. No?

On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover <> wrote:

> I have this same question.  Isn't there somewhere that the Kafka range
> metadata can be saved?  From my naive perspective, it seems like it should
> be very similar to HDFS lineage.  The original HDFS blocks are kept
> somewhere (in the driver?) so that if an RDD partition is lost, it can be
> recomputed.  In this case, all we need is the Kafka topic, partition, and
> offset range.
> Can someone enlighten us on why two copies of the RDD (or some other
> mechanism like a WAL) are needed for fault tolerance when reading from
> Kafka but not when reading from, say, HDFS?
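The metadata Roger is asking about is indeed small. A hypothetical sketch of what the driver would need to track per batch (these names are illustrative, not an existing Spark API as of this thread):

```scala
// Hypothetical: the lineage metadata needed to recompute a lost partition
// from Kafka is just a topic/partition/offset-range triple per batch.
case class KafkaOffsetRange(topic: String, partition: Int,
                            fromOffset: Long, untilOffset: Long)

// e.g. one batch might cover:
val batchRanges = Seq(
  KafkaOffsetRange("my-topic", 0, 1000L, 2000L),
  KafkaOffsetRange("my-topic", 1,  950L, 1900L)
)
// If an RDD partition is lost, re-reading [fromOffset, untilOffset) from
// Kafka recomputes exactly the same data -- analogous to HDFS block lineage.
```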

> On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges <>
> wrote:
>> 'this 2-node replication is mainly for failover in case the receiver
>> dies while data is in flight.  there's still chance for data loss as
>> there's no write ahead log on the hot path, but this is being addressed.'
>> Can you comment a little on how this will be addressed? Will there be a
>> durable WAL? Is there a JIRA for tracking this effort?
>> I am curious whether, without a WAL, you can avoid this data loss through
>> explicit management of Kafka offsets, e.g. not committing an offset unless
>> the data is replicated to multiple nodes, or perhaps not until it is
>> processed. The incoming data is always durably stored to disk in Kafka, so
>> it can be replayed in failure scenarios to avoid data loss if the offsets
>> are managed properly.
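A sketch of the manual-commit pattern Jonathan describes, using the Kafka 0.8 high-level consumer; the connection settings are placeholders, and `process` is a stand-in for whatever replication/processing step applies:

```scala
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

// Stand-in for the downstream step: replicate / durably store the record.
def process(bytes: Array[Byte]): Unit = ???

val props = new Properties()
props.put("zookeeper.connect", "zk1:2181")   // placeholder
props.put("group.id", "my-group")            // placeholder
// Disable auto-commit so offsets advance only when WE say the data is safe.
props.put("auto.commit.enable", "false")

val connector = Consumer.create(new ConsumerConfig(props))
val stream = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic").head

for (msgAndMeta <- stream) {
  process(msgAndMeta.message())
  // Only now mark the offset as consumed; on a failure before this point,
  // Kafka replays from the last committed offset instead of losing data.
  connector.commitOffsets
}
```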
>> On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly <> wrote:
>>> @bharat-
>>> overall, i've noticed a lot of confusion about how Spark Streaming
>>> scales - as well as how it handles failover and checkpointing, but we can
>>> discuss that separately.
>>> there's actually 2 dimensions to scaling here:  receiving and processing.
>>> *Receiving*
>>> receiving can be scaled out by submitting new DStreams/Receivers to the
>>> cluster as i've done in the Kinesis example.  in fact, i purposely chose to
>>> submit multiple receivers in my Kinesis example because i feel it should be
>>> the norm and not the exception - particularly for partitioned and
>>> checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
>>> only way to scale.
>>> a side note here is that each receiver running in the cluster
>>> immediately replicates to 1 other node for fault-tolerance of that specific
>>> receiver.  this is where the confusion lies.  this 2-node replication is
>>> mainly for failover in case the receiver dies while data is in flight.
>>>  there's still a chance of data loss as there's no write ahead log on the
>>> hot path, but this is being addressed.
>>> this is mentioned in the docs here:
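The multi-receiver pattern described above can be sketched like this for Kafka (topic name, quorum, and partition count are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("multi-receiver"), Seconds(10))

// One receiver per Kafka partition: each createStream() call places an
// independent receiver somewhere in the cluster.
val numPartitions = 4
val streams = (1 to numPartitions).map { _ =>
  // The _2 storage level is the 2-node replication being discussed: each
  // received block is copied to one other executor before processing.
  KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("my-topic" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER_2)
}

// Union the per-receiver streams into a single DStream for processing.
val unified = ssc.union(streams)
unified.count().print()
```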
>>> *Processing*
>>> once data is received, tasks are scheduled across the Spark cluster just
>>> like any other non-streaming task where you can specify the number of
>>> partitions for reduces, etc.  this is the part of scaling that is sometimes
>>> overlooked - probably because it "works just like regular Spark", but it is
>>> worth highlighting.
>>> Here's a blurb in the docs:
>>> the other thing that's confusing with Spark Streaming is that in Scala,
>>> you need to explicitly
>>> import
>>> org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
>>> in order to pick up the implicits that allow DStream.reduceByKey and
>>> such (versus DStream.transform(rddBatch => rddBatch.reduceByKey())).
>>> in other words, DStreams appear to be relatively featureless until you
>>> discover this implicit.  otherwise, you need to operate on the underlying
>>> RDDs explicitly, which is not ideal.
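Concretely, the two equivalent forms mentioned above look like this in Spark 1.x (in later releases the explicit import became unnecessary):

```scala
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream

def wordCounts(words: DStream[(String, Int)]): DStream[(String, Int)] = {
  // With the implicit in scope, pair-DStream operations are available directly:
  words.reduceByKey(_ + _)
}

def wordCountsViaTransform(words: DStream[(String, Int)]): DStream[(String, Int)] = {
  // Without the implicit, you must drop down to the underlying RDDs:
  words.transform(rddBatch => rddBatch.reduceByKey(_ + _))
}
```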
>>> the Kinesis example referenced earlier in the thread uses the DStream
>>> implicits.
>>> side note to all of this - i've recently convinced my publisher for my
>>> upcoming book, Spark In Action, to let me jump ahead and write the Spark
>>> Streaming chapter ahead of other more well-understood libraries.  early
>>> release is in a month or so.  sign up  @ if
>>> you wanna get notified.
>>> shameless plug that i wouldn't otherwise do, but i really think it will
>>> help clear a lot of confusion in this area as i hear these questions asked
>>> a lot in my talks and such.  and i think a clear, crisp story on scaling
>>> and fault-tolerance will help Spark Streaming's adoption.
>>> hope that helps!
>>> -chris
>>> On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya <
>>>> wrote:
>>>> I agree. This issue should be fixed in Spark rather than relying on
>>>> replay of Kafka messages.
>>>> Dib
>>>> On Aug 28, 2014 6:45 AM, "RodrigoB" <> wrote:
>>>>> Dibyendu,
>>>>> Thanks for getting back.
>>>>> I believe you are absolutely right. We were under the assumption that the
>>>>> raw data was being computed again, and that's not happening after further
>>>>> tests. This applies to Kafka as well.
>>>>> The issue is of major priority, fortunately.
>>>>> Regarding your suggestion, I would prefer to have the problem resolved
>>>>> within Spark's internals, since once the data is replicated we should be
>>>>> able to access it once more rather than having to pull it back again from
>>>>> Kafka or any other stream affected by this issue. If, for example, there
>>>>> is a large number of batches to be recomputed, I would rather have them
>>>>> done in a distributed way than overload the batch interval with a huge
>>>>> amount of Kafka messages.
>>>>> I do not yet have enough know-how about where the issue lies or about
>>>>> the internal Spark code, so I can't really say how difficult the
>>>>> implementation will be.
>>>>> Thanks,
>>>>> Rod
