spark-dev mailing list archives

From "Hari Shreedharan" <hshreedha...@cloudera.com>
Subject Re: Which committers care about Kafka?
Date Thu, 25 Dec 2014 07:54:37 GMT
In general, such discussions happen on or are posted to the dev lists. Could you please post a summary?
Thanks.



Thanks, Hari

On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <cody@koeninger.org>
wrote:

> After a long talk with Patrick and TD (thanks guys), I opened the following
> jira
> https://issues.apache.org/jira/browse/SPARK-4964
> Sample PR has an implementation for the batch and the dstream case, and a
> link to a project with example usage.
> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <koert@tresata.com> wrote:
>> yup, we at tresata do the idempotent store the same way. very simple
>> approach.
>>
>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>>
>>> That KafkaRDD code is dead simple.
>>>
>>> Given a user specified map
>>>
>>> (topic1, partition0) -> (startingOffset, endingOffset)
>>> (topic1, partition1) -> (startingOffset, endingOffset)
>>> ...
>>> turn each one of those entries into a partition of an rdd, using the
>>> simple consumer.
>>> That's it.  No recovery logic, no state, nothing - for any failures, bail
>>> on the rdd and let it retry.
>>> Spark stays out of the business of being a distributed database.
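>>>
>>> A rough sketch of that mapping (OffsetRange and the fetch function here are
>>> illustrative placeholders, not the actual KafkaRDD code):
>>>
>>> import org.apache.spark.{Partition, SparkContext, TaskContext}
>>> import org.apache.spark.rdd.RDD
>>>
>>> case class OffsetRange(topic: String, partition: Int,
>>>                        fromOffset: Long, untilOffset: Long)
>>>
>>> class KafkaRDDPartition(val index: Int, val range: OffsetRange) extends Partition
>>>
>>> // One RDD partition per map entry; the caller plugs in the SimpleConsumer
>>> // fetch loop. A failed fetch just fails the task and Spark retries it.
>>> class SimpleKafkaRDD(sc: SparkContext,
>>>                      ranges: Seq[OffsetRange],
>>>                      fetch: OffsetRange => Iterator[Array[Byte]])
>>>   extends RDD[Array[Byte]](sc, Nil) {
>>>
>>>   override def getPartitions: Array[Partition] =
>>>     ranges.zipWithIndex.map { case (r, i) => new KafkaRDDPartition(i, r) }.toArray[Partition]
>>>
>>>   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] =
>>>     fetch(split.asInstanceOf[KafkaRDDPartition].range)
>>> }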
>>>
>>> The client code does any transformation it wants, then stores the data and
>>> offsets.  There are two ways of doing this, either based on idempotence or
>>> a transactional data store.
>>>
>>> For idempotent stores:
>>>
>>> 1. manipulate data
>>> 2. save data to store
>>> 3. save ending offsets to the same store
>>>
>>> If you fail between 2 and 3, the offsets haven't been stored, you start
>>> again at the same beginning offsets, do the same calculations in the same
>>> order, overwrite the same data, all is good.
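>>>
>>> A minimal sketch of that pattern (KVStore and the key scheme are hypothetical,
>>> and OffsetRange is reused from the sketch above - the point is deterministic
>>> keys plus writing the offsets last):
>>>
>>> trait KVStore extends Serializable {
>>>   def upsert(key: String, value: Array[Byte]): Unit   // overwrite semantics
>>> }
>>>
>>> def saveIdempotently(data: RDD[(String, Array[Byte])],
>>>                      range: OffsetRange,
>>>                      store: KVStore): Unit = {
>>>   // 1 + 2: keys are derived deterministically from the data, so replaying
>>>   // the same offset range just overwrites the same rows.
>>>   data.foreachPartition { iter =>
>>>     iter.foreach { case (k, v) => store.upsert(k, v) }
>>>   }
>>>   // 3: record progress only after the data is written; a crash before this
>>>   // line means we restart from the old offsets and rewrite the same data.
>>>   store.upsert(s"offsets/${range.topic}/${range.partition}",
>>>                range.untilOffset.toString.getBytes("UTF-8"))
>>> }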
>>>
>>>
>>> For transactional stores:
>>>
>>> 1. manipulate data
>>> 2. begin transaction
>>> 3. save data to the store
>>> 4. save offsets
>>> 5. commit transaction
>>>
>>> If you fail before 5, the transaction rolls back.  To make this less
>>> heavyweight, you can write the data outside the transaction and then
>>> update
>>> a pointer to the current data inside the transaction.
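>>>
>>> A sketch of that flow over plain JDBC (table names are made up; the point is
>>> that data and offsets are committed in the same transaction):
>>>
>>> import java.sql.DriverManager
>>>
>>> def saveTransactionally(results: Iterator[(String, String)],
>>>                         range: OffsetRange,
>>>                         jdbcUrl: String): Unit = {
>>>   val conn = DriverManager.getConnection(jdbcUrl)
>>>   conn.setAutoCommit(false)                                 // 2. begin transaction
>>>   try {
>>>     val ins = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")
>>>     results.foreach { case (k, v) =>                        // 3. save data
>>>       ins.setString(1, k); ins.setString(2, v); ins.executeUpdate()
>>>     }
>>>     val off = conn.prepareStatement(
>>>       "UPDATE offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
>>>     off.setLong(1, range.untilOffset)                       // 4. save offsets
>>>     off.setString(2, range.topic)
>>>     off.setInt(3, range.partition)
>>>     off.executeUpdate()
>>>     conn.commit()                                           // 5. commit; failing before
>>>   } catch {                                                 //    this line rolls back
>>>     case e: Exception => conn.rollback(); throw e
>>>   } finally {
>>>     conn.close()
>>>   }
>>> }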
>>>
>>>
>>> Again, spark has nothing much to do with guaranteeing exactly once.  In
>>> fact, the current streaming api actively impedes my ability to do the
>>> above.  I'm just suggesting providing an api that doesn't get in the way
>>> of
>>> exactly-once.
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
>>> hshreedharan@cloudera.com
>>> > wrote:
>>>
>>> > Can you explain your basic algorithm for the once-only-delivery? It is
>>> > quite a bit of very Kafka-specific code that would take more time to read
>>> > than I can currently afford. If you can explain your algorithm a bit, it
>>> > might help.
>>> >
>>> > Thanks,
>>> > Hari
>>> >
>>> >
>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <cody@koeninger.org>
>>> > wrote:
>>> >
>>> >>
>>> >> The problems you guys are discussing come from trying to store state in
>>> >> spark, so don't do that.  Spark isn't a distributed database.
>>> >>
>>> >> Just map kafka partitions directly to rdds, let user code specify the
>>> >> range of offsets explicitly, and let them be in charge of committing
>>> >> offsets.
>>> >>
>>> >> Using the simple consumer isn't that bad, I'm already using this in
>>> >> production with the code I linked to, and tresata apparently has been as
>>> >> well.  Again, for everyone saying this is impossible, have you read either
>>> >> of those implementations and looked at the approach?
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
>>> >> Sean.McNamara@webtrends.com> wrote:
>>> >>
>>> >>> Please feel free to correct me if I’m wrong, but I think the exactly
>>> >>> once spark streaming semantics can easily be solved using updateStateByKey.
>>> >>> Make the key going into updateStateByKey be a hash of the event, or pluck
>>> >>> off some uuid from the message.  The updateFunc would only emit the message
>>> >>> if the key did not exist, and the user has complete control over the window
>>> >>> of time / state lifecycle for detecting duplicates.  It also makes it
>>> >>> really easy to detect and take action (alert?) when you DO see a duplicate,
>>> >>> or make memory tradeoffs within an error bound using a sketch algorithm.
>>> >>> The kafka simple consumer is insanely complex, if possible I think it would
>>> >>> be better (and vastly more flexible) to get reliability using the
>>> >>> primitives that spark so elegantly provides.
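>>> >>>
>>> >>> A rough sketch of that dedup (assuming messages is a DStream[Array[Byte]];
>>> >>> the hash key and Seen state type are just illustrative):
>>> >>>
>>> >>> import java.security.MessageDigest
>>> >>> import org.apache.spark.streaming.StreamingContext._
>>> >>> import org.apache.spark.streaming.dstream.DStream
>>> >>>
>>> >>> case class Seen(newThisBatch: Boolean, payload: Array[Byte])
>>> >>>
>>> >>> def eventKey(bytes: Array[Byte]): String =
>>> >>>   MessageDigest.getInstance("SHA-256").digest(bytes).map("%02x".format(_)).mkString
>>> >>>
>>> >>> def dedup(messages: DStream[Array[Byte]]): DStream[Array[Byte]] = {
>>> >>>   val keyed = messages.map(m => (eventKey(m), m))
>>> >>>   val updateFunc = (events: Seq[Array[Byte]], state: Option[Seen]) => state match {
>>> >>>     case Some(s)                 => Some(s.copy(newThisBatch = false)) // duplicate key
>>> >>>     case None if events.nonEmpty => Some(Seen(newThisBatch = true, events.head))
>>> >>>     case None                    => None
>>> >>>   }
>>> >>>   keyed.updateStateByKey(updateFunc)
>>> >>>     .filter { case (_, s) => s.newThisBatch }   // only first-seen keys get emitted
>>> >>>     .map { case (_, s) => s.payload }
>>> >>> }
>>> >>>
>>> >>> State grows forever as written; in practice the updateFunc would return None
>>> >>> once a key is old enough, which is the window / lifecycle control mentioned
>>> >>> above.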
>>> >>>
>>> >>> Cheers,
>>> >>>
>>> >>> Sean
>>> >>>
>>> >>>
>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>>> >>> hshreedharan@cloudera.com> wrote:
>>> >>> >
>>> >>> > Hi Dibyendu,
>>> >>> >
>>> >>> > Thanks for the details on the implementation. But I still do not believe
>>> >>> > that it means no duplicates - what they achieve is that the same batch is
>>> >>> > processed exactly the same way every time (but note it may be processed more
>>> >>> > than once) - so it depends on the operation being idempotent. I believe
>>> >>> > Trident uses ZK to keep track of the transactions - a batch can be
>>> >>> > processed multiple times in failure scenarios (for example, the transaction
>>> >>> > is processed but before ZK is updated the machine fails, causing a "new"
>>> >>> > node to process it again).
>>> >>> >
>>> >>> > I don't think it is impossible to do this in Spark Streaming as well and
>>> >>> > I'd be really interested in working on it at some point in the near
>>> >>> > future.
>>> >>> >
>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>>> >>> > dibyendu.bhattachary@gmail.com> wrote:
>>> >>> >
>>> >>> >> Hi,
>>> >>> >>
>>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm
>>> >>> >> Trident achieves the exactly-once guarantee by processing the tuples in a
>>> >>> >> batch and assigning the same transaction-id to a given batch. The replay for
>>> >>> >> a given batch with a transaction-id will have the exact same set of tuples,
>>> >>> >> and replay of batches happens in the exact same order as before the failure.
>>> >>> >>
>>> >>> >> Given this paradigm, if the downstream system processes data for a given
>>> >>> >> batch having a given transaction-id, and if during a failure the same batch
>>> >>> >> is emitted again, you can check whether the same transaction-id has already
>>> >>> >> been processed or not, and hence can guarantee exactly-once semantics.
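>>> >>> >>
>>> >>> >> A sketch of that downstream check (TxnStore and writeDownstream are
>>> >>> >> stand-ins for whatever the target system provides):
>>> >>> >>
>>> >>> >> trait TxnStore {
>>> >>> >>   def alreadyCommitted(txnId: Long): Boolean
>>> >>> >>   def markCommitted(txnId: Long): Unit
>>> >>> >> }
>>> >>> >>
>>> >>> >> def processBatch(txnId: Long, batch: Seq[Array[Byte]], store: TxnStore,
>>> >>> >>                  writeDownstream: Seq[Array[Byte]] => Unit): Unit = {
>>> >>> >>   if (!store.alreadyCommitted(txnId)) {   // a replayed batch is skipped entirely
>>> >>> >>     writeDownstream(batch)
>>> >>> >>     store.markCommitted(txnId)            // record the id with / after the data
>>> >>> >>   }
>>> >>> >> }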
>>> >>> >>
>>> >>> >> And this can only be achieved in Spark if we use the Low Level Kafka
>>> >>> >> consumer API to process the offsets. This low level Kafka Consumer
>>> >>> >> (https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
>>> >>> >> Spark Kafka consumer using the Kafka Low Level APIs. All of the Kafka
>>> >>> >> related logic has been taken from the Storm-Kafka spout, which manages
>>> >>> >> all Kafka re-balance and fault tolerance aspects and Kafka metadata
>>> >>> >> management.
>>> >>> >>
>>> >>> >> Presently this Consumer guarantees that during a Receiver failure, it will
>>> >>> >> re-emit the exact same Block with the same set of messages. Every message
>>> >>> >> has the details of its partition, offset and topic, which can address
>>> >>> >> SPARK-3146.
>>> >>> >>
>>> >>> >> As this Low Level consumer has complete control over the Kafka offsets,
>>> >>> >> we can implement a Trident-like feature on top of it, such as implementing
>>> >>> >> a transaction-id for a given block and re-emitting the same block with the
>>> >>> >> same set of messages during a Driver failure.
>>> >>> >>
>>> >>> >> Regards,
>>> >>> >> Dibyendu
>>> >>> >>
>>> >>> >>
>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
>>> saisai.shao@intel.com>
>>> >>> >> wrote:
>>> >>> >>>
>>> >>> >>> Hi all,
>>> >>> >>>
>>> >>> >>> I agree with Hari that strong exactly-once semantics is very hard to
>>> >>> >>> guarantee, especially in failure situations. From my understanding, even the
>>> >>> >>> current implementation of ReliableKafkaReceiver cannot fully guarantee
>>> >>> >>> exactly-once semantics after a failure. First is the ordering of data replayed
>>> >>> >>> from the last checkpoint; this is hard to guarantee when multiple partitions
>>> >>> >>> are being ingested. Second is the design complexity of achieving this; you can
>>> >>> >>> refer to the Kafka Spout in Trident, where we have to dig into the very details
>>> >>> >>> of Kafka's metadata management system to achieve this, not to mention rebalance
>>> >>> >>> and fault-tolerance.
>>> >>> >>>
>>> >>> >>> Thanks
>>> >>> >>> Jerry
>>> >>> >>>
>>> >>> >>> -----Original Message-----
>>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>> >>> >>> To: Cody Koeninger
>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>> >>> >>> Subject: Re: Which committers care about Kafka?
>>> >>> >>>
>>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong only-once
>>> >>> >>> semantic through a proper API would be super useful; but I'm not implying
>>> >>> >>> this is easy to achieve.
>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <cody@koeninger.org>
>>> >>> >>> wrote:
>>> >>> >>>
>>> >>> >>>> If the downstream store for the output data is idempotent or
>>> >>> >>>> transactional, and that downstream store also is the system of record
>>> >>> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
>>> >>> >>>> offsets with / after the data is stored.  On any failure, restart from
>>> >>> >>>> the last committed offsets.
>>> >>> >>>>
>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases rather
>>> >>> >>>> than near-realtime-analytics use cases.
>>> >>> >>>>
>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>>> >>> >>>> hshreedharan@cloudera.com> wrote:
>>> >>> >>>>>
>>> >>> >>>>> I get what you are saying. But getting exactly once right is an
>>> >>> >>>>> extremely hard problem - especially in presence of failure. The
>>> >>> >>>>> issue is failures can happen in a bunch of places. For example, before
>>> >>> >>>>> the notification of downstream store being successful reaches the
>>> >>> >>>>> receiver that updates the offsets, the node fails. The store was
>>> >>> >>>>> successful, but duplicates came in either way. This is something worth
>>> >>> >>>>> discussing by itself - but without uuids etc this might not really be
>>> >>> >>>>> solved even when you think it is.
>>> >>> >>>>>
>>> >>> >>>>> Anyway, I will look at the links. Even I am interested in all of the
>>> >>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only delivery,
>>> >>> >>>>> but I doubt the latter is really possible to guarantee - though I really
>>> >>> >>>>> would love to have that!
>>> >>> >>>>>
>>> >>> >>>>> Thanks,
>>> >>> >>>>> Hari
>>> >>> >>>>>
>>> >>> >>>>>
>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>>> >>> >>>>> <cody@koeninger.org>
>>> >>> >>>>> wrote:
>>> >>> >>>>>
>>> >>> >>>>>> Thanks for the replies.
>>> >>> >>>>>>
>>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.  If you
>>> >>> >>>>>> actually want exactly-once semantics, you need control of kafka offsets
>>> >>> >>>>>> as well, including the ability to not use zookeeper as the system of
>>> >>> >>>>>> record for offsets.  Kafka already is a reliable system that has strong
>>> >>> >>>>>> ordering guarantees (within a partition) and does not mandate the use of
>>> >>> >>>>>> zookeeper to store offsets.  I think there should be a spark api that
>>> >>> >>>>>> acts as a very simple intermediary between Kafka and the user's choice of
>>> >>> >>>>>> downstream store.
>>> >>> >>>>>>
>>> >>> >>>>>> Take a look at the links I posted - if there have already been 2
>>> >>> >>>>>> independent implementations of the idea, chances are it's something
>>> >>> >>>>>> people need.
>>> >>> >>>>>>
>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan
<
>>> >>> >>>>>> hshreedharan@cloudera.com> wrote:
>>> >>> >>>>>>>
>>> >>> >>>>>>> Hi Cody,
>>> >>> >>>>>>>
>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>>> >>> >>>>>>> something pretty simple and lightweight for that one.
>>> >>> >>>>>>>
>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
>>> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it is a good
>>> >>> >>>>>>> idea to implement this to avoid unnecessary HDFS writes, it is an
>>> >>> >>>>>>> optimization. For that reason, we must be careful in the implementation.
>>> >>> >>>>>>> There are a couple of issues that we need to ensure work properly -
>>> >>> >>>>>>> specifically ordering. To ensure we pull messages from different topics
>>> >>> >>>>>>> and partitions in the same order after failure, we’d still have to
>>> >>> >>>>>>> persist the metadata to HDFS (or some other system) - this metadata
>>> >>> >>>>>>> must contain the order of messages consumed, so we know how to re-read
>>> >>> >>>>>>> the messages. I am planning to explore this once I have some time
>>> >>> >>>>>>> (probably in Jan). In addition, we must also ensure bucketing functions
>>> >>> >>>>>>> work fine as well. I will file a placeholder jira for this one.
>>> >>> >>>>>>>
>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while back -
>>> >>> >>>>>>> https://github.com/apache/spark/pull/2994. I am hoping that this
>>> >>> >>>>>>> will get pulled in soon, as this is something I know people want.
>>> >>> >>>>>>> I am open to feedback on that - anything that I can do to make it
>>> >>> >>>>>>> better.
>>> >>> >>>>>>>
>>> >>> >>>>>>> Thanks,
>>> >>> >>>>>>> Hari
>>> >>> >>>>>>>
>>> >>> >>>>>>>
>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>>> >>> >>>>>>> <pwendell@gmail.com>
>>> >>> >>>>>>> wrote:
>>> >>> >>>>>>>
>>> >>> >>>>>>>> Hey Cody,
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD -
>>> >>> >>>>>>>> he is traveling this week though so I can respond a bit. To the
>>> >>> >>>>>>>> high level point of whether Kafka is important - it definitely
>>> >>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>>> >>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
>>> >>> >>>>>>>> Kafka is something we generally want in Spark and not a library.
>>> >>> >>>>>>>> In some cases IIRC there were user libraries that used unstable
>>> >>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
>>> >>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be able
>>> >>> >>>>>>>> to use newer Kafka versions. This is a high level impression only
>>> >>> >>>>>>>> though, I haven't talked to TD about this recently so it's worth
>>> >>> >>>>>>>> revisiting given the developments in Kafka.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> Please do bring things up like this on the dev list if there are
>>> >>> >>>>>>>> blockers for your usage - thanks for pinging it.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> - Patrick
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>>> >>> >>>>>>>> <cody@koeninger.org>
>>> >>> >>>>>>>> wrote:
>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
>>> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our
>>> >>> >>>>>>>>> production use. I don't think we're alone in this viewpoint,
>>> >>> >>>>>>>>> because I've seen several different patches and libraries to fix
>>> >>> >>>>>>>>> the same things we've been running into.
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Regarding flexibility
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
>>> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch ourselves,
>>> >>> >>>>>>>>> then found that PR and have been running it in production. We
>>> >>> >>>>>>>>> wouldn't be able to get our jobs done without it. It also allows
>>> >>> >>>>>>>>> users to solve a whole class of problems for themselves
>>> >>> >>>>>>>>> (e.g. SPARK-2388, arbitrary delay of messages, etc).
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind WriteAheadLog
>>> >>> >>>>>>>>> as a general solution for streaming unreliable sources, but Kafka
>>> >>> >>>>>>>>> already is a reliable source. I think there's a need for an api
>>> >>> >>>>>>>>> that treats it as such. Even aside from the performance issues of
>>> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another write-ahead
>>> >>> >>>>>>>>> log in hdfs, I need exactly-once semantics in the face of failure
>>> >>> >>>>>>>>> (I've had failures that prevented reloading a spark streaming
>>> >>> >>>>>>>>> checkpoint, for instance).
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> I've got an implementation I've been using
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Tresata has something similar at
>>> >>> >>>>>>>>> https://github.com/tresata/spark-kafka,
>>> >>> >>>>>>>>> and I know there were earlier attempts based on Storm code.
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather than
>>> >>> >>>>>>>>> patches to Spark is problematic, because large portions of the
>>> >>> >>>>>>>>> implementation are private[spark].
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> I'd like to help, but I need to know whose attention to get.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>>
>>> >>> >>>>>>>>
>>> >>> >>>>>>>>
>>> >>> >>>>>>>
>>> >>> >>>>>
>>> >>> >>>>
>>> >>> >>>
>>> >>> >>
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>