spark-dev mailing list archives

From Ryan Blue <rb...@netflix.com.INVALID>
Subject Re: DataSourceV2 write input requirements
Date Fri, 06 Apr 2018 16:42:56 GMT
Since it sounds like there is consensus here, I've opened an issue for
this: https://issues.apache.org/jira/browse/SPARK-23889

On Sun, Apr 1, 2018 at 9:32 AM, Patrick Woody <patrick.woody1@gmail.com>
wrote:

> Yep, that sounds reasonable to me!
>
> On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>
>> +1
>>
>> -------- Original message --------
>> From: Ryan Blue <rblue@netflix.com>
>> Date: 3/30/18 2:28 PM (GMT-08:00)
>> To: Patrick Woody <patrick.woody1@gmail.com>
>> Cc: Russell Spitzer <russell.spitzer@gmail.com>, Wenchen Fan <
>> cloud0fan@gmail.com>, Ted Yu <yuzhihong@gmail.com>, Spark Dev List <
>> dev@spark.apache.org>
>> Subject: Re: DataSourceV2 write input requirements
>>
>> You're right. A global sort would change the clustering if the sort had
>> more fields than the clustering.
>>
>> Then what about this: if there is no RequiredClustering, then the sort is
>> a global sort. If RequiredClustering is present, then the clustering is
>> applied and the sort is a partition-level sort.
>>
>> That rule would mean that within a partition you always get the sort, but
>> an explicit clustering overrides the partitioning a sort might try to
>> introduce. Does that sound reasonable?
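>>
>> A rough sketch of that rule, assuming the clustering and ordering are
>> handed over as plain column names (the RequiredClustering name is the
>> proposal in this thread, not a shipped API):
>>
>>     import org.apache.spark.sql.DataFrame
>>
>>     // Sketch only: illustrates the rule above, not the actual
>>     // DataSourceV2 interfaces.
>>     def prepareForWrite(df: DataFrame,
>>                         requiredClustering: Option[Seq[String]],
>>                         requiredOrdering: Seq[String]): DataFrame = {
>>       requiredClustering match {
>>         case None =>
>>           // No RequiredClustering: treat the requested sort as a global sort.
>>           df.orderBy(requiredOrdering.map(df.col): _*)
>>         case Some(clusterCols) =>
>>           // RequiredClustering present: cluster first, then do a
>>           // partition-level sort.
>>           df.repartition(clusterCols.map(df.col): _*)
>>             .sortWithinPartitions(requiredOrdering.map(df.col): _*)
>>       }
>>     }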
>>
>> rb
>>
>> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woody1@gmail.com>
>> wrote:
>>
>>> Does that methodology work in this specific case? I thought the ordering
>>> must be a subset of the clustering to guarantee the rows end up in the
>>> same partition when doing a global sort. Though I get the gist that if it
>>> does satisfy the clustering, then there is no reason not to choose the
>>> global sort.
>>>
>>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rblue@netflix.com> wrote:
>>>
>>>> > Can you expand on how the ordering containing the clustering
>>>> expressions would ensure the global sort?
>>>>
>>>> The idea was to basically assume that if the clustering can be
>>>> satisfied by a global sort, then do the global sort. For example, if the
>>>> clustering is Set("b", "a") and the sort is Seq("a", "b", "c") then do a
>>>> global sort by columns a, b, and c.
>>>>
>>>> Technically, you could do this with a hash partitioner instead of a
>>>> range partitioner and sort within each partition, but that doesn't make
>>>> much sense because the partitioning would ensure that each partition has
>>>> just one combination of the required clustering columns. Using a hash
>>>> partitioner would make it so that the in-partition sort basically ignores
>>>> the first few values, so it must be that the intent was a global sort.
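>>>>
>>>> As a toy illustration of that check (my own example values, matching
>>>> the ones above):
>>>>
>>>>     // A global sort can satisfy the clustering when every clustering
>>>>     // column appears in the requested sort columns.
>>>>     val clustering = Set("b", "a")
>>>>     val ordering = Seq("a", "b", "c")
>>>>     val useGlobalSort = clustering.subsetOf(ordering.toSet) // true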
>>>>
>>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <
>>>> patrick.woody1@gmail.com> wrote:
>>>>
>>>>>> Right, you could use this to store a global ordering if there is only
>>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>>> case, you would still have a clustering and an ordering, but the
>>>>>> ordering would need to include all fields of the clustering. A way to
>>>>>> pass in the partition ordinal for the source to store would be required.
>>>>>
>>>>>
>>>>> Can you expand on how the ordering containing the clustering
>>>>> expressions would ensure the global sort? Having a RangePartitioning
>>>>> would certainly satisfy it, but it isn't required - is the suggestion
>>>>> that if Spark sees this overlap, then it plans a global sort?
>>>>>
>>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>>>> russell.spitzer@gmail.com> wrote:
>>>>>
>>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to
>>>>>> get more detailed statistics. For example, on read we could be using
>>>>>> sketch data structures to get estimates on unique values and density
>>>>>> for each column. You may be right that the real way for this to be
>>>>>> handled would be giving a "cost" back to a higher-order optimizer,
>>>>>> which can decide which method to use rather than having the data
>>>>>> source itself do it. This is probably in a far future version of the
>>>>>> API.
>>>>>>
>>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rblue@netflix.com> wrote:
>>>>>>
>>>>>>> Cassandra can insert records with the same partition-key faster if
>>>>>>> they arrive in the same payload. But this is only beneficial if the
>>>>>>> incoming dataset has multiple entries for the same partition key.
>>>>>>>
>>>>>>> Thanks for the example, the recommended partitioning use case makes
>>>>>>> more sense now. I think we could have two interfaces, a
>>>>>>> RequiresClustering and a RecommendsClustering if we want to support
>>>>>>> this. But I’m skeptical it will be useful for two reasons:
>>>>>>>
>>>>>>>    - Do we want to optimize the low cardinality case? Shuffles are
>>>>>>>    usually much cheaper at smaller sizes, so I’m not sure it is
>>>>>>>    necessary to optimize this away.
>>>>>>>    - How do we know there aren’t just a few partition keys for all
>>>>>>>    the records? It may look like a shuffle wouldn’t help, but we
>>>>>>>    don’t know the partition keys until it is too late.
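>>>>>>>
>>>>>>> For concreteness, one possible shape for that pair of interfaces
>>>>>>> (the names come from this thread; this is only a sketch, not a real
>>>>>>> Spark API):
>>>>>>>
>>>>>>>     // Sketch: Spark must satisfy a required clustering, but may
>>>>>>>     // skip a recommended one if the shuffle isn't worth the cost.
>>>>>>>     trait RequiresClustering {
>>>>>>>       def requiredClustering(): Seq[String]
>>>>>>>     }
>>>>>>>
>>>>>>>     trait RecommendsClustering {
>>>>>>>       def recommendedClustering(): Seq[String]
>>>>>>>     }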
>>>>>>>
>>>>>>> Then there’s also the logic for avoiding the shuffle and how to
>>>>>>> calculate the cost, which sounds like something that needs some
>>>>>>> details from CBO.
>>>>>>>
>>>>>>> I would assume that given the estimated data size from Spark and
>>>>>>> options passed in from the user, the data source could make a more
>>>>>>> intelligent requirement on the write format than Spark independently.
>>>>>>>
>>>>>>> This is a good point.
>>>>>>>
>>>>>>> What would an implementation actually do here, and how would the
>>>>>>> information be passed? For my use cases, the store would produce the
>>>>>>> number of tasks based on the estimated incoming rows, because the
>>>>>>> source has the best idea of how the rows will compress. But that’s
>>>>>>> just applying a multiplier most of the time. To be very useful, this
>>>>>>> would have to handle skew in the rows (think of rows with a type
>>>>>>> field, where the total size depends on the type), and that’s a bit
>>>>>>> harder. I think maybe an interface that can provide relative cost
>>>>>>> estimates based on partition keys would be helpful, but then keep the
>>>>>>> planning logic in Spark.
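>>>>>>>
>>>>>>> Something like the following is the kind of shape I mean, purely as
>>>>>>> a sketch (nothing like this exists in the API today):
>>>>>>>
>>>>>>>     // Illustrative only: the source reports a relative cost per
>>>>>>>     // partition key, and Spark keeps the planning logic.
>>>>>>>     trait WriteCostEstimator {
>>>>>>>       // e.g., expected output bytes for rows sharing these
>>>>>>>       // partition key values
>>>>>>>       def relativeCost(partitionKey: Map[String, Any]): Double
>>>>>>>     }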
>>>>>>>
>>>>>>> This is probably something that we could add later as we find use
>>>>>>> cases that require it?
>>>>>>>
>>>>>>> I wouldn’t assume that a data source requiring a certain write
>>>>>>> format would give any guarantees around reading the same data? In
>>>>>>> the cases where it is a complete overwrite it would, but for
>>>>>>> independent writes it could still be useful for statistics or
>>>>>>> compression.
>>>>>>>
>>>>>>> Right, you could use this to store a global ordering if there is
>>>>>>> only one write (e.g., CTAS). I don’t think anything needs to change
>>>>>>> in that case, you would still have a clustering and an ordering, but
>>>>>>> the ordering would need to include all fields of the clustering. A
>>>>>>> way to pass in the partition ordinal for the source to store would
>>>>>>> be required.
>>>>>>>
>>>>>>> For the second point, that ordering is useful for statistics and
>>>>>>> compression, I completely agree. Our best practices doc tells users
>>>>>>> to always add a global sort when writing because you get the benefit
>>>>>>> of a range partitioner to handle skew, plus the stats and compression
>>>>>>> you’re talking about to optimize for reads. I think the proposed API
>>>>>>> can request a global ordering from Spark already. My only point is
>>>>>>> that there isn’t much the source can do to guarantee ordering for
>>>>>>> reads when there is more than one write.
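>>>>>>>
>>>>>>> That best practice is just a sort in front of an ordinary write,
>>>>>>> e.g. (made-up column names and path):
>>>>>>>
>>>>>>>     import org.apache.spark.sql.DataFrame
>>>>>>>
>>>>>>>     // Global sort before writing: the range partitioner handles
>>>>>>>     // skew, and per-file ordering helps stats and compression.
>>>>>>>     def writeSorted(df: DataFrame): Unit = {
>>>>>>>       df.orderBy("event_date", "account_id")
>>>>>>>         .write
>>>>>>>         .parquet("s3://bucket/warehouse/table")
>>>>>>>     }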
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <
>>>>>>> patrick.woody1@gmail.com> wrote:
>>>>>>>
>>>>>>>>> Spark would always apply the required clustering and sort order
>>>>>>>>> because they are required by the data source. It is reasonable for
>>>>>>>>> a source to reject data that isn’t properly prepared. For example,
>>>>>>>>> data must be written to HTable files with keys in order or else the
>>>>>>>>> files are invalid. Sorting should not be implemented in the sources
>>>>>>>>> themselves because Spark handles concerns like spilling to disk.
>>>>>>>>> Spark must prepare data correctly, which is why the interfaces
>>>>>>>>> start with “Requires”.
>>>>>>>>
>>>>>>>>
>>>>>>>> This was in reference to Russell's suggestion that the data source
>>>>>>>> could have a required sort, but only a recommended partitioning. I
>>>>>>>> don't have an immediate recommending use case that comes to mind,
>>>>>>>> though. I'm definitely in sync that the data source itself shouldn't
>>>>>>>> do work outside of the writes themselves.
>>>>>>>>
>>>>>>>>> Considering the second use case you mentioned first, I don’t think
>>>>>>>>> it is a good idea for a table to put requirements on the number of
>>>>>>>>> tasks used for a write. The parallelism should be set appropriately
>>>>>>>>> for the data volume, which is for Spark or the user to determine.
>>>>>>>>> A minimum or maximum number of tasks could cause bad behavior.
>>>>>>>>
>>>>>>>>
>>>>>>>>> For your first use case, an explicit global ordering, the problem
>>>>>>>>> is that there can’t be an explicit global ordering for a table when
>>>>>>>>> it is populated by a series of independent writes. Each write could
>>>>>>>>> have a global order, but once those files are written, you have to
>>>>>>>>> deal with multiple sorted data sets. I think it makes sense to
>>>>>>>>> focus on order within data files, not order between data files.
>>>>>>>>
>>>>>>>>
>>>>>>>> This is where I'm interested in learning about the separation of
>>>>>>>> responsibilities for the data source and how "smart" it is supposed
>>>>>>>> to be.
>>>>>>>>
>>>>>>>> For the first part, I would assume that given the estimated data
>>>>>>>> size from Spark and options passed in from the user, the data source
>>>>>>>> could make a more intelligent requirement on the write format than
>>>>>>>> Spark independently. Somewhat analogous to how the current
>>>>>>>> FileSource does bin packing of small files on the read side,
>>>>>>>> restricting parallelism for the sake of reducing overhead.
>>>>>>>>
>>>>>>>> For the second, I wouldn't assume that a data source requiring a
>>>>>>>> certain write format would give any guarantees around reading the
>>>>>>>> same data? In the cases where it is a complete overwrite it would,
>>>>>>>> but for independent writes it could still be useful for statistics
>>>>>>>> or compression.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Pat
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rblue@netflix.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>


-- 
Ryan Blue
Software Engineer
Netflix
