spark-dev mailing list archives

From Chayapan Khannabha
Subject Re: Sorting on a streaming dataframe
Date Tue, 24 Apr 2018 20:38:51 GMT
Perhaps your use case is a better fit for Apache Kafka.

More info at: <>

Everything really comes down to the architecture design and algorithm spec. However, in
my experience with Spark, there are many good reasons why this requirement is not supported.


Chayapan (A)

> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat wrote:
> Thanks Chris. There are many ways in which I can solve this problem, but they are cumbersome.
> The easiest way would have been to sort the streaming dataframe. I asked this question
> because I could not find a reason why sorting on a streaming dataframe is disallowed.
> Hemant
> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris wrote:
> You can happily sort the underlying RDD of InternalRow(s) inside a sink, assuming you
> are willing to implement and maintain your own sink(s). That is, just grabbing the parquet
> sink, etc. isn’t going to work out of the box. Alternatively, map/flatMapGroupsWithState
> is probably sufficient and requires less working knowledge of the internals.
> Just group by foo, then sort accordingly and assign ids. The id counter can be stateful
> per group. Sometimes this problem may not need to be solved at all. For example, if you are
> using Kafka, a proper partitioning scheme and message offsets may be “good enough”.
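Editorial sketch of the "group by foo, sort, assign ids with a per-group counter" idea described above. This is plain Python, not Spark's map/flatMapGroupsWithState API; the class name `PerGroupIdAssigner`, the method `process_batch`, and the sample records are hypothetical and exist only for illustration. The dictionary stands in for the per-group state that a stateful operator would keep between micro-batches.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


class PerGroupIdAssigner:
    """Keeps one incrementing counter per group key (illustrative only)."""

    def __init__(self) -> None:
        # Stand-in for per-group state carried across micro-batches.
        self.next_id: Dict[str, int] = defaultdict(int)

    def process_batch(
        self, batch: Iterable[Tuple[str, str]]
    ) -> List[Tuple[str, str, int]]:
        # Group the (group_key, record_id) pairs of this batch by key.
        grouped: Dict[str, List[str]] = defaultdict(list)
        for group_key, record_id in batch:
            grouped[group_key].append(record_id)

        out: List[Tuple[str, str, int]] = []
        for group_key in sorted(grouped):
            # Sorting inside the group makes the assignment independent
            # of the order in which the records arrived.
            for record_id in sorted(grouped[group_key]):
                out.append((group_key, record_id, self.next_id[group_key]))
                self.next_id[group_key] += 1
        return out


assigner = PerGroupIdAssigner()
batch1 = assigner.process_batch([("a", "r2"), ("b", "r9"), ("a", "r1")])
batch2 = assigner.process_batch([("a", "r5")])
```

In this sketch the counter for group "a" continues from where the first batch left off, while group "b" keeps its own independent counter.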
> From: Hemant Bhanawat
> Sent: Thursday, April 12, 2018 11:42:59 PM
> To: Reynold Xin
> Cc: dev
> Subject: Re: Sorting on a streaming dataframe
> Well, we want to assign snapshot ids (incrementing counters) to the incoming records.
> For that, we are zipping the streaming RDDs with that counter using a modified version of
> ZippedWithIndexRDD. We are OK if the records in the streaming dataframe get counters in random
> order, but the counter should always be incrementing.
> This works fine until we have a failure. When we have a failure, we re-assign the
> records to snapshot ids, and this time the same snapshot id can get assigned to a different record.
> This is a problem because the primary key in our storage engine is <recordid, snapshotid>.
> So we want to sort the dataframe so that the records always get the same snapshot id.
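Editorial sketch of the failure scenario described above, in plain Python rather than Spark. It assumes the same set of records is replayed after a failure but arrives in a different order; the function `assign_snapshot_ids`, the record names, and the starting id 100 are hypothetical. Zipping with an index in arrival order gives different ids on the retry, while sorting by a stable key first gives the same ids both times.

```python
from typing import Dict, List


def assign_snapshot_ids(
    records: List[str], start_id: int, sort_first: bool
) -> Dict[str, int]:
    """Zip records with an incrementing counter, optionally sorting first."""
    ordered = sorted(records) if sort_first else list(records)
    return {rec: start_id + i for i, rec in enumerate(ordered)}


# The same batch seen in two different orders: first attempt and retry.
first_attempt = ["r3", "r1", "r2"]
retry = ["r2", "r3", "r1"]

# Without sorting, ids depend on arrival order and change on retry.
unsorted_first = assign_snapshot_ids(first_attempt, 100, sort_first=False)
unsorted_retry = assign_snapshot_ids(retry, 100, sort_first=False)

# With sorting, the assignment is a function of the record set alone.
sorted_first = assign_snapshot_ids(first_attempt, 100, sort_first=True)
sorted_retry = assign_snapshot_ids(retry, 100, sort_first=True)
```

The sketch only covers ordering within one replayed batch; it says nothing about whether a retry sees exactly the same set of records, which is a separate assumption.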
> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin wrote:
> Can you describe your use case more?
> On Thu, Apr 12, 2018 at 11:12 PM, Hemant Bhanawat wrote:
> Hi Guys, 
> Why is sorting on streaming dataframes not supported (unless it is in complete mode)? My
> downstream needs me to sort the streaming dataframe.
> Hemant 
