spark-user mailing list archives

From Fridtjof Sander <fridtjof.san...@googlemail.com>
Subject Re: Grouping on bucketed and sorted columns
Date Fri, 02 Sep 2016 12:25:13 GMT
I managed to run some experiments, and it seems I understood the code 
correctly:
A partition that consists of Hive buckets is read bucket file by 
bucket file, which loses the internal sort order.

Does anyone have an opinion about my alternative idea of reading from 
multiple bucket-files simultaneously to keep that ordering?
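
To make the idea concrete, here is a minimal sketch in plain Python, not Spark code. The `bucket_files` dictionary and the helper names are hypothetical stand-ins for the files of one bucket id (one file per date partition, each already sorted by id). It contrasts reading file after file with a lazy k-way merge, which keeps the order so that groups can be emitted in a streaming fashion:

```python
import heapq
from itertools import chain, groupby
from operator import itemgetter

# Hypothetical stand-in for the bucket files of ONE bucket id:
# one file per date partition, each already sorted by id.
bucket_files = {
    "2016-08-30": [(1, "a"), (5, "b"), (9, "c")],
    "2016-08-31": [(1, "d"), (9, "e")],
    "2016-09-01": [(5, "f"), (13, "g")],
}

def read_sequentially(files):
    """Read file after file: the concatenation is not sorted by id."""
    return chain.from_iterable(files.values())

def read_merged(files):
    """Lazy k-way merge of the sorted files, ordered by id."""
    return heapq.merge(*files.values(), key=itemgetter(0))

def stream_groups(rows):
    """Emit (id, values) groups from rows that arrive sorted by id."""
    for key, grp in groupby(rows, key=itemgetter(0)):
        yield key, [value for _, value in grp]

# Sequential reading interleaves the ids, so grouping needs a sort first.
sequential_ids = [row[0] for row in read_sequentially(bucket_files)]

# Merged reading keeps the ids ordered, so each group is complete as
# soon as the next id shows up; no blocking sort is needed.
merged_groups = list(stream_groups(read_merged(bucket_files)))
```

The merge only ever holds one pending row per open file, which is why I would expect it to pipeline into the next operator. Whether this fits into Spark's scan operators is exactly what I am unsure about.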

Regarding the followup questions:

1. I found the `collect_list()` function, which seems to provide what I 
want. However, I fail to collect more than one column. Is there a way to 
do basically: .agg(collect_list("*")) ?

2. I worked around that problem by writing and reading the table within 
the same context/session, so that the ephemeral metastore doesn't lose 
its content. However, in general a Hive metastore seems to be required 
for production use, since there is only an ephemeral and a 
Hive catalog implementation available in 2.0.0.

I would highly appreciate some feedback on my thoughts and questions.

On 31.08.2016 at 14:45, Fridtjof Sander wrote:
> Hi Spark users,
>
> I'm currently investigating spark's bucketing and partitioning 
> capabilities and I have some questions:
>
> Let /T/ be a table that is bucketed and sorted by /T.id/ and 
> partitioned by /T.date/. Before persisting, /T/ has been repartitioned 
> by /T.id/ to get only one file per bucket.
> I want to group by /T.id/ over a subset of /T.date/'s values.
>
> It seems to me that the best execution plan in this scenario would be 
> the following:
> - Schedule one stage (no exchange) with as many tasks as we have 
> bucket-ids, so that there is a mapping from each task to a bucket-id
> - Each task opens all bucket-files belonging to "its" bucket-id 
> simultaneously, which is one per affected partition /T.date/
> - Since the data inside the buckets are sorted, we can perform the 
> second phase of "two-phase-multiway-merge-sort" to get our groups, 
> which can be "pipelined" into the next operator
>
> From what I understand after scanning through the code, however, it 
> appears to me that each bucket-file is read completely before the 
> record-iterator is advanced to the next bucket file (see FileScanRDD , 
> same applies to Hive). So a groupBy would require sorting the 
> partitions of the resulting RDD before the groups can be emitted, 
> which results in a blocking operation.
>
> Could anyone confirm that I'm assessing the situation correctly here, 
> or correct me if not?
>
> Followup questions:
>
> 1. Is there a way to get the "sql" groups into the RDD API, like the 
> RDD groupBy would return them? I fail to formulate a job like this, 
> because a query with groupBy that lacks an aggregation function is 
> invalid.
> 2. I haven't simply tested this, because I fail to load a table with 
> the properties specified above:
> After writing a table like this:
> .write().partitionBy("date").bucketBy(4,"id").sortBy("id").format("json").saveAsTable("table");
> I fail to read it back with the partitioning and bucketing being 
> recognized.
> Is a functioning Hive-Metastore required for this to work, or is there 
> a workaround?
>
> I hope someone can spare the time to help me out here.
>
> All the best,
> Fridtjof
>

