spark-dev mailing list archives

From Michael Armbrust <mich...@databricks.com>
Subject Re: Sorting on a streaming dataframe
Date Mon, 30 Apr 2018 23:04:05 GMT
Please open a JIRA then!

On Fri, Apr 27, 2018 at 3:59 AM Hemant Bhanawat <hemant9379@gmail.com>
wrote:

> I see.
>
> monotonically_increasing_id on streaming DataFrames would be really helpful
> to me and, I believe, to many other users. Implementing it in Spark should
> also perform better than implementing the same functionality inside each
> application.
>
> Hemant
>
> On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <michael@databricks.com>
> wrote:
>
>> The basic tenet of structured streaming is that a query should return the
>> same answer in streaming or batch mode. We support sorting in complete mode
>> because we have all the data and can sort it correctly and return the full
>> answer.  In update or append mode, sorting would only return a correct
>> answer if we could promise that records that sort lower are going to arrive
>> later (and we can't).  Therefore, it is disallowed.
>>
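To make the argument above concrete, here is a minimal sketch in plain Python (not Spark code). It assumes append mode is modelled as "emit each micro-batch once and never revise it"; the function names and the sample data are hypothetical and only illustrate why the two answers can differ.

```python
from typing import Iterable, List


def sort_in_batch_mode(records: Iterable[int]) -> List[int]:
    """Batch mode sees every record, so a single global sort is correct."""
    return sorted(records)


def sort_in_append_mode(micro_batches: Iterable[List[int]]) -> List[int]:
    """Append mode emits each micro-batch once and cannot revise earlier
    output, so the most it could do is sort within each micro-batch."""
    emitted: List[int] = []
    for batch in micro_batches:
        emitted.extend(sorted(batch))
    return emitted


# Hypothetical input: three micro-batches arriving over time.
micro_batches = [[5, 3, 9], [1, 7], [4]]
all_records = [r for batch in micro_batches for r in batch]

batch_answer = sort_in_batch_mode(all_records)
append_answer = sort_in_append_mode(micro_batches)

print(batch_answer)   # [1, 3, 4, 5, 7, 9]
print(append_answer)  # [3, 5, 9, 1, 7, 4]
```

The record `1` sorts lower than everything in the first micro-batch but arrives after it, so the streaming output cannot match the batch output.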
>> If you are just looking for a unique, stable id and you are already using
>> kafka as the source, you could just combine the partition id and the
>> offset. The structured streaming connector to Kafka
>> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
>> exposes both of these in the schema of the streaming DataFrame. (similarly
>> for kinesis you can use the shard id and sequence number)
>>
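A minimal sketch of the "partition id plus offset" idea, again in plain Python rather than Spark code. The `KafkaRecord` type and `stable_id` helper are hypothetical; the only assumption taken from Kafka itself is that an offset is unique within one topic partition, which is why the topic is included in the id as well.

```python
from typing import NamedTuple


class KafkaRecord(NamedTuple):
    # Hypothetical stand-in for one row read from a Kafka source.
    topic: str
    partition: int
    offset: int
    value: bytes


def stable_id(record: KafkaRecord) -> str:
    """Build an id that depends only on where the record lives in Kafka,
    not on the order in which it happens to be processed."""
    return f"{record.topic}-{record.partition}-{record.offset}"


first_attempt = [
    KafkaRecord("events", 0, 41, b"a"),
    KafkaRecord("events", 1, 41, b"b"),
]
# After a failure the same records may be reprocessed in a different order.
replay_after_failure = list(reversed(first_attempt))

ids_first = {r.value: stable_id(r) for r in first_attempt}
ids_replay = {r.value: stable_id(r) for r in replay_after_failure}

print(ids_first[b"a"])          # events-0-41
print(ids_first == ids_replay)  # True
```

Such ids are unique and stable across retries, but they are not contiguous.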
>> If you need the IDs to be contiguous, then this is a somewhat
>> fundamentally hard problem.  I think the best we could do is add support
>> for monotonically_increasing_id() in streaming dataframes.
>>
>> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <chayapan@gmail.com>
>> wrote:
>>
>>> Perhaps your use case is a better fit for Apache Kafka.
>>>
>>> More info at:
>>> https://kafka.apache.org/documentation/streams/
>>>
>>> Everything really comes down to the architecture design and algorithm
>>> spec. However, from my experience with Spark, there are many good reasons
>>> why this requirement is not supported ;)
>>>
>>> Best,
>>>
>>> Chayapan (A)
>>>
>>>
>>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <hemant9379@gmail.com>
>>> wrote:
>>>
>>> Thanks Chris. There are many ways in which I can solve this problem but
>>> they are cumbersome. The easiest way would have been to sort the streaming
>>> dataframe. The reason I asked this question is because I could not find a
>>> reason why sorting on streaming dataframe is disallowed.
>>>
>>> Hemant
>>>
>>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris
>>> <chris.bowden@microfocus.com> wrote:
>>>
>>>> You can happily sort the underlying RDD of InternalRow(s) inside a
>>>> sink, assuming you are willing to implement and maintain your own sink(s).
>>>> That is, just grabbing the parquet sink, etc. isn’t going to work out of
>>>> the box. Alternatively map/flatMapGroupsWithState is probably sufficient
>>>> and requires less working knowledge to make effective reuse of internals.
>>>> Just group by foo and then sort accordingly and assign ids. The id counter
>>>> can be stateful per group. Sometimes this problem may not need to be solved
>>>> at all. For example, if you are using kafka, a proper partitioning scheme
>>>> and message offsets may be “good enough”.
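
The "group by foo, sort, and assign ids from a per-group counter" suggestion can be sketched roughly as follows. This is plain Python, not the Spark `flatMapGroupsWithState` API: the `assign_ids` function and the `counters` dictionary are hypothetical stand-ins for the update function and the per-group state that Spark would manage.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def assign_ids(
    batch: List[Tuple[str, str]],
    counters: Dict[str, int],
) -> List[Tuple[str, str, int]]:
    """Group (key, payload) pairs by key, sort each group, and give every
    record the next id from that group's counter. `counters` plays the role
    of per-group state and is updated in place."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for key, payload in batch:
        grouped[key].append(payload)

    out: List[Tuple[str, str, int]] = []
    for key in sorted(grouped):
        for payload in sorted(grouped[key]):  # deterministic order within a group
            next_id = counters.get(key, 0)
            out.append((key, payload, next_id))
            counters[key] = next_id + 1
    return out


state: Dict[str, int] = {}
first = assign_ids([("foo", "b"), ("bar", "x"), ("foo", "a")], state)
second = assign_ids([("foo", "c")], state)

print(first)   # [('bar', 'x', 0), ('foo', 'a', 0), ('foo', 'b', 1)]
print(second)  # [('foo', 'c', 2)]
```

Ids are contiguous only within a group, and the sketch does not address how the state is recovered after a failure.
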
>>>> ------------------------------
>>>> *From:* Hemant Bhanawat <hemant9379@gmail.com>
>>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>>> *To:* Reynold Xin
>>>> *Cc:* dev
>>>> *Subject:* Re: Sorting on a streaming dataframe
>>>>
>>>> Well, we want to assign snapshot ids (incrementing counters) to the
>>>> incoming records. For that, we are zipping the streaming RDDs with that
>>>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>>>> records in the streaming dataframe get counters in random order, but the
>>>> counter should always be incrementing.
>>>>
>>>> This works fine until we have a failure. After a failure, we re-assign
>>>> snapshot ids to the records, and this time the same snapshot id can get
>>>> assigned to a different record. This is a problem because the primary
>>>> key in our storage engine is <recordid, snapshotid>. So we want to sort
>>>> the dataframe so that the records always get the same snapshot id.
>>>>
>>>>
>>>>
>>>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <rxin@databricks.com>
>>>> wrote:
>>>>
>>>> Can you describe your use case more?
>>>>
>>>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9379@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> Why is sorting on streaming dataframes not supported (unless it is in
>>>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>>>
>>>> Hemant
>>>>
>>>>
>>>>
>>>
>>>
>>
>
