spark-user mailing list archives

From "bit1129@163.com" <bit1...@163.com>
Subject Re: RE: Spark or Storm
Date Fri, 19 Jun 2015 11:10:58 GMT

I think your observation is correct: you have to take care of the replayed data at your
end, e.g., by giving each message a unique id or something similar.

I say "I think" above because I am not sure, and I also have a related
question:
I am wondering how direct stream + Kafka behaves when the driver goes down and is restarted:
will it always first replay the checkpointed failed batch, or will it honor Kafka's offset
reset policy (auto.offset.reset)? If it honors the reset policy and the policy is set to "smallest",
then that gives at-least-once semantics; if it is set to "largest", then it gives at-most-once
semantics?




bit1129@163.com
 
From: Haopu Wang
Date: 2015-06-19 18:47
To: Enno Shioji; Tathagata Das
CC: prajod.vettiyattil@wipro.com; Cody Koeninger; bit1129@163.com; Jordan Pilat; Will Briggs;
Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish
Sasidharan
Subject: RE: RE: Spark or Storm
My question is not directly related. It is about the "exactly-once semantics": the document (copied
below) says Spark Streaming gives exactly-once semantics, but in my tests,
with checkpointing enabled, the application always re-processes the files in the last batch after
a graceful restart.
 
======
Semantics of Received Data
Different input sources provide different guarantees, ranging from at-least once to exactly
once. Read for more details.
With Files
If all of the input data is already present in a fault-tolerant files system like HDFS, Spark
Streaming can always recover from any failure and process all the data. This gives exactly-once
semantics, that all the data will be processed exactly once no matter what fails.
 
 


From: Enno Shioji [mailto:eshioji@gmail.com] 
Sent: Friday, June 19, 2015 5:29 PM
To: Tathagata Das
Cc: prajod.vettiyattil@wipro.com; Cody Koeninger; bit1129@163.com; Jordan Pilat; Will Briggs;
Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish
Sasidharan
Subject: Re: RE: Spark or Storm
 
Fair enough, on second thought, just saying that it should be idempotent is indeed more confusing.
 
I guess the crux of the confusion comes from the fact that people tend to assume the work
you described (store batch id and skip etc.) is handled by the framework, perhaps partly because
Storm Trident does handle it (you just need to let Storm know if the output operation has
succeeded or not, and it handles the batch id storing & skipping business). Whenever I
explain to people that one needs to do this additional work you described to get end-to-end exactly-once
semantics, it usually takes a while to convince them. In my limited experience, they tend
to interpret "transactional" in that sentence to mean that you just have to write to a transactional
storage like ACID RDB. Pointing them to "Semantics of output operations" is usually sufficient
though.
 
Maybe others like @Ashish can weigh in on this; did you interpret it in this way?
 
What if we change the statement into:
"end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or
transactional). To learn how to make your updates idempotent or transactional, see the "Semantics
of output operations" section in this chapter"
 
That way, it's clear that it's not sufficient to merely write to a "transactional storage"
like ACID store.
 
 
 
 
 
 
 
On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das <tdas@databricks.com> wrote:
If the current documentation is confusing, we can definitely improve it. However,
I do not understand why the term "transactional" is confusing. If your output operation
has to add 5, then the user has to implement the following mechanism:
 
1. If the unique id of the batch of data is already present in the store, then skip the update
2. Otherwise atomically do both, the update operation as well as store the unique id of the
batch. This is pretty much the definition of a transaction. The user has to be aware of the
transactional semantics of the data store while implementing this functionality. 
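 
The two steps above can be sketched roughly as follows. This is only an illustration and not code from the thread: it assumes SQLite as a stand-in for the downstream store, and the table names (`counters`, `applied_batches`) and function names are hypothetical. The batch id is recorded and the counter is updated inside a single transaction, so a replayed batch is skipped.

```python
import sqlite3


def make_store():
    """Create an in-memory store with a counter and a table of applied batch ids."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE counters (name TEXT PRIMARY KEY, value INTEGER NOT NULL)")
    conn.execute("CREATE TABLE applied_batches (batch_id TEXT PRIMARY KEY)")
    conn.execute("INSERT INTO counters VALUES ('total', 0)")
    conn.commit()
    return conn


def apply_batch_once(conn, batch_id, delta):
    """Add `delta` to the counter unless `batch_id` was already applied.

    Returns True if the update was applied, False if the batch was skipped.
    """
    try:
        # `with conn` commits both statements together, or rolls both back
        # if either one raises.
        with conn:
            # The PRIMARY KEY on batch_id makes a replayed batch fail here,
            # before the counter is touched.
            conn.execute("INSERT INTO applied_batches (batch_id) VALUES (?)", (batch_id,))
            conn.execute("UPDATE counters SET value = value + ? WHERE name = 'total'", (delta,))
        return True
    except sqlite3.IntegrityError:
        return False


def read_total(conn):
    return conn.execute("SELECT value FROM counters WHERE name = 'total'").fetchone()[0]
```

Calling `apply_batch_once(conn, "batch-1", 5)` a second time with the same batch id leaves the counter unchanged, which is the "skip the update" branch of step 1.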
 
You CAN argue that this effectively makes the whole update sort of idempotent, since even if you
try doing it multiple times, it will update only once. But that is not what is generally considered
idempotent. Writing a fixed count, not an increment, is usually what is called idempotent.
And so just mentioning that the output operation must be idempotent is, in my opinion, more
confusing.
 
To take a page out of the Storm / Trident guide, they too call this exact conditional updating
of Trident State a "transactional" operation. See "transactional spout" in the Trident State
guide - https://storm.apache.org/documentation/Trident-state
 
In the end, I am totally open to suggestions and PRs on how to make the programming guide
easier to understand. :)
 
TD
 
On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji <eshioji@gmail.com> wrote:
Tbh I find the doc around this a bit confusing. If it says "end-to-end exactly-once semantics
(if your updates to downstream systems are idempotent or transactional)", I think most people
will interpret it to mean that as long as you use a storage which has atomicity (like MySQL/Postgres
etc.), a successful output operation for a given batch (let's say "+ 5") is going to be issued
exactly once against the storage.
 
However, as I understand it that's not what this statement means. What it is saying is, it
will always issue "+5" and never, say "+6", because it makes sure a message is processed exactly-once
internally. However, it *may* issue "+5" more than once for a given batch, and it is up to
the developer to deal with this by either making the output operation idempotent (e.g. "set
5"), or "transactional" (e.g. keep track of batch IDs and skip already applied batches etc.).
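 
The difference between "+5" and "set 5" under replay can be shown with a small sketch. It is purely illustrative: a plain dict stands in for the downstream store, and the function names are hypothetical.

```python
def increment(store, key, delta):
    """Non-idempotent output operation: every call changes the result."""
    store[key] = store.get(key, 0) + delta


def set_value(store, key, value):
    """Idempotent output operation: repeating it leaves the same result."""
    store[key] = value


def deliver(operation, store, times, *args):
    """Apply the same output operation `times` times, as a replayed batch would."""
    for _ in range(times):
        operation(store, *args)
    return store
```

With two deliveries of the same batch, `deliver(increment, {}, 2, "count", 5)` ends at 10, while `deliver(set_value, {}, 2, "count", 5)` ends at 5.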
 
I wonder if it makes more sense to drop "or transactional" from the statement, because if
you think about it, ultimately what you are asked to do is to make the writes idempotent even
with the "transactional" approach, & "transactional" is a bit loaded and would be prone
to lead to misunderstandings (even though in fairness, if you read the fault tolerance chapter
it explicitly explains it).
 
 
 
On Fri, Jun 19, 2015 at 2:56 AM, <prajod.vettiyattil@wipro.com> wrote:
More details on the Direct API of Spark 1.3 are on the Databricks blog: https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
 
Note the use of checkpoints to persist the Kafka offsets in Spark Streaming itself, and not
in zookeeper. 
 
Also note this statement: "... This allows one to build a Spark Streaming + Kafka pipelines with
end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or
transactional)."
 
 
From: Cody Koeninger [mailto:cody@koeninger.org] 
Sent: 18 June 2015 19:38
To: bit1129@163.com
Cc: Prajod S Vettiyattil (WT01 - BAS); jrpilat@gmail.com; eshioji@gmail.com; wrbriggs@gmail.com;
asoni.learn@gmail.com; ayan guha; user; sateesh.kavuri@gmail.com; sparkenthusiast@yahoo.in;
sabarish.sasidharan@manthan.com
Subject: Re: RE: Spark or Storm
 
That general description is accurate, but it is not really specific to the direct stream.
It applies to anything consuming from Kafka (or, as Matei already said, any streaming system
really). You can't have exactly-once semantics unless you know something more about how
you're storing results.
 
For "some unique id", topicpartition and offset is usually the obvious choice, which is why
it's important that the direct stream gives you access to the offsets.
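 
As a rough sketch of using topic, partition and offset as the unique id (not taken from the linked repository): the example below assumes SQLite as a stand-in for the results store, and the table, column and function names are hypothetical. The composite primary key makes a replayed record a no-op.

```python
import sqlite3


def make_results_store():
    """Create an in-memory table keyed by (topic, partition, offset)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE results ("
        " topic TEXT, part INTEGER, kafka_offset INTEGER, payload TEXT,"
        " PRIMARY KEY (topic, part, kafka_offset))"
    )
    return conn


def count_rows(conn):
    return conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]


def write_records(conn, records):
    """Store (topic, partition, offset, payload) tuples; return how many were new."""
    before = count_rows(conn)
    with conn:
        # Records whose (topic, partition, offset) key already exists are ignored,
        # so replaying an offset range does not duplicate results.
        conn.executemany("INSERT OR IGNORE INTO results VALUES (?, ?, ?, ?)", records)
    return count_rows(conn) - before
```

Writing the same offset range twice stores each record only once; only records with previously unseen offsets are added on the second write.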
 
See https://github.com/koeninger/kafka-exactly-once for more info
 
 
 
On Thu, Jun 18, 2015 at 6:47 AM, bit1129@163.com <bit1129@163.com> wrote:
I am wondering how the direct stream API ensures end-to-end exactly-once semantics.
 
I think there are two things involved:
1. On the Spark Streaming end, the driver will replay the offset range when it goes down and
is restarted, which means that the new tasks will process some already-processed data.
2. On the user end, since tasks may process already-processed data, the user code should detect
that some data has already been processed, e.g., by
using some unique ID.
 
Not sure if I have understood correctly.
 
 


bit1129@163.com
 
From: prajod.vettiyattil@wipro.com
Date: 2015-06-18 16:56
To: jrpilat@gmail.com; eshioji@gmail.com
CC: wrbriggs@gmail.com; asoni.learn@gmail.com; guha.ayan@gmail.com; user@spark.apache.org;
sateesh.kavuri@gmail.com; sparkenthusiast@yahoo.in; sabarish.sasidharan@manthan.com
Subject: RE: Spark or Storm
>>not being able to read from Kafka using multiple nodes
 
> Kafka is plenty capable of doing this..
 
I faced the same issue before Spark 1.3 was released.
 
The issue was not with Kafka, but with Spark Streaming’s Kafka connector. Before the Spark 1.3.0
release, one Spark worker would get all the streamed messages. We had to repartition to distribute
the processing.
 
Since the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel reads from Kafka
streamed to Spark workers. See “Approach 2: Direct Approach” on this page: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html.
Note that it also mentions zero data loss and exactly-once semantics for the Kafka integration.
 
 
Prajod
 
From: Jordan Pilat [mailto:jrpilat@gmail.com] 
Sent: 18 June 2015 03:57
To: Enno Shioji
Cc: Will Briggs; asoni.learn@gmail.com; ayan guha; user; Sateesh Kavuri; Spark Enthusiast;
Sabarish Sasidharan
Subject: Re: Spark or Storm
 
>not being able to read from Kafka using multiple nodes
Kafka is plenty capable of doing this,  by clustering together multiple consumer instances
into a consumer group.
If your topic is sufficiently partitioned, the consumer group can consume the topic in a parallelized
fashion.
If it isn't, you still have the fault tolerance associated with clustering the consumers.
OK
JRP
On Jun 17, 2015 1:27 AM, "Enno Shioji" <eshioji@gmail.com> wrote:
We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.
 
Some of the important drawbacks are:
Spark has no back pressure (receiver rate limit can alleviate this to a certain point, but
it's far from ideal)
There are also no exactly-once semantics. (updateStateByKey can achieve these semantics, but
is not practical if you have any significant amount of state because it does so by dumping
the entire state on every checkpoint)
 
There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout,
not being able to read from Kafka using multiple nodes, data loss hazard with Kafka.
 
It's also not possible to attain very low latency in Spark, if that's what you need.
 
The pro for Spark is the concise and IMO more intuitive syntax, especially if you compare
it with Storm's Java API.
 
I admit I might be a bit biased towards Storm tho as I'm more familiar with it.
 
Also, you can do some processing with Kinesis. If all you need to do is straightforward transformation
and you are reading from Kinesis to begin with, it might be an easier option to just do the
transformation in Kinesis.
 
 
 
 
 
On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan <sabarish.sasidharan@manthan.com>
wrote:
Whatever you write in bolts would be the logic you want to apply on your events. In Spark,
that logic would be coded in map() or similar transformations and/or actions. Spark
doesn't enforce a structure for capturing your processing logic like Storm does.
Regards
Sab
Probably overloading the question a bit.
In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality
possible with Spark streaming? During each phase of the data processing, the transformed data
is stored to the database and this transformed data should then be sent to a new pipeline
for further processing
How can this be achieved using Spark?
 
On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast <sparkenthusiast@yahoo.in> wrote:
I have a use case where a stream of incoming events has to be aggregated and joined to create
complex events. The aggregation will have to happen at an interval of 1 minute (or less).
 
The pipeline is :
Upstream services --(send events)--> KAFKA --> Event Stream Processor --(enrich event)-->
Complex Event Processor --> Elastic Search.
 
From what I understand, Storm will make a very good ESP and Spark Streaming will make a good
CEP.
 
But, we are also evaluating Storm with Trident.
 
How does Spark Streaming compare with Storm with Trident?
 
Sridhar Chellappa
 
 
 
 
 
 
On Wednesday, 17 June 2015 10:02 AM, ayan guha <guha.ayan@gmail.com> wrote:
 
I have a similar scenario where we need to bring data from Kinesis to HBase. Data velocity
is 20k per 10 mins. A little manipulation of the data will be required, but that's regardless of
the tool, so we will be writing that piece as a Java POJO.
All env is on aws. Hbase is on a long running EMR and kinesis on a separate cluster.
TIA.
Best
Ayan
On 17 Jun 2015 12:13, "Will Briggs" <wrbriggs@gmail.com> wrote:
The programming models for the two frameworks are conceptually rather different; I haven't
worked with Storm for quite some time, but based on my old experience with it, I would equate
Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then,
there are significant differences, but it's a bit closer.

If you can share your use case, we might be able to provide better guidance.

Regards,
Will

On June 16, 2015, at 9:46 PM, asoni.learn@gmail.com wrote:

Hi All,

I am evaluating Spark vs. Storm (Spark Streaming) and I am not able to see what the equivalent
of a Storm Bolt is in Spark.

Any help on this will be appreciated.

Thanks ,
Ashish
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


 
 
 
The information contained in this electronic message and any attachments to this message are
intended for the exclusive use of the addressee(s) and may contain proprietary, confidential
or privileged information. If you are not the intended recipient, you should not disseminate,
distribute or copy this e-mail. Please notify the sender immediately and destroy all copies
of this message and any attachments. WARNING: Computer viruses can be transmitted via email.
The recipient should check this email and any attachments for the presence of viruses. The
company accepts no liability for any damage caused by any virus transmitted by this email.
www.wipro.com 
 
 
 
 