spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: RE: Spark or Storm
Date Fri, 19 Jun 2015 14:37:02 GMT
auto.offset.reset only applies when there are no starting offsets (either
from a checkpoint, or from you providing them explicitly)
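A rough sketch of the precedence Cody describes, in pure Python with illustrative names (the real resolution happens inside Spark's Kafka DStream code, not through a function like this):

```python
# Sketch of the precedence: auto.offset.reset is only consulted when no
# starting offsets are available from a checkpoint or from the caller.
# All names here are illustrative, not Spark's actual API.

def resolve_starting_offsets(checkpoint_offsets, explicit_offsets,
                             auto_offset_reset, earliest, latest):
    """Return the offsets a restarted stream would begin from."""
    if checkpoint_offsets is not None:   # recovered from a checkpoint
        return checkpoint_offsets
    if explicit_offsets is not None:     # provided explicitly (fromOffsets)
        return explicit_offsets
    # Only now does the reset policy matter.
    if auto_offset_reset == "smallest":
        return earliest
    return latest                        # "largest"

# With a checkpoint present, the reset policy is ignored entirely.
print(resolve_starting_offsets({("t", 0): 42}, None, "smallest",
                               {("t", 0): 0}, {("t", 0): 100}))
```

So in the scenario asked about below, a restart that recovers a checkpoint never reaches the reset policy at all.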

On Fri, Jun 19, 2015 at 6:10 AM, bit1129@163.com <bit1129@163.com> wrote:

>
> I think your observation is correct; you have to take care of this
> replayed data on your end, e.g., give each message a unique id or something similar.
>
> I am using "I think" in the above sentence because I am not sure, and I
> also have a related question:
> I am wondering how direct stream + Kafka behaves when the driver
> goes down and is restarted: will it always first replay the checkpointed
> failed batch, or will it honor Kafka's offset reset policy
> (auto.offset.reset)? If it honors the reset policy and it is set to
> "smallest", then it gives at-least-once semantics; if it is set to
> "largest", then it gives at-most-once semantics?
>
>
> ------------------------------
> bit1129@163.com
>
>
> *From:* Haopu Wang <HWang@qilinsoft.com>
> *Date:* 2015-06-19 18:47
> *To:* Enno Shioji <eshioji@gmail.com>; Tathagata Das <tdas@databricks.com>
> *CC:* prajod.vettiyattil@wipro.com; Cody Koeninger <cody@koeninger.org>;
> bit1129@163.com; Jordan Pilat <jrpilat@gmail.com>; Will Briggs
> <wrbriggs@gmail.com>; Ashish Soni <asoni.learn@gmail.com>; ayan guha
> <guha.ayan@gmail.com>; user@spark.apache.org; Sateesh Kavuri
> <sateesh.kavuri@gmail.com>; Spark Enthusiast <sparkenthusiast@yahoo.in>;
Sabarish
> Sasidharan <sabarish.sasidharan@manthan.com>
> *Subject:* RE: RE: Spark or Storm
>
> My question is not directly related: regarding the "exactly-once
> semantics", the documentation (copied below) says Spark Streaming gives
> exactly-once semantics, but from my test results, with checkpointing
> enabled, the application always re-processes the files in the last batch
> after a graceful restart.
>
>
>
> ======
> *Semantics of Received Data*
>
> Different input sources provide different guarantees, ranging from *at-least
> once* to *exactly once*. Read for more details.
> *With Files*
>
> If all of the input data is already present in a fault-tolerant file
> system like HDFS, Spark Streaming can always recover from any failure and
> process all the data. This gives *exactly-once* semantics, meaning all the
> data will be processed exactly once no matter what fails.
>
>
>
>
>  ------------------------------
>
> *From:* Enno Shioji [mailto:eshioji@gmail.com]
> *Sent:* Friday, June 19, 2015 5:29 PM
> *To:* Tathagata Das
> *Cc:* prajod.vettiyattil@wipro.com; Cody Koeninger; bit1129@163.com;
> Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org;
> Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
> *Subject:* Re: RE: Spark or Storm
>
>
>
> Fair enough, on second thought, just saying that it should be idempotent
> is indeed more confusing.
>
>
>
> I guess the crux of the confusion comes from the fact that people tend to
> assume the work you described (store the batch id, skip, etc.) is handled by
> the framework, perhaps partly because Storm Trident does handle it (you
> just need to let Storm know whether the output operation succeeded,
> and it handles the batch id storing and skipping business). Whenever I
> explain to people that one needs to do this additional work you described to
> get end-to-end exactly-once semantics, it usually takes a while to convince
> them. In my limited experience, they tend to interpret "transactional" in
> that sentence to mean that you just have to write to transactional
> storage like an ACID RDB. Pointing them to "Semantics of output operations"
> is usually sufficient though.
>
>
>
> Maybe others like @Ashish can weigh in on this; did you interpret it in
> this way?
>
>
>
> What if we change the statement into:
>
> "end-to-end exactly-once semantics (if your updates to downstream systems
> are idempotent or transactional). To learn how to make your updates
> idempotent or transactional, see the "Semantics of output operations"
> section in this chapter
> <https://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics>
> "
>
>
>
> That way, it's clear that it's not sufficient to merely write to a
> "transactional storage" like ACID store.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das <tdas@databricks.com>
> wrote:
>
> If the current documentation is confusing, we can definitely improve the
> documentation. However, I do not understand why the term
> "transactional" is confusing. If your output operation has to add 5, then
> the user has to implement the following mechanism:
>
>
>
> 1. If the unique id of the batch of data is already present in the store,
> then skip the update
>
> 2. Otherwise, atomically do both: the update operation as well as storing
> the unique id of the batch. This is pretty much the definition of a
> transaction. The user has to be aware of the transactional semantics of the
> data store while implementing this functionality.
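A minimal sketch of that two-step mechanism, using sqlite3 purely for illustration (the table and column names here are made up, not anything from Spark or the thread):

```python
import sqlite3

# Sketch of the conditional update described above: skip if the batch id
# was already applied, otherwise apply the update AND record the id in
# the same transaction. Schema and names are illustrative only.

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counters (name TEXT PRIMARY KEY, value INT)")
conn.execute("CREATE TABLE applied_batches (batch_id TEXT PRIMARY KEY)")
conn.execute("INSERT INTO counters VALUES ('total', 0)")
conn.commit()

def apply_batch(batch_id, delta):
    cur = conn.cursor()
    # Step 1: if the batch id is already present in the store, skip.
    if cur.execute("SELECT 1 FROM applied_batches WHERE batch_id = ?",
                   (batch_id,)).fetchone():
        return
    # Step 2: atomically do both the update and the id bookkeeping.
    cur.execute("UPDATE counters SET value = value + ? WHERE name = 'total'",
                (delta,))
    cur.execute("INSERT INTO applied_batches VALUES (?)", (batch_id,))
    conn.commit()

apply_batch("batch-7", 5)
apply_batch("batch-7", 5)   # replay after a failure: no double count
print(conn.execute("SELECT value FROM counters").fetchone()[0])  # 5
```

The point is that the "+5" and the batch-id insert commit together or not at all, which is why the data store's transactional semantics matter.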
>
>
>
> You CAN argue that this effectively makes the whole update sort of
> idempotent, as even if you try doing it multiple times, it will update only
> once. But that is not what is generally considered idempotent. Writing a
> fixed count, not an increment, is usually what is called idempotent. And so
> just mentioning that the output operation must be idempotent is, in my
> opinion, more confusing.
>
>
>
> To take a page out of the Storm / Trident guide, even they call this exact
> conditional updating of Trident State a "transactional" operation. See
> "transactional spout" in the Trident State guide:
> https://storm.apache.org/documentation/Trident-state
>
>
>
> In the end, I am totally open to suggestions and PRs on how to make the
> programming guide easier to understand. :)
>
>
>
> TD
>
>
>
> On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji <eshioji@gmail.com> wrote:
>
> Tbh I find the doc around this a bit confusing. If it says "end-to-end
> exactly-once semantics (if your updates to downstream systems are
> idempotent or transactional)", I think most people will interpret it as: as
> long as you use storage which has atomicity (like MySQL/Postgres,
> etc.), a successful output operation for a given batch (let's say "+5") is
> going to be issued exactly once against the storage.
>
>
>
> However, as I understand it, that's not what this statement means. What it
> is saying is that it will always issue "+5" and never, say, "+6", because
> it makes sure a message is processed exactly once internally. However, it
> *may* issue "+5" more than once for a given batch, and it is up to the
> developer to deal with this by either making the output operation
> idempotent (e.g. "set 5") or "transactional" (e.g. keep track of batch IDs
> and skip already-applied batches, etc.).
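A toy illustration of the distinction Enno draws, with no Spark involved: replaying "+5" twice corrupts an increment-style write, while a "set"-style write is naturally idempotent.

```python
# Illustration only: the effect of replaying an output operation against
# an increment-style write vs. a set-style (idempotent) write.

store = {"count": 0}

def increment(delta):          # NOT idempotent
    store["count"] += delta

def set_value(value):          # idempotent
    store["count"] = value

increment(5)
increment(5)                   # replayed batch: result is now wrong
print(store["count"])          # 10, not the intended 5

store["count"] = 0
set_value(5)
set_value(5)                   # replayed batch: result is still correct
print(store["count"])          # 5
```

This is why merely writing to an ACID store is not enough: the atomicity protects each "+5", not the pipeline from issuing it twice.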
>
>
>
> I wonder if it makes more sense to drop "or transactional" from the
> statement, because if you think about it, ultimately what you are asked to
> do is make the writes idempotent even with the "transactional" approach,
> and "transactional" is a bit loaded and prone to lead to
> misunderstandings (even though, in fairness, the fault tolerance
> chapter explicitly explains it).
>
>
>
>
>
>
>
> On Fri, Jun 19, 2015 at 2:56 AM, <prajod.vettiyattil@wipro.com> wrote:
>
> More details on the Direct API of Spark 1.3 are at the Databricks blog:
> https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
>
>
>
> Note the use of checkpoints to persist the Kafka offsets in Spark
> Streaming itself, and not in zookeeper.
>
>
>
> Also note this statement: "... This allows one to build Spark Streaming +
> Kafka pipelines with end-to-end exactly-once semantics (if your updates to
> downstream systems are idempotent or transactional)."
>
>
>
>
>
> *From:* Cody Koeninger [mailto:cody@koeninger.org]
> *Sent:* 18 June 2015 19:38
> *To:* bit1129@163.com
> *Cc:* Prajod S Vettiyattil (WT01 - BAS); jrpilat@gmail.com;
> eshioji@gmail.com; wrbriggs@gmail.com; asoni.learn@gmail.com; ayan guha;
> user; sateesh.kavuri@gmail.com; sparkenthusiast@yahoo.in;
> sabarish.sasidharan@manthan.com
> *Subject:* Re: RE: Spark or Storm
>
>
>
> That general description is accurate, but it is not really a specific issue
> of the direct stream. It applies to anything consuming from Kafka (or, as
> Matei already said, any streaming system really). You can't have
> exactly-once semantics unless you know something more about how you're
> storing results.
>
>
>
> For "some unique id", topic, partition, and offset are usually the obvious
> choice, which is why it's important that the direct stream gives you access
> to the offsets.
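A small sketch of why (topic, partition, offset) works as a unique id: a replayed record maps to the same key, so a downstream store can detect and skip it. Names are illustrative only.

```python
# Dedup by (topic, partition, offset): each Kafka record has exactly one
# such coordinate, so replaying a batch reproduces the same keys.

seen = set()
results = []

def process_once(topic, partition, offset, value):
    key = (topic, partition, offset)
    if key in seen:            # already applied; replay detected
        return
    seen.add(key)
    results.append(value)

records = [("events", 0, 100, "a"), ("events", 0, 101, "b")]
for r in records + records:    # simulate a replay of the whole batch
    process_once(*r)
print(results)                 # each value applied exactly once
```

In practice the `seen` set would live in the same store as the results (as in the batch-id/transaction discussion above), not in memory.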
>
>
>
> See https://github.com/koeninger/kafka-exactly-once for more info
>
>
>
>
>
>
>
> On Thu, Jun 18, 2015 at 6:47 AM, bit1129@163.com <bit1129@163.com> wrote:
>
>  I am wondering how the direct stream API ensures end-to-end exactly-once
> semantics.
>
>
>
> I think there are two things involved:
>
> 1. From the Spark Streaming end, the driver will replay the offset range
> when it goes down and is restarted, which means that the new tasks will
> process some already-processed data.
>
> 2. From the user end, since tasks may process already-processed data, the
> user end should detect that some data has already been processed, e.g. by
> using some unique ID.
>
>
>
> Not sure if I have understood correctly.
>
>
>
>
>   ------------------------------
>
> bit1129@163.com
>
>
>
> *From:* prajod.vettiyattil@wipro.com
>
> *Date:* 2015-06-18 16:56
>
> *To:* jrpilat@gmail.com; eshioji@gmail.com
>
> *CC:* wrbriggs@gmail.com; asoni.learn@gmail.com; guha.ayan@gmail.com;
> user@spark.apache.org; sateesh.kavuri@gmail.com; sparkenthusiast@yahoo.in;
> sabarish.sasidharan@manthan.com
>
> *Subject:* RE: Spark or Storm
>
> >>not being able to read from Kafka using multiple nodes
>
>
>
> > Kafka is plenty capable of doing this..
>
>
>
> I faced the same issue before Spark 1.3 was released.
>
>
>
> The issue was not with Kafka, but with Spark Streaming's Kafka connector.
> Before the Spark 1.3.0 release, one Spark worker would get all the streamed
> messages. We had to re-partition to distribute the processing.
>
>
>
> From the Spark 1.3.0 release, the Spark Direct API for Kafka supports
> parallel reads from Kafka, streamed to Spark workers. See "Approach 2:
> Direct Approach" on this page:
> http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note
> that it also mentions zero data loss and exactly-once semantics for Kafka
> integration.
>
>
>
>
>
> Prajod
>
>
>
> *From:* Jordan Pilat [mailto:jrpilat@gmail.com]
> *Sent:* 18 June 2015 03:57
> *To:* Enno Shioji
> *Cc:* Will Briggs; asoni.learn@gmail.com; ayan guha; user; Sateesh
> Kavuri; Spark Enthusiast; Sabarish Sasidharan
> *Subject:* Re: Spark or Storm
>
>
>
> >not being able to read from Kafka using multiple nodes
>
> Kafka is plenty capable of doing this, by clustering together multiple
> consumer instances into a consumer group.
> If your topic is sufficiently partitioned, the consumer group can consume
> the topic in a parallelized fashion.
> If it isn't, you still have the fault tolerance associated with clustering
> the consumers.
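A rough sketch of the consumer-group behavior Jordan describes: each partition is owned by one consumer in the group, so more partitions means more parallelism. This is a simplified round-robin model, not Kafka's actual assignment protocol.

```python
# Simplified model of consumer-group partition assignment: every
# partition goes to exactly one consumer; parallelism is bounded by
# the partition count. Illustrative only.

def assign(partitions, consumers):
    """Round-robin partition assignment (simplified)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 6 partitions across 3 consumers: fully parallel consumption.
print(assign(list(range(6)), ["c1", "c2", "c3"]))

# With a single partition, one consumer does the work and the rest
# stand by, giving fault tolerance but no extra parallelism.
print(assign([0], ["c1", "c2"]))
```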
>
> OK
> JRP
>
> On Jun 17, 2015 1:27 AM, "Enno Shioji" <eshioji@gmail.com> wrote:
>
>  We've evaluated Spark Streaming vs. Storm and ended up sticking with
> Storm.
>
>
>
> Some of the important drawbacks are:
>
> Spark has no back pressure (the receiver rate limit can alleviate this to a
> certain point, but it's far from ideal).
>
> There is also no exactly-once semantics. (updateStateByKey can achieve
> these semantics, but it is not practical if you have any significant amount
> of state, because it does so by dumping the entire state at every checkpoint.)
>
>
>
> There are also some minor drawbacks that I'm sure will be fixed quickly,
> like no task timeout, not being able to read from Kafka using multiple
> nodes, and a data loss hazard with Kafka.
>
>
>
> It's also not possible to attain very low latency in Spark, if that's what
> you need.
>
>
>
> The plus for Spark is the concise and, IMO, more intuitive syntax,
> especially if you compare it with Storm's Java API.
>
>
>
> I admit I might be a bit biased towards Storm, though, as I'm more familiar
> with it.
>
>
>
> Also, you can do some processing with Kinesis. If all you need to do is a
> straightforward transformation and you are reading from Kinesis to begin
> with, it might be easier to just do the transformation in Kinesis.
>
>
>
>
>
>
>
>
>
>
>
> On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan <
> sabarish.sasidharan@manthan.com> wrote:
>
> Whatever you write in bolts would be the logic you want to apply on your
> events. In Spark, that logic would be coded in map() or similar such
> transformations and/or actions. Spark doesn't enforce a structure for
> capturing your processing logic like Storm does.
>
> Regards
> Sab
>
> Probably overloading the question a bit.
>
> In Storm, Bolts have the functionality of getting triggered on events. Is
> that kind of functionality possible with Spark Streaming? During each phase
> of the data processing, the transformed data is stored in the database, and
> this transformed data should then be sent to a new pipeline for further
> processing.
>
> How can this be achieved using Spark?
>
>
>
> On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast <
> sparkenthusiast@yahoo.in> wrote:
>
>   I have a use-case where a stream of incoming events has to be
> aggregated and joined to create complex events. The aggregation will have
> to happen at an interval of 1 minute (or less).
>
>
>
> The pipeline is:
>
> Upstream services --(send events)--> KAFKA --(enrich event)--> Event Stream
> Processor --> Complex Event Processor --> Elastic Search.
>
>
>
> From what I understand, Storm will make a very good ESP and Spark
> Streaming will make a good CEP.
>
>
>
> But, we are also evaluating Storm with Trident.
>
>
>
> How does Spark Streaming compare with Storm with Trident?
>
>
>
> Sridhar Chellappa
>
>
>
>
>
>
>
>
>
>
>
>
>
> On Wednesday, 17 June 2015 10:02 AM, ayan guha <guha.ayan@gmail.com>
> wrote:
>
>
>
> I have a similar scenario where we need to bring data from Kinesis to
> HBase. Data velocity is 20k per 10 mins. A little manipulation of the data
> will be required, but that's regardless of the tool, so we will be writing
> that piece as Java POJOs.
>
> The whole environment is on AWS. HBase is on a long-running EMR cluster and
> Kinesis is on a separate cluster.
>
> TIA.
> Best
> Ayan
>
> On 17 Jun 2015 12:13, "Will Briggs" <wrbriggs@gmail.com> wrote:
>
> The programming models for the two frameworks are conceptually rather
> different; I haven't worked with Storm for quite some time, but based on my
> old experience with it, I would equate Spark Streaming more with Storm's
> Trident API, rather than with the raw Bolt API. Even then, there are
> significant differences, but it's a bit closer.
>
> If you can share your use case, we might be able to provide better
> guidance.
>
> Regards,
> Will
>
> On June 16, 2015, at 9:46 PM, asoni.learn@gmail.com wrote:
>
> Hi All,
>
> I am evaluating Spark vs. Storm (Spark Streaming), and I am not able to
> see what the equivalent of Storm's Bolt is inside Spark.
>
> Any help on this will be appreciated.
>
> Thanks ,
> Ashish
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>
>
>
>
>
>
>
>
>  The information contained in this electronic message and any attachments
> to this message are intended for the exclusive use of the addressee(s) and
> may contain proprietary, confidential or privileged information. If you are
> not the intended recipient, you should not disseminate, distribute or copy
> this e-mail. Please notify the sender immediately and destroy all copies of
> this message and any attachments. WARNING: Computer viruses can be
> transmitted via email. The recipient should check this email and any
> attachments for the presence of viruses. The company accepts no liability
> for any damage caused by any virus transmitted by this email.
> www.wipro.com
>
>
>
>
>
>
>
>
>
>
>
