From: Tathagata Das <tdas@databricks.com>
Date: Fri, 19 Jun 2015 14:52:56 -0700
Subject: Re: RE: Spark or Storm
To: Cody Koeninger
Cc: Ashish Soni, bit1129@163.com, Haopu Wang, eshioji@gmail.com, prajod.vettiyattil@wipro.com, jrpilat@gmail.com, wrbriggs@gmail.com, ayan guha, user@spark.apache.org, sateesh.kavuri@gmail.com, sparkenthusiast@yahoo.in, sabarish.sasidharan@manthan.com
I agree with Cody. It's pretty hard for any framework to provide built-in support for that, since the semantics completely depend on what data store you want to use it with. Providing interfaces does help a little, but even with those interfaces, the user still has to do most of the heavy lifting: the user has to understand what is actually going on AND implement all the code needed to ensure a unique ID and atomic updates, according to the capabilities and APIs provided by the data store.
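
To make that concrete, a minimal sketch (Scala) of what such an interface could look like; this is hypothetical, not a Spark API, and the user would still have to supply the store-specific logic for both methods:

// Hypothetical interface, not part of Spark. Both methods must be
// implemented against the capabilities of the chosen data store.
trait ExactlyOnceSink[T] {
  // Requires some unique id per batch, e.g. derived from Kafka offsets.
  def alreadyApplied(batchId: String): Boolean
  // Must write the data AND record batchId atomically.
  def applyAtomically(batchId: String, data: Iterator[T]): Unit
}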

On Fri, Jun 19, 2015 at 7:45 AM, Cody Koeninger <cody@koeninger.org> wrote:
"semantics of output operations" section
Is this really not clear?

As for the general tone of "why doesn't the framework do it for you"... in my opinion, this is essential complexity for delivery semantics in a distributed system, not incidental complexity. You need to actually understand and be responsible for what's going on, unless you're talking about very narrow use cases (i.e. outputting to a known datastore with known semantics and schema)

On Fri, Jun 19, 2015 at 7:26 AM, Ashish Soni <asoni.learn@gmail.com> wrote:
My understanding of exactly-once semantics is that it is handled by the framework itself, but it is not very clear from the documentation. I believe the documentation needs to be updated with a simple example so that it is clear to the end user. This is very critical to the decision when someone is evaluating the framework and does not have enough time to validate all the use cases but has to rely on the documentation.

Ashish

On Fri, Jun 19, 2015 at 7:10 AM, bit1129@163.com <bit1129@163.com> wrote:

I think your observation is correct; you have to take care of the replayed data at your end, e.g., each message has a unique id or something else.

I am using "I think" in the above sentence because I am not sure, and I also have a related question:
I am wondering how direct stream + Kafka is implemented when the Driver is down and restarted: will it always first replay the checkpointed failed batch, or will it honor Kafka's offset reset policy (auto.offset.reset)? If it honors the reset policy and it is set to "smallest", then it is at-least-once semantics; if it is set to "largest", then it will be at-most-once semantics?
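
(For reference, the reset policy in question is an ordinary Kafka consumer config passed to the direct stream. A minimal sketch of where it goes; the broker address is a placeholder, and this only shows the configuration, not which semantics actually apply:)

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  // "smallest" leans toward at-least-once on a cold start,
  // "largest" toward at-most-once.
  "auto.offset.reset" -> "smallest")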

From: Haopu Wang
Date: 2015-06-19 18:47
To: Enno Shioji; Tathagata Das
CC: prajod.vettiyattil@wipro.com; Cody Koeninger; bit1129@163.com; Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
Subject: RE: RE: Spark or Storm

My question is not directly related: about the "exactly-once semantics", the document (copied below) says Spark Streaming gives exactly-once semantics, but actually, from my test results, with checkpointing enabled, the application always re-processes the files in the last batch after a graceful restart.


======

Semantics of Received Data

Different input sources provide different guarantees, ranging from at-least once to exactly once. Read for more details.

With Files

If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all the data. This gives exactly-once semantics, that all the data will be processed exactly once no matter what fails.



From: Enno Shioji [mailto:eshioji@gmail.com]
Sent: Friday, June 19, 2015 5:29 PM
To: Tathagata Das
Cc: prajod.vettiyattil@wipro.com; Cody Koeninger; bit1129@163.com; Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
Subject: Re: RE: Spark or Storm


Fair enough, on second thought, just saying that it should be idempotent is indeed more confusing.


I guess the crux of the confusion comes from the fact that people tend to assume the work you described (store batch id and skip etc.) is handled by the framework, perhaps partly because Storm Trident does handle it (you just need to let Storm know if the output operation has succeeded or not, and it handles the batch id storing & skipping business). Whenever I explain to people that one needs to do this additional work you described to get end-to-end exactly-once semantics, it usually takes a while to convince them. In my limited experience, they tend to interpret "transactional" in that sentence to mean that you just have to write to a transactional storage like an ACID RDB. Pointing them to "Semantics of output operations" is usually sufficient though.


Maybe others like @Ashish can weigh in on this; did you interpret it in this way?


What if we change the statement into:

"end-to-end exactly-once s= emantics (if your updates to downstream systems are idempotent or transactional). To learn ho= w to make your updates idempotent or transactional, see the "Semantics of output operations" section in this chapter"


That way, it's clear that it's not sufficient to merely write to a "transactional storage" like an ACID store.


On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das <tdas@databricks.com> wrote:

If the current documentation is confusing, we can definitely improve the documentation. However, I do not understand why the term "transactional" is confusing. If your output operation has to add 5, then the user has to implement the following mechanism:


1. If the unique id of the batch of data is already present in the store, then skip the update

2. Otherwise atomically do both, the update operation as well as store the unique id of the batch. This is pretty much the definition of a transaction. The user has to be aware of the transactional semantics of the data store while implementing this functionality.
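
A minimal sketch of that mechanism in Scala/JDBC, assuming a SQL store with a unique constraint on batch_id; the table names and the increment are illustrative, not anything prescribed by Spark:

import java.sql.DriverManager

// Steps 1 and 2 above as a single SQL transaction. Assumes tables
// applied_batches(batch_id UNIQUE) and counters(name, value).
def transactionalUpdate(jdbcUrl: String, batchId: String, delta: Long): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val check = conn.prepareStatement(
      "SELECT 1 FROM applied_batches WHERE batch_id = ?")
    check.setString(1, batchId)
    if (!check.executeQuery().next()) {   // 1. skip if already applied
      val update = conn.prepareStatement(
        "UPDATE counters SET value = value + ? WHERE name = 'total'")
      update.setLong(1, delta)
      update.executeUpdate()              // 2a. the update operation...
      val mark = conn.prepareStatement(
        "INSERT INTO applied_batches (batch_id) VALUES (?)")
      mark.setString(1, batchId)
      mark.executeUpdate()                // 2b. ...and the batch id
    }
    conn.commit()                         // ...committed atomically
  } finally {
    conn.close()
  }
}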


You CAN argue that this effectively makes the whole update sort-of idempotent, as even if you try doing it multiple times, it will update only once. But that is not what is generally considered idempotent. Writing a fixed count, not an increment, is usually what is called idempotent. And so just mentioning that the output operation must be idempotent is, in my opinion, more confusing.


To take a page out of the Storm / Trident guide, even they call this exact conditional updating of Trident State a "transactional" operation. See "transactional spout" in the Trident State guide - https://storm.apache.org/documentation/Trident-state


In the end, I am totally open to suggestions and PRs on how to make the programming guide easier to understand. :)


TD


On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji <eshioji@gmail.com> wrote:

Tbh I find the doc around this a bit confusing. If it says "end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional)", I think most people will interpret it as: as long as you use a storage which has atomicity (like MySQL/Postgres etc.), a successful output operation for a given batch (let's say "+5") is going to be issued exactly-once against the storage.


However, as I understand it, that's not what this statement means. What it is saying is, it will always issue "+5" and never, say, "+6", because it makes sure a message is processed exactly-once internally. However, it *may* issue "+5" more than once for a given batch, and it is up to the developer to deal with this by either making the output operation idempotent (e.g. "set 5"), or "transactional" (e.g. keep track of batch IDs and skip already applied batches etc.).
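
A minimal sketch of the idempotent variant; KVStore is a hypothetical client with overwrite semantics, not a Spark API:

// Overwrite-style store: replays converge to the same state.
trait KVStore {
  def put(key: String, value: Long): Unit // overwrite, not increment
}

def idempotentWrite(store: KVStore, key: String, computedTotal: Long): Unit =
  store.put(key, computedTotal) // "set 5" rather than "+5"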


I wonder if it makes more sense to drop "or transactional" from the statement, because if you think about it, ultimately what you are asked to do is to make the writes idempotent even with the "transactional" approach, & "transactional" is a bit loaded and would be prone to lead to misunderstandings (even though in fairness, if you read the fault tolerance chapter it explicitly explains it).


On Fri, Jun 19, 2015 at 2:56 AM, <prajod.vettiyattil@wipro.com> wrote:

More details on the Direct API of Spark 1.3 are at the Databricks blog: https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html


Note the use of checkpoints to persist the Kafka offsets in Spark Streaming itself, and not in ZooKeeper.
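
A minimal sketch of that checkpoint-based recovery setup; the checkpoint directory, app name, and batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // placeholder path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("app"), Seconds(10))
  // ... build the Kafka direct stream and output operations here ...
  ssc.checkpoint(checkpointDir) // offsets are persisted here, not in ZooKeeper
  ssc
}

// On driver restart, recover the context (and its Kafka offsets) from the checkpoint.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)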


Also this statement: ".. This allows one to build Spark Streaming + Kafka pipelines with end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional)."


From: Cody Koeninger [mailto:cody@koeninger.org]
Sent: 18 June 2015 19:38
To: bit1129@163.com
Cc: Prajod S Vettiyattil (WT01 - BAS); jrpilat@gmail.com; eshioji@gmail.com; wrbriggs@gmail.com; asoni.learn@gmail.com; ayan guha; user; sateesh.kavuri@gmail.com; sparkenthusiast@yahoo.in; sabarish.sasidharan@manthan.com
Subject: Re: RE: Spark or Storm

That general description is accurate, but not really a specific issue of the direct stream. It applies to anything consuming from Kafka (or, as Matei already said, any streaming system really). You can't have exactly-once semantics unless you know something more about how you're storing results.

For "some unique id", topicpartition and offset is usually the obvious choice, which is why it's important that the direct stream gives you access to the offsets.

See https://github.com/koeninger/kafka-exactly-once for more info
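
A minimal sketch of using those offsets as the unique id, against the Spark 1.3 streaming-kafka API; writeWithId stands in for a store-specific sink in the spirit of the repo above:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges

def saveWithOffsetIds(stream: InputDStream[(String, String)],
                      writeWithId: (String, Iterator[(String, String)]) => Unit): Unit = {
  stream.foreachRDD { rdd =>
    // The direct stream's RDDs expose the Kafka offset ranges backing them.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition { iter =>
      // For the direct stream, RDD partitions map 1:1 to Kafka topic-partitions.
      val o = offsetRanges(TaskContext.get.partitionId)
      writeWithId(s"${o.topic}/${o.partition}/${o.fromOffset}", iter)
    }
  }
}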

On Thu, Jun 18, 2015 at 6:47 AM, bit1129@163.com <bit1129@163.com> wrote:

I am wondering how the direct stream api ensures end-to-end exactly-once semantics.


I think there are two things involved:

1. From the Spark Streaming end, the driver will replay the offset range when it's down and restarted, which means that the new tasks will process some already-processed data.

2. From the user end, since tasks may process already-processed data, the user end should detect that some data has already been processed, e.g.,

use some unique ID.


Not sure if I have understood correctly.


From: prajod.vettiyattil@wipro.com
Date: 2015-06-18 16:56
To: jrpilat@gmail.com; eshioji@gmail.com
CC: wrbriggs@gmail.com; asoni.learn@gmail.com; guha.ayan@gmail.com; user@spark.apache.org; sateesh.kavuri@gmail.com; sparkenthusiast@yahoo.in; sabarish.sasidharan@manthan.com
Subject: RE: Spark or Storm

>>not being able to read from Kafka using multiple nodes


> Kafka is plenty capable of doing this..


I faced the same issue before Spark 1.3 was released.


The issue was not with Kafka, but with Spark Streaming's Kafka connector. Before the Spark 1.3.0 release, one Spark worker would get all the streamed messages. We had to re-partition to distribute the processing.


From the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel reads from Kafka streamed to Spark workers. See "Approach 2: Direct Approach" in this page: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that it also mentions zero data loss and exactly-once semantics for Kafka integration.
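
For reference, a minimal direct-stream setup per that page; Spark 1.3 API, with broker and topic names as placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("direct-example"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
// One RDD partition per Kafka partition; offsets tracked by Spark, not ZooKeeper.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))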


Prajod


From: Jordan Pilat [mailto:jrpilat@gmail.com]
Sent: 18 June 2015 03:57
To: Enno Shioji
Cc: Will Briggs; asoni.learn@gmail.com; ayan guha; user; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
Subject: Re: Spark or Storm


>not being able to read from Kafka using multiple nodes

Kafka is plenty capable of doing this, by clustering together multiple consumer instances into a consumer group.
If your topic is sufficiently partitioned, the consumer group can consume the topic in a parallelized fashion.
If it isn't, you still have the fault tolerance associated with clustering the consumers.

OK
JRP

On Jun 17, 2015 1:27 AM, "Enno Shioji" <eshioji@gmail.com> wrote:

We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.


Some of the important drawbacks are:

Spark has no back pressure (receiver rate limit can alleviate this to a certain point, but it's far from ideal)

There is also no exactly-once semantics. (updateStateByKey can achieve this semantics, but is not practical if you have any significant amount of state because it does so by dumping the entire state on every checkpointing)
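
(For reference, a minimal sketch of the updateStateByKey pattern referred to here: a running count per key, where Spark checkpoints the entire state each interval, which is exactly the cost being described:)

import org.apache.spark.streaming.dstream.DStream

def runningCounts(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
  pairs.updateStateByKey { (newValues: Seq[Int], state: Option[Int]) =>
    Some(state.getOrElse(0) + newValues.sum)
  }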


There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout, not being able to read from Kafka using multiple nodes, data loss hazard with Kafka.


It's also not possible to attain very low latency in Spark, if that's what you need.


The plus for Spark is the concise and IMO more intuitive syntax, especially if you compare it with Storm's Java API.


I admit I might be a bit biased towards Storm tho as I'm more familiar with it.


Also, you can do some processing with Kinesis. If all you need to do is straightforward transformation and you are reading from Kinesis to begin with, it might be an easier option to just do the transformation in Kinesis.


On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan <sabarish.sasidharan@manthan.com> wrote:

Whatever you write in bolts would be the logic you want to apply on your events. In Spark, that logic would be coded in map() or similar such transformations and/or actions. Spark doesn't enforce a structure for capturing your processing logic like Storm does.

Regards
Sab
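
As a rough sketch of that idea in Scala; the Event type, parsing, and sink are invented placeholders:

import org.apache.spark.streaming.dstream.DStream

case class Event(id: String, payload: String) // placeholder event type

def process(lines: DStream[String]): Unit = {
  lines
    .map { line =>                      // like a parsing bolt
      val parts = line.split(",", 2)
      Event(parts(0), if (parts.length > 1) parts(1) else "")
    }
    .filter(_.payload.nonEmpty)         // like a filtering bolt
    .foreachRDD { rdd =>                // like a sink bolt: an output action
      rdd.foreachPartition(_.foreach(e => println(e.id)))
    }
}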

Probably overloading the question a bit.

In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality possible with Spark streaming? During each phase of the data processing, the transformed data is stored to the database and this transformed data should then be sent to a new pipeline for further processing.

How can this be achieved using Spark?


On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast <sparkenthusiast@yahoo.in> wrote:

I have a use case where a stream of incoming events has to be aggregated and joined to create complex events. The aggregation will have to happen at an interval of 1 minute (or less).


The pipeline is:

                        send events                              enrich event
Upstream services -------------------> KAFKA ---------> Event Stream Processor ------------> Complex Event Processor ------------> Elastic Search.


From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP.


But, we are also evaluating Storm with Trident.


How does Spark Streaming compare with Storm with Trident?


Sridhar Chellappa


On Wednesday, 17 June 2015 10:02 AM, ayan guha <guha.ayan@gmail.com> wrote:


I have a similar scenario where we need to bring data from Kinesis to HBase. Data velocity is 20k per 10 mins. Little manipulation of data will be required, but that's regardless of the tool, so we will be writing that piece as a Java POJO.

All env is on AWS. HBase is on a long-running EMR and Kinesis on a separate cluster.

TIA. Best
Ayan

On 17 Jun 2015 12:13, "Will Briggs" <wrbriggs@gmail.com> wrote:

The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer.

If you can share your use case, we might be able to provide better guidance.

Regards,
Will

On June 16, 2015, at 9:46 PM, asoni.learn@gmail.com wrote:

Hi All,

I am evaluating Spark vs. Storm (Spark Streaming) and I am not able to see what the equivalent of a Bolt in Storm is inside Spark.

Any help on this will be appreciated.

Thanks,
Ashish
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org







