spark-dev mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Which committers care about Kafka?
Date Fri, 19 Dec 2014 21:48:29 GMT
The problems you guys are discussing come from trying to store state in
spark, so don't do that.  Spark isn't a distributed database.

Just map Kafka partitions directly to RDDs, let user code specify the
range of offsets explicitly, and let them be in charge of committing
offsets.

Using the simple consumer isn't that bad; I'm already using it in
production with the code I linked to, and Tresata apparently has been as
well.  Again, for everyone saying this is impossible, have you read either
of those implementations and looked at the approach?
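
Roughly, the pattern is just this (a sketch only - KafkaRDD, OffsetRange,
saveToStore and commitOffsets are illustrative names, not a finished api):

  // User code decides the exact range of offsets for this batch, up front.
  case class OffsetRange(topic: String, partition: Int,
                         fromOffset: Long, untilOffset: Long)

  val ranges = Seq(
    OffsetRange("events", 0, 1000L, 2000L),
    OffsetRange("events", 1, 1400L, 2400L))

  // Each (topic, partition, offset range) slice becomes one rdd partition,
  // so the rdd is deterministic and can be replayed as-is.
  val rdd = new KafkaRDD[String, String](sc, kafkaParams, ranges)

  // User code stores results and commits offsets wherever it likes - spark
  // itself holds no state.
  rdd.foreachPartition(iter => saveToStore(iter))
  commitOffsets(ranges)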



On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <Sean.McNamara@webtrends.com>
wrote:

> Please feel free to correct me if I'm wrong, but I think the exactly-once
> spark streaming semantics can easily be achieved using updateStateByKey. Make
> the key going into updateStateByKey be a hash of the event, or pluck off
> some uuid from the message.  The updateFunc would only emit the message if
> the key did not exist, and the user has complete control over the window of
> time / state lifecycle for detecting duplicates.  It also makes it really
> easy to detect and take action (alert?) when you DO see a duplicate, or
> make memory tradeoffs within an error bound using a sketch algorithm.  The
> Kafka simple consumer is insanely complex; if possible, I think it would be
> better (and vastly more flexible) to get reliability using the primitives
> that spark so elegantly provides.
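>
> As a rough sketch (assuming a DStream[String] called messages; hashOf and
> alertDuplicate are placeholder functions):
>
>   // Key each message by a content hash (or a uuid plucked off the event).
>   val keyed = messages.map(msg => (hashOf(msg), msg))
>
>   // State carries the message plus a flag marking whether this batch saw
>   // the key first; duplicates flip the flag off and can trigger an alert.
>   val updateFunc = (newMsgs: Seq[String], state: Option[(String, Boolean)]) =>
>     state match {
>       case None => newMsgs.headOption.map(m => (m, true))
>       case Some((m, _)) =>
>         if (newMsgs.nonEmpty) alertDuplicate(newMsgs)
>         Some((m, false))
>     }
>
>   // Emit only first sightings downstream.
>   val deduped = keyed.updateStateByKey(updateFunc)
>     .filter { case (_, (_, isNew)) => isNew }
>     .map { case (_, (msg, _)) => msg }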
>
> Cheers,
>
> Sean
>
>
> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <hshreedharan@cloudera.com>
> > wrote:
> >
> > Hi Dibyendu,
> >
> > Thanks for the details on the implementation. But I still do not believe
> > that it guarantees no duplicates - what they achieve is that the same
> > batch is processed exactly the same way every time (but note it may be
> > processed more than once) - so it depends on the operation being
> > idempotent. I believe Trident uses ZK to keep track of the transactions -
> > a batch can be processed multiple times in failure scenarios (for
> > example, the transaction is processed but before ZK is updated the
> > machine fails, causing a "new" node to process it again).
> >
> > I don't think it is impossible to do this in Spark Streaming as well and
> > I'd be really interested in working on it at some point in the near
> > future.
> >
> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
> > dibyendu.bhattachary@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm
> >> Trident achieves its exactly-once guarantee by processing tuples in a
> >> batch and assigning the same transaction-id to a given batch. The replay
> >> of a given batch with a transaction-id will have the exact same set of
> >> tuples, and replay of batches happens in the exact same order as before
> >> the failure.
> >>
> >> Given this paradigm, if a downstream system processes data for a batch
> >> having a given transaction-id, and the same batch is emitted again
> >> during a failure, you can check whether that transaction-id has already
> >> been processed and hence guarantee exactly-once semantics.
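> >>
> >> As a sketch, the downstream check is just this (the store API here is
> >> illustrative):
> >>
> >>   // Apply a batch only if its transaction-id was not applied before;
> >>   // a replayed batch carries the same transaction-id and is skipped.
> >>   def processBatch(txId: Long, batch: Seq[Message]): Unit =
> >>     if (!store.hasCommitted(txId)) {
> >>       store.writeAtomically(batch, txId)  // data + txId in one commit
> >>     }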
> >>
> >> And this can only be achieved in Spark if we use the low-level Kafka
> >> consumer API to manage the offsets. This low-level Kafka consumer (
> >> https://github.com/dibbhatt/kafka-spark-consumer) implements a Spark
> >> Kafka consumer that uses the Kafka low-level APIs. All of the
> >> Kafka-related logic has been taken from the Storm-Kafka spout, which
> >> manages all Kafka re-balance and fault-tolerance aspects as well as
> >> Kafka metadata management.
> >>
> >> Presently this consumer ensures that during a Receiver failure, it will
> >> re-emit the exact same block with the same set of messages. Every
> >> message has the details of its partition, offset and topic, which can
> >> tackle SPARK-3146.
> >>
> >> As this low-level consumer has complete control over the Kafka offsets,
> >> we can implement a Trident-like feature on top of it, such as assigning
> >> a transaction-id to a given block and re-emitting the same block with
> >> the same set of messages during a Driver failure.
> >>
> >> Regards,
> >> Dibyendu
> >>
> >>
> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <saisai.shao@intel.com>
> >> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I agree with Hari that strong exactly-once semantics are very hard to
> >>> guarantee, especially in failure situations. From my understanding, even
> >>> the current implementation of ReliableKafkaReceiver cannot fully
> >>> guarantee exactly-once semantics after a failure. First is the ordering
> >>> of data replayed from the last checkpoint, which is hard to guarantee
> >>> when multiple partitions are injected; second is the design complexity
> >>> of achieving this - you can refer to the Kafka Spout in Trident, where
> >>> we have to dig into the very details of Kafka's metadata management
> >>> system to achieve this, not to mention rebalance and fault-tolerance.
> >>>
> >>> Thanks
> >>> Jerry
> >>>
> >>> -----Original Message-----
> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
> >>> Sent: Friday, December 19, 2014 5:57 AM
> >>> To: Cody Koeninger
> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
> >>> Subject: Re: Which committers care about Kafka?
> >>>
> >>> But idempotency is not that easy to achieve sometimes. A strong
> >>> once-only semantic through a proper API would be super useful; but I'm
> >>> not implying this is easy to achieve.
> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <cody@koeninger.org> wrote:
> >>>
> >>>> If the downstream store for the output data is idempotent or
> >>>> transactional, and that downstream store also is the system of record
> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
> >>>> offsets with / after the data is stored.  On any failure, restart from
> >>>> the last committed offsets.
> >>>>
> >>>> Yes, this approach is biased towards the etl-like use cases rather
> >>>> than near-realtime-analytics use cases.
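> >>>>
> >>>> A sketch of what I mean, with a transactional store (the db api here
> >>>> is made up):
> >>>>
> >>>>   // Results and offsets commit in the same transaction, so replay
> >>>>   // after a failure either sees both or neither.
> >>>>   db.withTransaction { tx =>
> >>>>     tx.insert("results", batchRows)
> >>>>     tx.upsert("kafka_offsets", topicAndPartition, untilOffset)
> >>>>   }
> >>>>   // On restart, read "kafka_offsets" and resume from there.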
> >>>>
> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
> >>>> hshreedharan@cloudera.com> wrote:
> >>>>>
> >>>>> I get what you are saying. But getting exactly once right is an
> >>>>> extremely hard problem - especially in the presence of failure. The
> >>>>> issue is failures can happen in a bunch of places. For example, before
> >>>>> the notification of the downstream store being successful reaches the
> >>>>> receiver that updates the offsets, the node fails. The store was
> >>>>> successful, but duplicates come in either way. This is something worth
> >>>>> discussing by itself - but without uuids etc. this might not really be
> >>>>> solved even when you think it is.
> >>>>>
> >>>>> Anyway, I will look at the links. I too am interested in all of the
> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only delivery -
> >>>>> but I doubt the latter is really possible to guarantee, though I
> >>>>> really would love to have that!
> >>>>>
> >>>>> Thanks,
> >>>>> Hari
> >>>>>
> >>>>>
> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
> >>>>> <cody@koeninger.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the replies.
> >>>>>>
> >>>>>> Regarding skipping WAL, it's not just about optimization.  If you
> >>>>>> actually want exactly-once semantics, you need control of kafka
> >>>>>> offsets as well, including the ability to not use zookeeper as the
> >>>>>> system of record for offsets.  Kafka already is a reliable system
> >>>>>> that has strong ordering guarantees (within a partition) and does
> >>>>>> not mandate the use of zookeeper to store offsets.  I think there
> >>>>>> should be a spark api that acts as a very simple intermediary
> >>>>>> between Kafka and the user's choice of downstream store.
> >>>>>>
> >>>>>> Take a look at the links I posted - if there have already been 2
> >>>>>> independent implementations of the idea, chances are it's something
> >>>>>> people need.
> >>>>>>
> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
> >>>>>> hshreedharan@cloudera.com> wrote:
> >>>>>>>
> >>>>>>> Hi Cody,
> >>>>>>>
> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
> >>>>>>> something pretty simple and lightweight for that one.
> >>>>>>>
> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
> >>>>>>> something I discussed with TD a few weeks ago. Though it is a good
> >>>>>>> idea to implement this to avoid unnecessary HDFS writes, it is an
> >>>>>>> optimization. For that reason, we must be careful in the
> >>>>>>> implementation. There are a couple of issues that we need to ensure
> >>>>>>> work properly - specifically ordering. To ensure we pull messages
> >>>>>>> from different topics and partitions in the same order after
> >>>>>>> failure, we'd still have to persist the metadata to HDFS (or some
> >>>>>>> other system) - this metadata must contain the order of messages
> >>>>>>> consumed, so we know how to re-read the messages. I am planning to
> >>>>>>> explore this once I have some time (probably in Jan). In addition,
> >>>>>>> we must also ensure bucketing functions work fine. I will file a
> >>>>>>> placeholder jira for this one.
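> >>>>>>>
> >>>>>>> As a sketch, the persisted metadata could be as small as an ordered
> >>>>>>> log of consumed ranges (the record type here is illustrative):
> >>>>>>>
> >>>>>>>   // One record per consumed block, in consumption order, so replay
> >>>>>>>   // can re-read the same messages in the same order.
> >>>>>>>   case class ConsumedRange(seqNo: Long, topic: String,
> >>>>>>>                            partition: Int, fromOffset: Long,
> >>>>>>>                            untilOffset: Long)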
> >>>>>>>
> >>>>>>> I also wrote an API to write data back to Kafka a while back -
> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that this
> >>>>>>> will get pulled in soon, as this is something I know people want. I
> >>>>>>> am open to feedback on that - anything that I can do to make it
> >>>>>>> better.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Hari
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
> >>>>>>> <pwendell@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hey Cody,
> >>>>>>>>
> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD -
> >>>>>>>> he is traveling this week though so I can respond a bit. To the
> >>>>>>>> high-level point of whether Kafka is important - it definitely is.
> >>>>>>>> Something like 80% of Spark Streaming deployments (anecdotally)
> >>>>>>>> ingest data from Kafka. Also, good support for Kafka is something
> >>>>>>>> we generally want in Spark itself and not in a library. In some
> >>>>>>>> cases IIRC there were user libraries that used unstable Kafka APIs,
> >>>>>>>> and we were somewhat waiting on Kafka to stabilize them before
> >>>>>>>> merging things upstream; otherwise users wouldn't be able to use
> >>>>>>>> newer Kafka versions. This is a high-level impression only though -
> >>>>>>>> I haven't talked to TD about this recently, so it's worth
> >>>>>>>> revisiting given the developments in Kafka.
> >>>>>>>>
> >>>>>>>> Please do bring things up like this on the dev list if there are
> >>>>>>>> blockers for your usage - thanks for pinging it.
> >>>>>>>>
> >>>>>>>> - Patrick
> >>>>>>>>
> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
> >>>>>>>> <cody@koeninger.org>
> >>>>>>>> wrote:
> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get some
> >>>>>>>>> long-standing Kafka-related issues resolved?
> >>>>>>>>>
> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our
> >>>>>>>>> production use. I don't think we're alone in this viewpoint,
> >>>>>>>>> because I've seen several different patches and libraries to fix
> >>>>>>>>> the same things we've been running into.
> >>>>>>>>>
> >>>>>>>>> Regarding flexibility
> >>>>>>>>>
> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
> >>>>>>>>>
> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of this
> >>>>>>>>> is absolutely necessary. We wrote a similar patch ourselves, then
> >>>>>>>>> found that PR and have been running it in production. We wouldn't
> >>>>>>>>> be able to get our jobs done without it. It also allows users to
> >>>>>>>>> solve a whole class of problems for themselves (e.g. SPARK-2388,
> >>>>>>>>> arbitrary delay of messages, etc).
> >>>>>>>>>
> >>>>>>>>> Regarding safety, I understand the motivation behind
> >>>>>>>>> WriteAheadLog as a general solution for streaming unreliable
> >>>>>>>>> sources, but Kafka already is a reliable source. I think there's
> >>>>>>>>> a need for an api that treats it as such. Even aside from the
> >>>>>>>>> performance issues of duplicating the write-ahead log in kafka
> >>>>>>>>> into another write-ahead log in hdfs, I need exactly-once
> >>>>>>>>> semantics in the face of failure (I've had failures that
> >>>>>>>>> prevented reloading a spark streaming checkpoint, for instance).
> >>>>>>>>>
> >>>>>>>>> I've got an implementation I've been using:
> >>>>>>>>>
> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
> >>>>>>>>>
> >>>>>>>>> Tresata has something similar at
> >>>>>>>>> https://github.com/tresata/spark-kafka, and I know there were
> >>>>>>>>> earlier attempts based on Storm code.
> >>>>>>>>>
> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
> >>>>>>>>> than patches to Spark is problematic, because large portions of
> >>>>>>>>> the implementation are private[spark].
> >>>>>>>>>
> >>>>>>>>> I'd like to help, but I need to know whose attention to get.
> >>>>>>>>
> >>>>>>>> ---------------------------------------------------------------------
> >>>>>>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> >>>>>>>> For additional commands, e-mail: dev-help@spark.apache.org
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
