I succeeded to do some experimental evaluation, and it seems I correctly understood the code:
A partition that consist of hive-buckets is read bucket-file by bucket-file, which leads to the loss of internal sorting.

Does anyone have an opinion about my alternative idea of reading from multiple bucket-files simultaneously to keep that ordering?

Regarding the followup questions:

1. I found the `collect_list()`function, which seems provide what I want. However, I fail to collect more than one column. Is there a way to do basically: .agg(collect_list("*")) ?

2. I worked around that problem by writing and reading the table within the same context/session, so that the ephemeral metastore doesn't loose it's content. However, in general a hive-metastore seems to be required for a production usage, since there is only an ephemeral- and a hive-catalog implementation available in 2.0.0.

I would highly appreciate some feedback to my thoughts and questions

Am 31.08.2016 um 14:45 schrieb Fridtjof Sander:
Hi Spark users,

I'm currently investigating spark's bucketing and partitioning capabilities and I have some questions:

Let  T be a table that is bucketed and sorted by T.id and partitioned by T.date. Before persisting, T has been repartitioned by T.id to get only one file per bucket.
I want to group by T.id over a subset of T.date's values.

It seems to me that the best execution plan in this scenario would be the following:
- Schedule one stage (no exchange) with as many tasks as we have bucket-ids, so that there is a mapping from each task to a bucket-id
- Each tasks opens all bucket-files belonging to "it's" bucket-id simultaneously, which is one per affected partition T.date
- Since the data inside the buckets are sorted, we can perform the second phase of "two-phase-multiway-merge-sort" to get our groups, which can be "pipelined" into the next operator

From what I understand after scanning through the code, however, it appears to me that each bucket-file is read completely before the record-iterator is advanced to the next bucket file (see FileScanRDD , same applies to Hive). So a groupBy would require to sort the partitions of the resulting RDD before the groups can be emitted, which results in a blocking operation.

Could anyone confirm that I'm assessing the situation correctly here, or correct me if not?

Followup questions:

1. Is there a way to get the "sql" groups into the RDD API, like the RDD groupBy would return them? I fail to formulate a job like this, because a query with groupBy, that misses an aggregation function, is invalid.
2. I haven't simply testet this, because I fail to load a table with the specified properties like above:
After writing a table like this:
.write().partitionBy("date").bucketBy(4, "id").sortBy("id").format("json").saveAsTable("table");
I fail to read it again, with the partitioning and bucketing being recognized.
Is a functioning Hive-Metastore required for this to work, or is there a workaround?

I hope someone can spare the time to help me out here.

All the best,