spark-dev mailing list archives

From andy petrella <andy.petre...@gmail.com>
Subject Re: [brainstorming] Generalization of DStream, a ContinuousRDD ?
Date Fri, 01 Aug 2014 16:48:11 GMT
Heya,
Dunno if these ideas are still in the air or have fallen into the warp ^^.
However, there is a paper on avocado
<http://www.cs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project8_report.pdf>
that mentions a way of working with their data (sequence reads) in a windowed
manner using neither time nor a timestamp field's value, but a kind of
internal index as the range delimiter -- thus defining their own exotic
continuum and break function.
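To make the idea of an index-based continuum concrete, here is a minimal plain-Scala sketch (no Spark) that windows records by an integer position instead of by time. The `Read` type, its `position` field and `IndexWindows.windowByPosition` are hypothetical names chosen for illustration; they are not taken from the avocado paper or its code.

```scala
// Hypothetical record: a sequence read anchored at a position on a reference.
final case class Read(position: Long, bases: String)

object IndexWindows {
  // Assigns each read to the window [k * width, (k + 1) * width) containing its position.
  // The "continuum" is the position axis; the "break function" is floor division by width.
  def windowByPosition(reads: Seq[Read], width: Long): Map[Long, Seq[Read]] = {
    require(width > 0, "window width must be positive")
    reads.groupBy(r => Math.floorDiv(r.position, width))
  }
}
```

Nothing in this sketch refers to time: swapping the key function is enough to change the continuum.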

greetz,

 aℕdy ℙetrella
about.me/noootsab
<http://about.me/noootsab>


On Thu, Jul 17, 2014 at 1:11 AM, andy petrella <andy.petrella@gmail.com>
wrote:

> Indeed, these two cases are tightly coupled (the first one is a special
> case of the second).
>
> Actually, these "outliers" could be handled by a dedicated function that I
> named outliersManager -- I was not very inspired ^^, but we could call
> these outliers "outlaws", and the function would then be the "sheriff".
> The purpose of this "sheriff" function would be to create yet another
> distributed collection (RDD, CRDD, ...?) containing only the --outliers--
> outlaws.
>
> Because the nature of these problems will vary as much as the use cases
> do, it's hard to find a generic way to tackle them. So, you know... that's
> why... I temporarily put them in jail and wait for the judge to show them
> the right path! (.... okay, it's late in Belgium -- 1AM).
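A minimal sketch of the "sheriff" idea on a local Scala collection, assuming each event exposes a numeric key (for instance a timestamp) and that a batch is delimited by a half-open range [lower, upper). The name `sheriff` comes from the message above, but its signature is a hypothetical choice. On a Spark RDD the same split would take two `filter` calls, since RDDs have no `partition` method returning two collections.

```scala
object Sheriff {
  // Splits a batch into the events whose key falls inside [lower, upper) and the
  // "outlaws" whose key does not. The outlaws are returned as a separate collection
  // so that a later step (the "judge") can decide what to do with them.
  def sheriff[A](batch: Seq[A], lower: Long, upper: Long)(key: A => Long): (Seq[A], Seq[A]) =
    batch.partition { a =>
      val k = key(a)
      k >= lower && k < upper
    }
}
```

Keeping the outlaws in their own collection, rather than dropping or logging them, leaves the decision to downstream code.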
>
> All in all, it's more or less what we would do with DStream as well, actually.
> Let me expand this reasoning a bit. Let's assume that some data points come
> along with time but aren't in sync with it -- e.g., a device that wakes up
> and sends all its data at once.
> The DStream will package them into RDDs mixed up with truly current data
> points; the logic of the job will then have to take a 'Y' road:
> * to integrate them into a database at the right place
> * to simply drop them because they won't be part of a displayed chart
> * etc.
>
> In this case, the 'Y' road would be out of the contract ;-), and so left to
> the developer's discretion.
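The 'Y' road can be sketched as a routing step applied to each micro-batch. This is a hypothetical plain-Scala illustration: `Point`, `LatePolicy`, `Integrate`, `Drop` and `YRoad.route` are made-up names, and a point is assumed to be late when its event time falls before the start of the batch that carries it.

```scala
// Hypothetical policies for points that arrive after their time slot has passed.
sealed trait LatePolicy
case object Integrate extends LatePolicy // keep them, e.g. to upsert into a database at their real time
case object Drop extends LatePolicy      // discard them, e.g. when feeding a live chart

final case class Point(eventTime: Long, value: Double)

object YRoad {
  // Returns (points for the current batch, late points to hand over to a separate sink).
  // A point is considered late when its event time is before the batch start.
  def route(batch: Seq[Point], batchStart: Long, policy: LatePolicy): (Seq[Point], Seq[Point]) = {
    val (current, late) = batch.partition(_.eventTime >= batchStart)
    policy match {
      case Integrate => (current, late)
      case Drop      => (current, Seq.empty)
    }
  }
}
```

Making the policy an explicit parameter mirrors the point that the choice is left to the developer rather than baked into the stream abstraction.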
>
> Another way to do it would be to ignore them and just log them, but that
> would be very crappy, unprofessional and useless (and of course I'm just
> kidding).
>
> my 0.002¢
>
>
> On Thu, Jul 17, 2014 at 12:31 AM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> I think it makes sense, though without a concrete implementation it's hard
>> to be sure. Applying sorting within each RDD makes sense, but I can think
>> of two kinds of fundamental problems.
>>
>> 1. How do you deal with ordering across RDD boundaries? Say two consecutive
>> RDDs in the DStream have the following record timestamps: RDD1: [1, 2, 3,
>> 4, 6, 7], RDD2: [5, 8, 9, 10]. And you want to run a function through all
>> these records in timestamp order. I am curious to see how this problem can
>> be solved without sacrificing efficiency (e.g., I can imagine doing some
>> multiple-pass magic).
>>
>> 2. An even more fundamental question is how you ensure ordering with
>> delayed records. If you want to process in order of application time, and
>> records are delayed, how do you deal with them?
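One way to explore both questions is a bounded-disorder reordering buffer, similar in spirit to event-time watermarks. The sketch below is a sequential, single-process illustration with a hypothetical `ReorderBuffer` class. It assumes that no record arrives more than `slack` time units behind the largest timestamp seen so far, and it reports records violating that bound as late rather than emitting them out of order. Because it is sequential, it also illustrates the loss of parallelism discussed in this thread.

```scala
import scala.collection.mutable

// Sequential sketch of a bounded-disorder reordering buffer (a watermark-style approach).
// Assumption: a record never arrives more than `slack` time units behind the largest
// timestamp seen so far; records that violate this are reported as late.
final class ReorderBuffer(slack: Long) {
  // Min-heap: the reversed ordering makes the smallest timestamp the head.
  private val pending = mutable.PriorityQueue.empty[Long](Ordering.Long.reverse)
  private var maxSeen: Option[Long] = None
  private var emittedUpTo: Option[Long] = None

  /** Feeds one micro-batch; returns (records now safe to emit, in order; late records). */
  def push(batch: Seq[Long]): (Seq[Long], Seq[Long]) = {
    // A record older than something already emitted can no longer be placed in order.
    val (late, onTime) = batch.partition(ts => emittedUpTo.exists(ts < _))
    onTime.foreach { ts =>
      pending.enqueue(ts)
      maxSeen = Some(maxSeen.fold(ts)(math.max(_, ts)))
    }
    // Everything at or below the watermark is assumed complete and can be released.
    val watermark = maxSeen.map(_ - slack)
    (drainWhile(ts => watermark.exists(ts <= _)), late)
  }

  /** Releases whatever is still buffered, e.g. at the end of the input. */
  def flush(): Seq[Long] = drainWhile(_ => true)

  private def drainWhile(ready: Long => Boolean): Seq[Long] = {
    val out = mutable.ArrayBuffer.empty[Long]
    while (pending.nonEmpty && ready(pending.head)) out += pending.dequeue()
    if (out.nonEmpty) emittedUpTo = Some(out.last)
    out.toList
  }
}
```

With `slack = 2`, feeding the two batches [1, 2, 3, 4, 6, 7] and [5, 8, 9, 10] emits 1 to 4 after the first batch and 5 to 8 after the second, keeping 9 and 10 buffered until more data (or a final `flush()`) arrives. The price is latency and state carried across batch boundaries, and the choice of `slack` is an assumption about the data, not something the buffer can discover.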
>>
>> Any ideas? ;)
>>
>> TD
>>
>>
>>
>> On Wed, Jul 16, 2014 at 2:37 AM, andy petrella <andy.petrella@gmail.com>
>> wrote:
>>
>> > Heya TD,
>> >
>> > Thanks for the detailed answer! Much appreciated.
>> >
>> > Regarding order among elements within an RDD, you're definitely right:
>> > it'd kill the parallelism and would require synchronization, which is
>> > completely avoided in a distributed environment.
>> >
>> > That's why I wouldn't push this constraint to the RDDs themselves,
>> > actually; only the Space *defines* ordered elements, and thus there are
>> > two functions that will break the RDDs based on a given (extensible,
>> > pluggable) heuristic, for instance.
>> > Since the Space is rather decoupled from the data, and thus from the
>> > source and the partitions, it's the responsibility of the CRDD
>> > implementation to dictate how (if necessary) the elements should be
>> > sorted in the RDDs... which will require some shuffles :-s -- or the
>> > couple (source, space) is something intrinsically ordered (as it is for
>> > DStream).
>> >
>> > To be more concrete, an RDD would be composed of an unordered iterator of
>> > millions of events whose timestamps all land in the same time interval.
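The point that no ordering is needed inside an interval can be illustrated by binning each partition independently and then merging the buckets, which is the role a shuffle would play on a real RDD (for instance by keying records by bin and using a pair-RDD operation such as `groupByKey` or `aggregateByKey`). This is a plain-Scala simulation with hypothetical names (`Event`, `IntervalBins.bin`); partitions are modeled as a `Seq` of `Seq`s, and intervals are [origin + k * width, origin + (k + 1) * width).

```scala
// Hypothetical event type: only the timestamp matters for binning.
final case class Event(timestamp: Long, payload: String)

object IntervalBins {
  // Index k of the interval [origin + k * width, origin + (k + 1) * width) containing ts.
  def binOf(ts: Long, origin: Long, width: Long): Long = Math.floorDiv(ts - origin, width)

  // Bins each "partition" on its own (no sorting inside a partition), then merges the
  // per-partition buckets, which is the role a shuffle would play on a real RDD.
  def bin(partitions: Seq[Seq[Event]], origin: Long, width: Long): Map[Long, Seq[Event]] = {
    require(width > 0, "interval width must be positive")
    val perPartition = partitions.map(_.groupBy(e => binOf(e.timestamp, origin, width)))
    perPartition.foldLeft(Map.empty[Long, Seq[Event]]) { (acc, buckets) =>
      buckets.foldLeft(acc) { case (merged, (k, events)) =>
        merged.updated(k, merged.getOrElse(k, Seq.empty[Event]) ++ events)
      }
    }
  }
}
```

Each resulting bucket plays the part of one "unordered iterator" of events sharing an interval; the order of events inside a bucket is deliberately left unspecified.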
>> >
>> > WDYT, would that make sense?
>> >
>> > thanks again for the answer!
>> >
>> > greetz
>> >
>> > On Wed, Jul 16, 2014 at 12:33 AM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
>> >
>> > > Very interesting ideas, Andy!
>> > >
>> > > Conceptually, I think it makes sense. In fact, it is true that dealing
>> > > with time series data, windowing over application time, and windowing
>> > > over a number of events are things that DStream does not natively
>> > > support. The real challenge is actually mapping the conceptual windows
>> > > onto the underlying RDD model. One aspect you correctly observed is the
>> > > ordering of events within the RDDs of the DStream. Another fundamental
>> > > aspect is the fact that RDDs are parallel collections, with no
>> > > well-defined ordering of the records in the RDDs. If you want to process
>> > > the records in an RDD as an ordered stream of events, you kind of have to
>> > > process the stream sequentially, which means you have to process each RDD
>> > > partition one by one, and therefore lose the parallelism. So implementing
>> > > all this functionality may mean adding features at the cost of
>> > > performance. Whether it is okay for Spark Streaming to have these, OR
>> > > whether this tradeoff is not intuitive for end-users and therefore should
>> > > not come out-of-the-box with Spark Streaming -- that is definitely a
>> > > question worth debating.
>> > >
>> > > That said, some limited use cases, like windowing over N events, can be
>> > > implemented using custom RDDs like SlidingRDD
>> > > <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala>
>> > > without losing parallelism. For things like app-time-based windows and
>> > > random-application-event-based windows, it's much harder.
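The trick that lets a count-based window stay parallel can be simulated on plain Scala collections. In this sketch (a hypothetical `SlidingSketch.sliding`, not SlidingRDD's actual code), the data is a `Vector` of ordered partitions; each partition is extended with the first n - 1 elements that follow it, after which every partition can compute its own windows independently. Only full windows that start inside a partition are kept there, so no window is produced twice.

```scala
object SlidingSketch {
  // Sliding windows of size n over data split into ordered partitions.
  // Each partition borrows the first (n - 1) elements that follow it, then slides locally;
  // in a distributed setting each partition could do this step independently.
  def sliding[A](partitions: Vector[Vector[A]], n: Int): Vector[Vector[Vector[A]]] = {
    require(n > 0, "window size must be positive")
    partitions.indices.toVector.map { i =>
      val borrowed = partitions.iterator.drop(i + 1).flatMap(_.iterator).take(n - 1).toVector
      val extended = partitions(i) ++ borrowed
      // Keep only full windows that START in partition i, so nothing is produced twice.
      extended.sliding(n).take(partitions(i).size).filter(_.size == n).toVector
    }
  }
}
```

Only a small, bounded amount of data (n - 1 elements per partition) has to cross partition boundaries, which is why count-based windows fit the RDD model far better than windows defined by application time.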
>> > >
>> > > Interesting ideas nonetheless. I am curious to see how far we can push
>> > > the RDD model underneath without losing parallelism and performance.
>> > >
>> > > TD
>> > >
>> > >
>> > >
>> > > On Tue, Jul 15, 2014 at 10:11 AM, andy petrella <andy.petrella@gmail.com> wrote:
>> > >
>> > > > Dear Sparkers,
>> > > >
>> > > > *[sorry for the lengthy email... => head to the gist
>> > > > <https://gist.github.com/andypetrella/12228eb24eea6b3e1389> for a
>> > > > preview :-p]*
>> > > >
>> > > > I would like to share some thinking I did because of a use case I faced.
>> > > > Basically, as the subject announces, it's a generalization of the
>> > > > DStream currently available in the streaming project.
>> > > > First of all, I'd like to say that it's only the result of some
>> > > > personal thinking, alone in the dark with a use case, the Spark code, a
>> > > > sheet of paper and a poor pen.
>> > > >
>> > > >
>> > > > DStream is a great concept for dealing with micro-batching use cases,
>> > > > and it handles them very well too!
>> > > > Also, it relies heavily on elapsed time to create its internal
>> > > > micro-batches.
>> > > > However, there are similar use cases where we need micro-batches but
>> > > > this time constraint doesn't hold; here are two of them:
>> > > > * a micro-batch has to be created every *n* events received
>> > > > * a micro-batch has to be generated based on the values of the items
>> > > >   pushed by the source (which might not even be a stream!).
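Both batching rules can be sketched on a local Scala collection. `Batching.everyN` and `Batching.byValue` are hypothetical helpers; the value-based rule takes a predicate on consecutive items that says when a new micro-batch should start, which is one possible reading of "based on the values of the items".

```scala
object Batching {
  // Count-based: cut a new micro-batch every n events.
  def everyN[A](events: Seq[A], n: Int): Vector[Seq[A]] = {
    require(n > 0, "batch size must be positive")
    events.grouped(n).toVector
  }

  // Value-based: start a new micro-batch whenever startsNewBatch(previous, current) is true.
  def byValue[A](events: Seq[A])(startsNewBatch: (A, A) => Boolean): Vector[Vector[A]] =
    events.foldLeft(Vector.empty[Vector[A]]) { (batches, a) =>
      batches.lastOption match {
        case Some(last) if !startsNewBatch(last.last, a) => batches.init :+ (last :+ a)
        case _                                           => batches :+ Vector(a)
      }
    }
}
```

For example, `Batching.byValue(Seq(1, 2, 11, 12, 25))((prev, cur) => prev / 10 != cur / 10)` starts a new batch each time the value enters a new range of ten. Neither rule consults a clock.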
>> > > >
>> > > > An example use case (mine ^^) would be:
>> > > > * the creation of time series from a cold source containing
>> > > >   timestamped events (like S3)
>> > > > * these time series have cells being the mean (sum, count, ...) of one
>> > > >   of the fields of the event
>> > > > * the mean has to be computed over a window depending on a *timestamp*
>> > > >   field
>> > > > * a time series is created for each type of event (the number of types
>> > > >   is high)
>> > > >
>> > > > So, in this case, it'd be interesting to have an RDD for each cell,
>> > > > which would generate all cells for all the needed time series.
>> > > > It's more or less what DStream does, but here it won't help, due to
>> > > > what was stated above.
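A minimal local sketch of the cells described in this use case, assuming each event carries a type, a timestamp and a numeric value (the `Event` fields and `TimeSeriesCells.meanCells` are hypothetical names). Each cell is keyed by (event type, window index), with fixed windows of `width` on the timestamp field. On an RDD, the same key could feed `reduceByKey` over (sum, count) pairs so that means are computed without materializing whole groups.

```scala
// Hypothetical event shape: a type, a timestamp field, and one numeric field to average.
final case class Event(eventType: String, timestamp: Long, value: Double)

object TimeSeriesCells {
  // One cell per (event type, window index), with windows [k * width, (k + 1) * width)
  // on the timestamp field; each cell holds the mean of `value` over its events.
  def meanCells(events: Seq[Event], width: Long): Map[(String, Long), Double] = {
    require(width > 0, "window width must be positive")
    events
      .groupBy(e => (e.eventType, Math.floorDiv(e.timestamp, width)))
      .map { case (cell, inCell) => cell -> inCell.map(_.value).sum / inCell.size }
  }
}
```

Because the window comes from a field of the data rather than from arrival time, the same code works on a cold source such as files on S3.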
>> > > >
>> > > > That's how I came to a raw sketch of what could be named ContinuousRDD
>> > > > (CRDD), which is basically an RDD[RDD[_]]. And, for the sake of
>> > > > simplicity, I've stuck with the definition of a DStream to think about
>> > > > it. Okay, let's go ^^.
>> > > >
>> > > >
>> > > > Looking at the DStream contract, here is something that could be
>> > > > drafted around CRDD.
>> > > > A *CRDD* would be a generalized concept that relies on:
>> > > > * a reference space/continuum (to which data can be bound)
>> > > > * a binning function that breaks the continuum into splits.
>> > > > Since *Space* is a continuum, we could define it as:
>> > > > * a *SpacePoint* (the origin)
>> > > > * a SpacePoint=>SpacePoint (the continuous function)
>> > > > * an Ordering[SpacePoint]
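The *Space* definition can be written down as a small Scala trait. This is a hypothetical sketch: apart from `SpacePoint` and `Ordering`, the member names (`origin`, `next`, `ordering`, `splitOf`) and the `SecondsSpace` example are assumptions. `splitOf` plays the role of the binning function by stepping from the origin until it finds the split containing a point; it assumes `next` is strictly increasing and that points are not before the origin.

```scala
// Hypothetical rendering of the Space described above.
trait Space[SpacePoint] {
  def origin: SpacePoint                 // the origin
  def next: SpacePoint => SpacePoint     // the "continuous" function: the next split boundary
  def ordering: Ordering[SpacePoint]     // how points compare along the continuum

  // Binning: the split [lower, next(lower)) containing p, found by stepping from the origin.
  // Assumes p is not before the origin and that next is strictly increasing.
  def splitOf(p: SpacePoint): (SpacePoint, SpacePoint) = {
    var lower = origin
    var upper = next(lower)
    while (ordering.lteq(upper, p)) {
      lower = upper
      upper = next(upper)
    }
    (lower, upper)
  }
}

// Example: a time axis in milliseconds, split every 1000 ms starting at 0.
object SecondsSpace extends Space[Long] {
  val origin: Long = 0L
  val next: Long => Long = _ + 1000L
  val ordering: Ordering[Long] = Ordering.Long
}
```

Stepping is linear in the distance from the origin; for a uniform step like this one, floor division would give the same split directly. The stepping form is kept because it relies only on the origin, the step function and the ordering, which is all the definition provides.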
>> > > >
>> > > > DStream uses a *JobGenerator* along with a DStreamGraph, which use a
>> > > > timer and a clock to do their work; in the case of a CRDD, we'd also
>> > > > have to define a point generator, as a more generic but also adaptable
>> > > > concept.
>> > > >
>> > > >
>> > > > So far (so good?), these definitions should work quite well for an
>> > > > *ordered* space for which:
>> > > > * points are coming in/fetched in order
>> > > > * the space is fully filled (no gaps)
>> > > > For these cases, the JobGenerator (for instance) could be defined with
>> > > > two extra functions:
>> > > > * one is responsible for chopping the batches even if the upper bound
>> > > >   of the batch hasn't been seen yet
>> > > > * the other is responsible for handling outliers (and could wrap them
>> > > >   into yet another CRDD?)
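The two extra functions can be sketched together as one sequential loop over arrivals. Everything here is hypothetical: `Arrival`, `Chopper.chop` and the `timeout` rule are assumptions chosen for illustration. A batch [lo, lo + width) is closed either when a point beyond its upper bound arrives or when `timeout` units of arrival time have passed since it received its first point; points that fall below the open batch are collected as outliers instead of being dropped. Closing on a bound emits empty batches for any skipped splits (gaps in the space).

```scala
// A point of the space together with the (wall-clock) time at which it arrived.
final case class Arrival(point: Long, arrivedAt: Long)

object Chopper {
  // Hypothetical chop rule: the batch [lo, lo + width) closes when a point at or beyond
  // lo + width arrives, or when `timeout` has elapsed since the batch received its first
  // point, even if its upper bound was never seen. Points below the open batch are outliers.
  // Returns (closed batches, outliers).
  def chop(arrivals: Seq[Arrival], lower: Long, width: Long, timeout: Long): (Vector[Vector[Long]], Vector[Long]) = {
    require(width > 0, "batch width must be positive")
    var lo = lower
    var current = Vector.empty[Long]
    var openedAt: Option[Long] = None
    var batches = Vector.empty[Vector[Long]]
    var outliers = Vector.empty[Long]

    def close(): Unit = {
      batches :+= current
      current = Vector.empty
      lo += width
      openedAt = None
    }

    for (a <- arrivals) {
      // Time-based chop: the batch has been open for too long.
      if (openedAt.exists(t => a.arrivedAt - t >= timeout)) close()
      if (a.point < lo) outliers :+= a.point // belongs to an already-closed batch
      else {
        // Bound-based chop; skipped splits (gaps) become empty batches.
        while (a.point >= lo + width) close()
        if (openedAt.isEmpty) openedAt = Some(a.arrivedAt)
        current :+= a.point
      }
    }
    if (current.nonEmpty) batches :+= current
    (batches, outliers)
  }
}
```

For instance, with `width = 10` and `timeout = 5`, a point 15 that reaches the loop after the [10, 20) batch was chopped by the timeout ends up with the outliers, which is where the second function would take over.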
>> > > >
>> > > >
>> > > > I created a gist wrapping up the types, and thus the skeleton of this
>> > > > idea; you can find it here:
>> > > > https://gist.github.com/andypetrella/12228eb24eea6b3e1389
>> > > >
>> > > > WDYT?
>> > > > *The answer can be: you're a fool!*
>> > > > Actually, I already am, but I'd also like to know why.... so some
>> > > > explanations will help me :-D.
>> > > >
>> > > > Thanks for reading this far.
>> > > >
>> > > > Greetz,
>> > > >
>> > >
>> >
>>
>
>
