spark-user mailing list archives

From Neelesh <>
Subject Re: Spark Streaming and message ordering
Date Fri, 20 Feb 2015 17:44:19 GMT
Thanks Jorn. Indeed, we do not need global ordering, since our data is
partitioned well. We do not need ordering based on wallclock time either;
that would require waiting indefinitely.  All we need is for the execution
of batches (not just job submission) to happen in the same order in which
they are generated. As far as I can tell, that is not enforced; it is more
a side effect of how job submission happens as of now. Cody's suggestions
are useful to our case, though I need to take a closer look at how job
executions happen within a stream. Loss of parallelism and failure handling
are an issue mainly for global ordering. Global ordering is a much harder
problem and relevant only for a small set of use cases, in my opinion. Data
is almost always partitioned in some way, and any specific ordering behavior
is typically constrained to a partition.
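
To make "execute batches in the order they are generated" concrete, here is
a minimal sketch in plain Python, not Spark code. OrderedBatchGate and
batch_index are hypothetical names; the sketch assumes each batch of a
partition gets a 0-based counter when it is generated and that no index is
submitted twice at the same time.

```python
import threading


class OrderedBatchGate:
    """Runs the batches of one partition strictly in generation order.

    Hypothetical helper, not a Spark API. batch_index is assumed to be a
    0-based counter assigned when the batch is generated.
    """

    def __init__(self):
        self._next_index = 0
        self._cond = threading.Condition()

    def run(self, batch_index, work):
        # Block until every earlier batch has finished executing.
        with self._cond:
            self._cond.wait_for(lambda: self._next_index == batch_index)
        # If work() raises, the index is not advanced, so the same batch
        # has to be retried before any later batch may proceed.
        result = work()
        with self._cond:
            self._next_index += 1
            self._cond.notify_all()
        return result


def demo():
    """Submit batches 2, 0, 1 concurrently and record the execution order."""
    gate = OrderedBatchGate()
    executed = []
    threads = [
        threading.Thread(target=gate.run, args=(i, lambda i=i: executed.append(i)))
        for i in (2, 0, 1)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return executed
```

Calling demo() submits the batches out of order, yet they execute as 0, 1,
2. The cost is the one discussed in this thread: batches of the same
partition never overlap, so a slow batch delays every batch behind it.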

So for us: loss of events is unacceptable, events must be executed
in order within a partition (strictly speaking, a 1-1 mapping with kafka
partitions), and our execution logic is idempotent. All of these seem to
be possible with 1.3, with some minor tweaks.
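
A minimal sketch of what idempotent, offset-tracked writes could look like,
assuming each record carries its kafka partition and offset. It uses
Python's built-in sqlite3 as a stand-in for mysql; the table names and the
functions open_store and write_batch are made up for illustration.

```python
import sqlite3


def open_store():
    """Create an in-memory database with an events table and a progress table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (part_id INTEGER, msg_offset INTEGER, "
        "payload TEXT, PRIMARY KEY (part_id, msg_offset))"
    )
    conn.execute(
        "CREATE TABLE progress (part_id INTEGER PRIMARY KEY, last_offset INTEGER)"
    )
    return conn


def write_batch(conn, part_id, records):
    """Write (offset, payload) pairs for one partition, skipping replays.

    records is assumed to be sorted by offset. Returns the number of rows
    actually inserted.
    """
    with conn:  # one transaction: commit on success, roll back on error
        row = conn.execute(
            "SELECT last_offset FROM progress WHERE part_id = ?", (part_id,)
        ).fetchone()
        last = row[0] if row else -1
        # Keep only records beyond the last offset already stored.
        fresh = [(part_id, off, payload) for off, payload in records if off > last]
        if not fresh:
            return 0
        conn.executemany("INSERT INTO events VALUES (?, ?, ?)", fresh)
        # Store the new high-water mark in the same transaction as the data.
        conn.execute(
            "INSERT OR REPLACE INTO progress VALUES (?, ?)", (part_id, fresh[-1][1])
        )
        return len(fresh)
```

Because the data and the last offset are committed together, replaying a
batch after an executor failure inserts nothing the second time, and a
partially overlapping batch only adds the offsets that are new.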


On Fri, Feb 20, 2015 at 9:24 AM, Jörn Franke <> wrote:

> You may also want to consider whether your use case really needs a very
> strict order, because configuring spark to support such a strict order
> means giving up most of its benefits (failure handling, parallelism etc.).
> Usually, in a distributed setting you can order events, but this also means
> that you may need to wait for an unlimited time to be sure that you have
> received all events before ordering them. This is impractical, so people
> implement timeouts, which may lead to losing events etc.
> The optimal thing would be to partition the data so that an order is
> only needed within a partition (across partitions is a different story...).
> All in all, implementing order in Spark depends on your requirements for
> ordering, and depending on those it can be easy or very difficult. You may
> also consider writing your own framework for mesos or yarn to better meet
> the requirements and keep your spark cluster config clean (what happens if
> there are spark jobs not requiring an order? They would be slowed down....)
> So you need to think about: by which criteria can I order events? Do I
> accept loss of events? Do I need a global order over all events or is it
> only relevant for subsets (partitions)? What is the impact of not ordering?
> What is the impact of loss of events?
> On 20 Feb 2015 at 18:01, "Cody Koeninger" <> wrote:
>> There is typically some slack between when a batch finishes executing and
>> when the next batch is scheduled.  You should be able to arrange your batch
>> sizes / cluster resources to ensure that.  If there isn't slack, your
>> overall delay is going to keep increasing indefinitely.
>> If you're inserting into mysql, you're probably going to be much better
>> off doing bulk inserts anyway, and transaction ordering is going to stop a
>> lot of overlap that might otherwise happen.  In pseudocode:
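>> stream.foreachRDD { rdd =>
>>   rdd.foreachPartition { iter =>
>>     // matchEvent, transaction and insert are placeholders for your own code
>>     val bulk = iter.filter(matchEvent).toList
>>     transaction { insert(bulk) } // one bulk insert per partition, in one transaction
>>   }
>> }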
>> You may already know this, but getting jdbc to do true bulk inserts to
>> mysql requires a bit of hoop jumping, so turn on query logging during
>> development to make sure you aren't getting individual inserts.
>> Also be aware that output actions aren't guaranteed to happen exactly
>> once, so you'll need to store unique offset ids in mysql or otherwise deal
>> with the possibility of executor failures.
>> On Fri, Feb 20, 2015 at 10:39 AM, Neelesh <> wrote:
>>> Thanks for the detailed response Cody. Our use case is to do some
>>> external lookups (cached and all) for every event, match the event against
>>> the looked up data, decide whether to write an entry in mysql and write it
>>> in the order in which the events arrived within a kafka partition.
>>> We don't need global ordering. Message ordering within a batch can be
>>> achieved either by waiting for 1.3 to be released (the behavior you
>>> described works very well for us, within a batch), or by using
>>> updateStateByKey and sorting. Speculative execution is turned off as
>>> well (I think it's off by default).
>>> But here is what I see in the JobScheduler/JobGenerator. Within
>>> each stream, jobs are generated every 'n' milliseconds (batch duration),
>>> and submitted for execution. Since job generation in a stream is temporal,
>>> it's guaranteed that the jobs are submitted in the order of event arrival
>>> within a stream. And since we have one stream per kafka partition, this
>>> translates to sequentially generated batches & sequentially scheduled
>>> batches within a kafka partition. But since the *execution* of jobs
>>> itself is in parallel, it's probable that back-to-back batches in a stream
>>> are submitted one after the other, but are executing concurrently. If this
>>> understanding of mine is correct, it breaks our requirement that messages
>>> be executed in order within a partition.
>>> Thanks!
>>> On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger <>
>>> wrote:
>>>> For a given batch, for a given partition, the messages will be
>>>> processed in order by the executor that is running that partition.  That's
>>>> because messages for the given offset range are pulled by the executor, not
>>>> pushed from some other receiver.
>>>> If you have speculative execution, yes, another executor may be running
>>>> that partition.
>>>> If your job is lagging behind in processing such that the next batch
>>>> starts executing before the last batch is finished processing, yes it is
>>>> possible for some other executor to start working on messages from that
>>>> same kafka partition.
>>>> The obvious solution here seems to be to turn off speculative execution
>>>> and adjust your batch interval / sizes such that they can comfortably
>>>> finish processing :)
>>>> If your processing time is sufficiently non-linear with regard to the
>>>> number of messages, yes you might be able to do something with overriding
>>>> dstream.compute.  Unfortunately the new kafka dstream implementation is
>>>> private, so it's not straightforward to subclass it.  I'd like to get a
>>>> solution in place for people who need to be able to tune the batch
>>>> generation policy (I need to as well, for unrelated reasons).  Maybe you
>>>> can say a little more about your use case.
>>>> But regardless of the technology you're using to read from kafka
>>>> (spark, storm, whatever), kafka only gives you ordering as to a particular
>>>> partition.  So you're going to need to do some kind of downstream sorting
>>>> if you really care about a global order.
>>>> On Fri, Feb 20, 2015 at 1:43 AM, Neelesh <> wrote:
>>>>> Even with the new direct streams in 1.3, isn't it the case that the
>>>>> job *scheduling* follows the partition order, rather than job
>>>>> *execution*? Or is it the case that the stream listens to the job
>>>>> completion event (using a streamlistener) before scheduling the next
>>>>> batch?  To compare with storm from a message ordering point of view,
>>>>> unless a tuple is fully processed by the DAG (as defined by
>>>>> spout+bolts), the next tuple does not enter the DAG.
>>>>> On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger <>
>>>>> wrote:
>>>>>> Kafka ordering is guaranteed on a per-partition basis.
>>>>>> The high-level consumer api as used by the spark kafka streams prior
>>>>>> to 1.3 will consume from multiple kafka partitions, thus not giving
>>>>>> ordering guarantees.
>>>>>> The experimental direct stream in 1.3 uses the "simple" consumer
>>>>>> and there is a 1:1 correspondence between spark partitions and kafka
>>>>>> partitions.  So you will get deterministic ordering, but only on a
>>>>>> per-partition basis.
>>>>>> On Thu, Feb 19, 2015 at 11:31 PM, Neelesh <> wrote:
>>>>>>> I had a chance to talk to TD today at the Strata+Hadoop Conf in San
>>>>>>> Jose. We talked a bit about this after his presentation - the short
>>>>>>> answer is that spark streaming does not guarantee any sort of ordering
>>>>>>> (within batches, across batches).  One would have to use
>>>>>>> updateStateByKey to collect the events and sort them based on some
>>>>>>> attribute of the event. But TD said message ordering is a frequently
>>>>>>> asked feature recently and is getting on his radar.
>>>>>>> I went through the source code and there does not seem to be any
>>>>>>> architectural/design limitation that would prevent supporting this.
>>>>>>> (JobScheduler, JobGenerator are a good starting point to see how stuff
>>>>>>> works under the hood).  Overriding DStream#compute and using
>>>>>>> streaminglistener looks like a simple way of ensuring ordered
>>>>>>> execution of batches within a stream. But this would be a partial
>>>>>>> solution, since ordering within a batch needs some more work that I
>>>>>>> don't understand fully yet.
>>>>>>> Side note: My custom receiver polls the metricsservlet once in a
>>>>>>> while to decide whether jobs are getting done fast enough and
>>>>>>> throttle/relax pushing data in to receivers based on the numbers
>>>>>>> provided by metricsservlet. I had to do this because out-of-the-box
>>>>>>> rate limiting right now is static and cannot adapt to the state of
>>>>>>> the cluster.
>>>>>>> thnx
>>>>>>> -neelesh
>>>>>>> On Wed, Feb 18, 2015 at 4:13 PM, jay vyas <> wrote:
>>>>>>>> This is a *fantastic* question.  The idea of how we identify
>>>>>>>> individual things in multiple DStreams is worth looking at.
>>>>>>>> The reason being that you can then fine tune your streaming
>>>>>>>> based on the RDD identifiers (i.e. are the timestamps from the
>>>>>>>> producer correlating closely to the order in which RDD elements are
>>>>>>>> being produced?).  If *NO* then you need to (1) dial up throughput on
>>>>>>>> producer sources or else (2) increase cluster size so that spark is
>>>>>>>> capable of evenly handling load.
>>>>>>>> You can't decide to do (1) or (2) unless you can track when
>>>>>>>> streaming elements are being converted to RDDs by spark.
>>>>>>>> On Wed, Feb 18, 2015 at 6:54 PM, Neelesh <>
>>>>>>>> wrote:
>>>>>>>>> There does not seem to be a definitive answer on this. Every time
>>>>>>>>> I google for message ordering, the only relevant thing that comes up
>>>>>>>>> is this
>>>>>>>>>  -
>>>>>>>>> .
>>>>>>>>> With a kafka receiver that pulls data from a single kafka
>>>>>>>>> partition of a kafka topic, are individual messages in the
>>>>>>>>> microbatch in the same order as in the kafka partition? Are
>>>>>>>>> successive microbatches from a kafka partition executed in order?
>>>>>>>>> Thanks!
>>>>>>>> --
>>>>>>>> jay vyas
