[Spark SQL Discuss] Better support for Partitioning and Bucketing when used together
We use partitioned + bucketed datasets for use-cases where we can afford to take a perf hit at write time so that reads are optimised. But I feel Spark could exploit this data layout much better in query planning. Here I describe why this is a problem, and how it could be improved.
Why is Partitioning + Bucketing required (together)?
There is a class of common problems that can't be solved by either technique alone:
Pure partitioning - We want to avoid a shuffle when joining some commonly joined datasets on the same few join keys, and partitioning alone does not let us do that.
Pure Bucketing - For most DataSources (on Spark and other processing frameworks), the folder is the least granular level of identifying datasets. The HiveMetastore lets us collect arbitrary folder partitions into a logical view, and this helps in incremental ingestion and lends itself to a simple form of MVCC.
On Spark, when you try to both Partition and Bucket a dataset, the format on disk and in the metastore is correctly recorded. But this information isn't optimally used for query planning because:
A partitioned+bucketed dataset is read into num_buckets input RDD partitions because of createBucketedRDD. For large datasets with a lot of partitions, the resulting DataFrame is effectively unusable because parallelism is capped at num_buckets. We can't simply choose a large num_buckets, as that would lead to the small-files problem, especially in skewed partitions.
We could manually turn bucketing off with the spark.sql.sources.bucketing.enabled flag, but we would be losing the natural distribution that's present in the dataset, and lose out on shuffle elimination.
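To make the parallelism limit concrete, here is a back-of-the-envelope sketch in plain Python (it is not Spark code). The helper name bucketed_read_layout and the table sizes are made up for illustration, and it assumes exactly one file per bucket in every folder partition; it only models the behaviour described above, where a bucketed scan produces one input partition per bucket no matter how many folder partitions are read.

```python
def bucketed_read_layout(num_folder_partitions: int, num_buckets: int) -> dict:
    """Model the read layout described above (illustrative, not a Spark API)."""
    # Assumption: every folder partition holds exactly one file per bucket.
    files_on_disk = num_folder_partitions * num_buckets
    return {
        "files_on_disk": files_on_disk,
        # A bucketed scan groups all files with the same bucket id into one
        # input partition, across every folder partition that is read.
        "input_partitions": num_buckets,
        "files_per_input_partition": num_folder_partitions,
    }


# Hypothetical table: 2000 folder partitions, 16 buckets.
layout = bucketed_read_layout(num_folder_partitions=2000, num_buckets=16)
print(layout)
```

Under these made-up numbers, raising num_buckets to regain parallelism multiplies the file count in every folder partition, which is the small-files trade-off mentioned above.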
What could happen ideally:
Partitioned + bucketed data actually has a well-defined distribution. It does not fit the currently defined HashClusteredDistribution, but a new one can be defined which takes into account both the value-based distribution of partition values and the hash-based distribution of bucketing columns.
Queries involving partition columns AND bucketing columns should make use of this data distribution.
E.g., suppose you have PartitioningCols(a,b) and BucketingCols(c). The joins we could support without a shuffle would be on keys (a,b,c), (a,c) [coalescing the values of b within each a] and (a) [partition-partition join].
The number of input RDD partitions should be decided at the last possible stage of query planning. If no join (as described above) can utilize the bucketed data, the physical plan could fall back to a regular DataSource scan.
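The join-key cases in the example above can be sketched as a small eligibility check. This is plain Python and only a model of the idea, not planner code: the function name shuffle_free_join and the returned labels are hypothetical, and the rule that partition keys must form a prefix of the folder hierarchy is my generalisation of the three listed cases, not something Spark implements. A result of None means "not covered by the examples above", in which case the plan would fall back to a regular scan.

```python
def shuffle_free_join(partition_cols, bucket_cols, join_keys):
    """Classify a join against the proposed layout (hypothetical helper)."""
    keys = set(join_keys)
    part_keys = [c for c in partition_cols if c in keys]
    bucket_keys = [c for c in bucket_cols if c in keys]

    # Conservative: keys outside the layout are not handled by this sketch.
    if keys - set(partition_cols) - set(bucket_cols):
        return None
    # Assumption: partition keys must be a non-empty prefix of the hierarchy,
    # e.g. (a) or (a, b) for PartitioningCols(a, b), but not (b) alone.
    if not part_keys or part_keys != list(partition_cols[:len(part_keys)]):
        return None
    if not bucket_keys:
        return "partition-partition join"
    # Bucket ids are a hash of all bucketing columns, so all must be present.
    if bucket_keys != list(bucket_cols):
        return None
    if len(part_keys) == len(partition_cols):
        return "partition+bucket join"
    return "coalesce lower partition levels, then partition+bucket join"


for join_keys in (["a", "b", "c"], ["a", "c"], ["a"], ["b", "c"]):
    print(join_keys, "->", shuffle_free_join(["a", "b"], ["c"], join_keys))
```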
This will involve changes to some aspects of query planning. Here I list the top-level changes:
Add a new Distribution and Partitioning to describe this partition value and bucket column hash data layout.
Change the logical plan and the SparkPlan (DataSourceScanExec) to capture the above Distribution and Partitioning.
Just like EnsureRequirements adds a shuffle to satisfy child distributions, we could have it add a coalesce operator to club together buckets across folder partitions if required, and at different levels of the partition hierarchy according to the distribution required. For example: with PartitioningCols(a,b) and BucketingCols(c), a join that involves (a,c) can be answered by coalescing the b values within (a,c)'s partitions. This would be a metadata-only operation.
Account for co-clustered partitions so that RDDs can be zipped in joins - this will have to handle partitions pruned out too.
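The last two changes can be illustrated with a plain-Python model of the file metadata. Everything here is hypothetical (the helpers make_files, coalesce_files and zip_co_clustered, the path layout, and the tables t1 and t2); it is a sketch of the idea, not of Spark internals. It also assumes both tables use the same num_buckets and the same bucket hash, so equal bucket ids hold matching keys.

```python
from collections import defaultdict


def make_files(table, partitions, num_buckets):
    """Fake file listing: one file per bucket in each (a, b) folder."""
    return [
        {"path": f"{table}/a={a}/b={b}/bucket_{i}",
         "partition": {"a": a, "b": b}, "bucket": i}
        for (a, b) in partitions for i in range(num_buckets)
    ]


def coalesce_files(files, partition_cols, required_cols):
    """Group paths by (required partition values, bucket id).

    Only the listing is regrouped; no data is read or moved.
    """
    groups = defaultdict(list)
    for f in files:
        key = tuple(f["partition"][c] for c in partition_cols if c in required_cols)
        groups[key + (f["bucket"],)].append(f["path"])
    return dict(groups)


def zip_co_clustered(left_groups, right_groups):
    """Pair groups that share a key; keys present on one side only
    (e.g. pruned partitions) are dropped, as for an inner join."""
    shared = sorted(left_groups.keys() & right_groups.keys())
    return {k: (left_groups[k], right_groups[k]) for k in shared}


# Join on (a, c): coalesce the b level away on both sides.
left = make_files("t1", [(1, "x"), (1, "y"), (2, "x")], num_buckets=2)
right = make_files("t2", [(1, "z")], num_buckets=2)  # a=2 pruned on this side
left_groups = coalesce_files(left, ["a", "b"], {"a"})
right_groups = coalesce_files(right, ["a", "b"], {"a"})
pairs = zip_co_clustered(left_groups, right_groups)
for key, (l_paths, r_paths) in pairs.items():
    print(key, l_paths, r_paths)
```

Each resulting pair would correspond to one zipped task in the join. Outer joins would need to keep the one-sided keys instead of dropping them, which this sketch does not attempt.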
There is a strong requirement for this functionality on my team (at Amazon).
I've opened a JIRA regarding this issue here.
I did consider DataSourcesV2, but it looks like a better fit here.
I would like some input on this:
Is this approach feasible and is it aligned with how Spark wants to handle native datasources in the future?
Does anyone else have similar requirements?