spark-dev mailing list archives

From Patrick Woody <patrick.woo...@gmail.com>
Subject Re: DataSourceV2 write input requirements
Date Wed, 28 Mar 2018 14:26:41 GMT
How would Spark determine whether or not to apply a recommendation - a cost
threshold? And yes, it would be good to flesh out what information the
datasource gets from Spark when providing these recommendations/requirements -
I could see statistics and the existing outputPartitioning/outputOrdering of
the child plan being used to inform the requirement.

Should a datasource be able to provide a Distribution proper rather than
just the clustering expressions? Two use cases would be an explicit global
sort of the dataset and ensuring a minimum write task size (or number of
write tasks).
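
To make that concrete, here is a rough sketch of what a Distribution-based
requirement could look like (the interface name is made up for illustration;
nothing like this exists or has been proposed on this thread):

import org.apache.spark.sql.catalyst.plans.physical.Distribution;

// Hypothetical: the sink hands Spark a full Distribution instead of only
// clustering expressions, e.g. an OrderedDistribution for an explicit global
// sort or a ClusteredDistribution for plain clustering.
interface RequiresDistribution {
  Distribution requiredDistribution();
}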



On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <russell.spitzer@gmail.com>
wrote:

> Thanks for the clarification; I definitely would want to require a sort but
> only recommend partitioning ... I think that would be useful to request
> based on details about the incoming dataset.
>
> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rblue@netflix.com> wrote:
>
>> A required clustering would not, but a required sort would. Clustering is
>> asking for the input dataframe's partitioning, and sorting would be how
>> each partition is sorted.
>>
>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>> russell.spitzer@gmail.com> wrote:
>>
>>> I forgot since it's been a while, but does the Clustering support allow
>>> requesting that partitions contain elements in order as well? That would be
>>> a useful trick for me, i.e.
>>> Request/Require(SortedOn(Col1))
>>> Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
>>>
>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rblue@netflix.com.invalid>
>>> wrote:
>>>
>>>> Thanks, it makes sense that the existing interface is for aggregation
>>>> and not joins. Why are there requirements for the number of partitions that
>>>> are returned then?
>>>>
>>>> Does it make sense to design the write-side `Requirement` classes and
>>>> the read-side reporting separately?
>>>>
>>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0fan@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ryan, yeah, you are right that SupportsReportPartitioning doesn't
>>>>> expose a hash function, so Join can't benefit from this interface: Join
>>>>> doesn't require a general ClusteredDistribution but a more specific one
>>>>> called HashClusteredDistribution.
>>>>>
>>>>> So currently only Aggregate can benefit from SupportsReportPartitioning
>>>>> and save a shuffle. We can add a new interface to expose the hash
>>>>> function to make it work for Join.
>>>>>
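(As a rough illustration of that suggestion, one shape it could take -- the
interface name and methods below are invented, not anything proposed on this
thread:)

import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;

// Hypothetical: in addition to reporting a Partitioning, the source exposes
// its own hash function so Spark can shuffle the other side of a join to
// match the source's bucketing.
interface SupportsReportHashPartitioning extends SupportsReportPartitioning {
  // number of buckets the data is hashed into
  int numBuckets();

  // bucket id for a row's clustering key values, mirroring the source's hashing
  int bucket(Object[] clusteringKeyValues);
}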
>>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rblue@netflix.com> wrote:
>>>>>
>>>>>> I just took a look at SupportsReportPartitioning and I'm not sure
>>>>>> that it will work for real use cases. It doesn't specify, as far as I
>>>>>> can tell, a hash function for combining clusters into tasks or a way to
>>>>>> provide Spark a hash function for the other side of a join. It seems
>>>>>> unlikely to me that many data sources would have partitioning that
>>>>>> happens to match the other side of a join. And, it looks like task
>>>>>> order matters? Maybe I'm missing something?
>>>>>>
>>>>>> I think that we should design the write side independently based on
>>>>>> what data stores actually need, and take a look at the read side based
>>>>>> on what data stores can actually provide. Wenchen, was there a design
>>>>>> doc for partitioning on the read path?
>>>>>>
>>>>>> I completely agree with your point about a global sort. We recommend
>>>>>> that all of our data engineers add a sort to most tables because it
>>>>>> introduces the range partitioner and does a skew calculation, in
>>>>>> addition to making data filtering much better when the table is read.
>>>>>> It's really common for tables to be skewed by partition values.
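
(Purely to illustrate the sort-before-write pattern described above; the
column names and paths here are made up:)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class SortedWrite {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("sorted-write").getOrCreate();
    Dataset<Row> events = spark.read().parquet("/data/events");

    // A global orderBy inserts a range partitioner, which samples the data
    // and so handles skew, and the sorted output gives much tighter min/max
    // stats for filtering when the table is read back.
    events.orderBy(col("day"), col("category"), col("event_ts"))
        .write()
        .parquet("/data/events_sorted");
  }
}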
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>>>>>> patrick.woody1@gmail.com> wrote:
>>>>>>
>>>>>>> Hey Ryan, Ted, Wenchen
>>>>>>>
>>>>>>> Thanks for the quick replies.
>>>>>>>
>>>>>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>>>>>> ensure something similar to requiredChildDistribution in SparkPlan,
>>>>>>> where we have the number of partitions as well, if we'd want to
>>>>>>> further report to SupportsReportPartitioning, yeah?
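(One way that could look, building on the RequiresClustering sketch further
down in the thread -- again purely hypothetical names:)

import java.util.List;
import org.apache.spark.sql.catalyst.expressions.Expression;

// Hypothetical: pair the clustering expressions with a desired partition
// count, mirroring how requiredChildDistribution is used inside Spark.
interface RequiresClustering {
  List<Expression> requiredClustering();

  // how many partitions the clustering should be split into; <= 0 means
  // "let Spark decide"
  default int requiredNumPartitions() {
    return -1;
  }
}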
>>>>>>>
>>>>>>> Specifying an explicit global sort can also be useful for filtering
>>>>>>> purposes on Parquet row group stats if we have a time-based or
>>>>>>> high-cardinality ID field. If my datasource or catalog knows about
>>>>>>> previous queries on a table, it could be really useful to recommend
>>>>>>> more appropriate formatting for consumers on the next materialization.
>>>>>>> The same would be true of clustering on commonly joined fields.
>>>>>>>
>>>>>>> Thanks again
>>>>>>> Pat
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhihong@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>>
>>>>>>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>>> ...
>>>>>>>>   Partitioning outputPartitioning();
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0fan@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Actually clustering is already supported; please take a look at
>>>>>>>>> SupportsReportPartitioning.
>>>>>>>>>
>>>>>>>>> Ordering is not proposed yet; it might be similar to what Ryan
>>>>>>>>> proposed.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhihong@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Interesting.
>>>>>>>>>>
>>>>>>>>>> Should requiredClustering return a Set of Expressions?
>>>>>>>>>> This way, we can determine the order of the Expressions by looking
>>>>>>>>>> at what requiredOrdering() returns.
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>>>>> rblue@netflix.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Pat,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for starting the discussion on this; we’re really
>>>>>>>>>>> interested in it as well. I don’t think there is a proposed API
>>>>>>>>>>> yet, but I was thinking something like this:
>>>>>>>>>>>
>>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> The reason why RequiresClustering should provide Expression is
>>>>>>>>>>> that the data source needs to be able to customize the
>>>>>>>>>>> implementation. For example, writing to HTable would require
>>>>>>>>>>> building a key (or the data for a key), and that might use a hash
>>>>>>>>>>> function that differs from Spark’s built-ins.
>>>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>>>> between the two requirements deserves some consideration. To make
>>>>>>>>>>> the two compatible, I think that RequiresSort must be interpreted
>>>>>>>>>>> as a sort within each partition of the clustering, but it could
>>>>>>>>>>> possibly be used for a global sort when the two overlap.
>>>>>>>>>>>
>>>>>>>>>>> For example, if I have a table partitioned by “day” and
>>>>>>>>>>> “category”, then the required clustering would be by day,
>>>>>>>>>>> category. A required sort might be day ASC, category DESC, name
>>>>>>>>>>> ASC. Because that sort satisfies the required clustering, it
>>>>>>>>>>> could be used for a global ordering. But is that useful? How
>>>>>>>>>>> would the global ordering matter beyond a sort within each
>>>>>>>>>>> partition, i.e., how would the partition’s place in the global
>>>>>>>>>>> ordering be passed?
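(A sketch of how a sink for that day/category table might implement the two
proposed interfaces; the class is invented, and constructing Expression and
SortOrder via Column.expr() is hand-waved here -- a real proposal would need a
public way to build them:)

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import static org.apache.spark.sql.functions.col;

class DayCategoryWriteSupport implements RequiresClustering, RequiresSort {
  @Override
  public List<Expression> requiredClustering() {
    // cluster the input by day, category
    return Arrays.asList(col("day").expr(), col("category").expr());
  }

  @Override
  public List<SortOrder> requiredOrdering() {
    // day ASC, category DESC, name ASC -- satisfies the clustering, so it
    // could in principle serve as a global ordering
    return Arrays.asList(
        (SortOrder) col("day").asc().expr(),
        (SortOrder) col("category").desc().expr(),
        (SortOrder) col("name").asc().expr());
  }
}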
>>>>>>>>>>>
>>>>>>>>>>> To your other questions, you might want to have a look at the
>>>>>>>>>>> recent SPIP I’m working on to consolidate and clean up logical plans
>>>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that
>>>>>>>>>>> should help clarify what validation needs to take place. As for
>>>>>>>>>>> custom catalyst rules, I’d like to hear about the use cases to see
>>>>>>>>>>> if we can build it into these improvements.
>>>>>>>>>>>
>>>>>>>>>>> rb
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <patrick.woody1@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>
>>>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes
>>>>>>>>>>>> that we might have the data source inform Spark of requirements
>>>>>>>>>>>> for the input data's ordering and partitioning. Has there been a
>>>>>>>>>>>> proposed API for that yet?
>>>>>>>>>>>>
>>>>>>>>>>>> Even one level up it would be helpful to understand how I
>>>>>>>>>>>> should be thinking about the responsibility of the data source
>>>>>>>>>>>> writer, when I should be inserting a custom catalyst rule, and
>>>>>>>>>>>> how I should handle validation/assumptions of the table before
>>>>>>>>>>>> attempting the write.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Pat
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
