DataSourceV2 write input requirements


DataSourceV2 write input requirements

Patrick Woody
Hey all,

I saw in some of the discussions around DataSourceV2 writes that we might have the data source inform Spark of requirements for the input data's ordering and partitioning. Has there been a proposed API for that yet?

Even one level up, it would be helpful to understand how I should be thinking about the responsibility of the data source writer, when I should be inserting a custom catalyst rule, and how I should handle validation/assumptions of the table before attempting the write.

Thanks!
Pat

Re: DataSourceV2 write input requirements

Ryan Blue

Hi Pat,

Thanks for starting the discussion on this; we’re really interested in it as well. I don’t think there is a proposed API yet, but I was thinking something like this:

interface RequiresClustering {
  List<Expression> requiredClustering();
}

interface RequiresSort {
  List<SortOrder> requiredOrdering();
}

The reason why RequiresClustering should provide Expression is that the data source needs to be able to customize the implementation. For example, writing to HTable would require building a key (or the data for a key), and that might use a hash function that differs from Spark’s built-ins. RequiresSort is fairly straightforward, but the interaction between the two requirements deserves some consideration. To make the two compatible, I think RequiresSort must be interpreted as a sort within each partition of the clustering, but it could possibly be used for a global sort when the two overlap.

For example, if I have a table partitioned by “day” and “category”, then the required clustering would be by day, category. A required sort might be day ASC, category DESC, name ASC. Because that sort satisfies the required clustering, it could be used for a global ordering. But is that useful? How would the global ordering matter beyond a sort within each partition, i.e., how would the partition’s place in the global ordering be passed?
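
To make that concrete, here is a minimal sketch of a writer for that day/category table implementing the two proposed interfaces. It assumes the interfaces above exist as written; the class name and constructor are made up, and the Expression and SortOrder lists would be built from the input query's resolved attributes (day, category, name).

import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.SortOrder;

// Hypothetical writer for a table partitioned by day, category.
class PartitionedTableWriter implements RequiresClustering, RequiresSort {
  private final List<Expression> clusteringExprs; // day, category
  private final List<SortOrder> sortOrders;       // day ASC, category DESC, name ASC

  PartitionedTableWriter(List<Expression> clusteringExprs, List<SortOrder> sortOrders) {
    this.clusteringExprs = clusteringExprs;
    this.sortOrders = sortOrders;
  }

  // Spark would shuffle the input so that all rows for one (day, category)
  // combination end up in the same write task.
  @Override
  public List<Expression> requiredClustering() {
    return clusteringExprs;
  }

  // Interpreted as a sort within each clustered partition; it could become a
  // global sort because it starts with the clustering expressions.
  @Override
  public List<SortOrder> requiredOrdering() {
    return sortOrders;
  }
}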

To your other questions, you might want to have a look at the recent SPIP I’m working on to consolidate and clean up logical plans. That proposes more specific uses for the DataSourceV2 API that should help clarify what validation needs to take place. As for custom catalyst rules, I’d like to hear about the use cases to see if we can build it into these improvements.

rb


--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 write input requirements

Ted Yu
Interesting.

Should requiredClustering return a Set of Expressions?
This way, we can determine the order of the Expressions by looking at what requiredOrdering() returns.


Re: DataSourceV2 write input requirements

cloud0fan
Actually, clustering is already supported; please take a look at SupportsReportPartitioning.

Ordering hasn't been proposed yet; it might look similar to what Ryan proposed.


Re: DataSourceV2 write input requirements

Ryan Blue
Wenchen, I thought SupportsReportPartitioning was for the read side. Does it work with the write side as well?


Re: DataSourceV2 write input requirements

Ted Yu
In reply to this post by cloud0fan
Hmm. Ryan seems to be right.

Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:

import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
...
  Partitioning outputPartitioning();
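
For reference, a rough sketch of what a reader could return from outputPartitioning() to report clustering by a single column. This is based on the Spark 2.3 classes named above; the class itself and the choice of "day" are made up for illustration, and exact method/field names may differ between versions.

import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

// Claims the data is already clustered by "day" across a known number of
// input partitions.
class DayClusteredPartitioning implements Partitioning {
  private final int numPartitions;

  DayClusteredPartitioning(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    // Note: there is no way here to tell Spark *how* rows were assigned to
    // partitions, which is the hash-function gap discussed later in the thread.
    if (distribution instanceof ClusteredDistribution) {
      String[] cols = ((ClusteredDistribution) distribution).clusteredColumns;
      return cols.length == 1 && "day".equals(cols[0]);
    }
    return false;
  }
}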


Re: DataSourceV2 write input requirements

Patrick Woody
Hey Ryan, Ted, Wenchen,

Thanks for the quick replies.

@Ryan - the sorting portion makes sense, but I think we'd also need something similar to requiredChildDistribution in SparkPlan, where the number of partitions is specified as well, if we want to further report that layout through SupportsReportPartitioning, yeah?
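
As a purely hypothetical illustration (not an existing or proposed API), the clustering requirement could carry a partition count alongside the expressions:

import java.util.OptionalInt;

// Hypothetical only: pairs the proposed clustering requirement with a
// partition count, similar to how requiredChildDistribution can constrain
// the number of partitions on the physical plan side.
interface RequiresClusteredDistribution extends RequiresClustering {
  // OptionalInt.empty() would mean any number of partitions is acceptable.
  OptionalInt requiredNumPartitions();
}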

Specifying an explicit global sort can also be useful for filtering on Parquet row group stats when we have a time-based or high-cardinality ID field. If my data source or catalog knows about previous queries on a table, it could be really useful to recommend a more appropriate layout for consumers on the next materialization. The same would be true of clustering on commonly joined fields.

Thanks again
Pat




Re: DataSourceV2 write input requirements

cloud0fan
Yeah, it's for the read side only. I think that for the write side, implementations can provide options that let users set the partitioning/ordering, or the data source may have a natural partitioning/ordering that doesn't require any interface.


Re: DataSourceV2 write input requirements

Ryan Blue
In reply to this post by Patrick Woody
I just took a look at SupportsReportPartitioning and I'm not sure that it will work for real use cases. It doesn't specify, as far as I can tell, a hash function for combining clusters into tasks or a way to provide Spark a hash function for the other side of a join. It seems unlikely to me that many data sources would have partitioning that happens to match the other side of a join. And, it looks like task order matters? Maybe I'm missing something?

I think that we should design the write side independently based on what data stores actually need, and take a look at the read side based on what data stores can actually provide. Wenchen, was there a design doc for partitioning on the read path?

I completely agree with your point about a global sort. We recommend that all of our data engineers add a sort to most tables, because it introduces the range partitioner and does a skew calculation, in addition to making data filtering much better when the data is read. It's really common for tables to be skewed by partition values.
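
As an illustration of the kind of sort we recommend (the table, column, and path names here are made up), a global sort before the write goes through a range partitioner with a sampling pass, and the resulting layout filters much better on read:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SortedWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("sorted-write").getOrCreate();

    Dataset<Row> events = spark.table("events");

    // Global sort: orderBy uses a range partitioner (with sampling to handle
    // skew) before writing, so row group stats on these columns stay useful.
    events.orderBy(col("day"), col("category"), col("name"))
        .write()
        .mode("append")
        .partitionBy("day", "category")
        .parquet("/warehouse/events_sorted");

    spark.stop();
  }
}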

rb


Re: DataSourceV2 write input requirements

cloud0fan
Hi Ryan, yeah, you're right that SupportsReportPartitioning doesn't expose a hash function, so Join can't benefit from this interface: Join doesn't require a general ClusteredDistribution but a more specific one called HashClusteredDistribution.

So currently only Aggregate can benefit from SupportsReportPartitioning and save a shuffle. We can add a new interface that exposes the hash function to make it work for Join as well.
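
Purely as a sketch of what such an interface might look like (nothing like this exists today; the name and method are made up):

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;

// Hypothetical: lets a source describe how it maps a clustering key to a
// partition, so Spark could shuffle the other side of a join the same way.
interface SupportsReportHashPartitioning extends SupportsReportPartitioning {
  // Partition index in [0, outputPartitioning().numPartitions()) for the
  // given clustering key values.
  int partitionIdFor(InternalRow clusteringKey);
}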


Re: DataSourceV2 write input requirements

Ryan Blue
Thanks, it makes sense that the existing interface is for aggregation and not joins. Why are there requirements on the number of partitions that are returned, then?

Does it make sense to design the write-side `Requirement` classes and the read-side reporting separately?


Re: DataSourceV2 write input requirements

RussS
I've forgotten since it's been a while: does the clustering support allow requesting that partitions contain their elements in order as well? That would be a useful trick for me, e.g.:
Request/Require(SortedOn(Col1))
Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))


Re: DataSourceV2 write input requirements

Ryan Blue
A required clustering would not, but a required sort would. Clustering is asking for the input dataframe's partitioning, and sorting would be how each partition is sorted.


Re: DataSourceV2 write input requirements

RussS
Thanks for the clarification. I would definitely want to require the sort but only recommend the partitioning... I think it would be useful to make that request based on details about the incoming dataset.
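
A purely made-up sketch of how that require-vs-recommend distinction could sit next to the earlier proposal (none of this exists; the name is invented):

import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;

// Hypothetical: like RequiresClustering, but Spark would be free to skip the
// extra shuffle if it estimates that the cost outweighs the benefit.
interface RecommendsClustering {
  List<Expression> recommendedClustering();
}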


Re: DataSourceV2 write input requirements

Patrick Woody
How would Spark determine whether or not to apply a recommendation? A cost threshold? And yes, it would be good to flesh out what information the data source gets from Spark when providing these recommendations/requirements; I could see statistics and the existing outputPartitioning/Ordering of the child plan being used to provide the requirement.

Should a data source be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be an explicit global sort of the dataset and attempting to ensure a minimum write task size or number of write tasks.
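As a rough illustration of what "a Distribution proper" could look like on the write side (every type below is a made-up placeholder; only the naming echoes Spark's read-side Distribution classes):

interface Distribution { }                                  // hypothetical marker type

class ClusteredWriteDistribution implements Distribution {  // cluster rows by expressions
  List<Expression> clustering;
}

class OrderedWriteDistribution implements Distribution {    // globally sort the dataset
  List<SortOrder> ordering;
}

interface RequiresDistribution {
  Distribution requiredDistribution();
}

A richer Distribution could then also carry hints such as a minimum bytes-per-task target, which is what the second use case is after.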




Re: DataSourceV2 write input requirements

Ryan Blue

How would Spark determine whether or not to apply a recommendation? A cost threshold?

Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn’t properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with “Requires”.
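As a purely illustrative sketch of that contract (RequiresSort is only the interface proposed earlier in this thread, and KeyOrderedWriter is a made-up sink), a key-ordered format would declare the sort and let Spark produce it:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import static org.apache.spark.sql.functions.col;

class KeyOrderedWriter implements RequiresSort {
  public List<SortOrder> requiredOrdering() {
    // Spark plans the sort (and any spill to disk) so that rows reach each
    // write task already ordered by "key"; the writer only appends them.
    return Arrays.asList((SortOrder) col("key").asc().expr());
  }
}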

I’m not sure what the second half of your question means. What does Spark need to pass into the data source?

Should a data source be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be an explicit global sort of the dataset and attempting to ensure a minimum write task size or number of write tasks.

Considering the second use case you mentioned first, I don’t think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

That said, I think there is a related use case for sharding. But that’s really just a clustering by an expression with the shard calculation, e.g., hash(id_col, 64). The shards should be handled as a cluster, but it doesn’t matter how many tasks are used for it.
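For example, a sharded sink might express that as a clustering requirement along these lines (ShardedWriter, "id_col", and the 64 shards are made up; RequiresClustering is the interface proposed above):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.catalyst.expressions.Expression;
import static org.apache.spark.sql.functions.expr;

class ShardedWriter implements RequiresClustering {
  public List<Expression> requiredClustering() {
    // All rows of a shard land in the same write task; Spark stays free to
    // decide how many tasks it runs in total.
    return Arrays.asList(expr("pmod(hash(id_col), 64)").expr());
  }
}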

For your first use case, an explicit global ordering, the problem is that there can’t be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.



Re: DataSourceV2 write input requirements

Patrick Woody
Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn’t properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with “Requires”.

This was in reference to Russell's suggestion that the data source could have a required sort, but only a recommended partitioning. I don't have an immediate use case for recommendations that comes to mind, though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.

Considering the second use case you mentioned first, I don’t think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

For your first use case, an explicit global ordering, the problem is that there can’t be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.

This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be.

For the first part, I would assume that given the estimated data size from Spark and the options passed in by the user, the data source could make a more intelligent requirement on the write format than Spark could on its own. This is somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism to reduce overhead.

For the second, I wouldn't assume that a data source requiring a certain write format gives any guarantees when reading the same data back. In the case of a complete overwrite it would, but for independent writes the required format could still be useful for statistics or compression.

Thanks
Pat

 


Re: DataSourceV2 write input requirements

RussS
For added color, one thing that I may want to consider as a data source implementer is the cost / benefit of applying a particular clustering. For example, a dataset with low cardinality in the clustering key could benefit greatly from clustering on that key before writing to Cassandra since Cassandra can benefit from these sorts of batching. But the cost of performing this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

I imagine other sources might have similar benefit calculations. Doing a particular sort or clustering can provide increased throughput but only in certain situations based on some facts about the data. 


To give a concrete example:

Cassandra can insert records with the same partition-key faster if they arrive in the same payload. But this is only beneficial if the incoming dataset has multiple entries for the same partition key. If the incoming source does not have any duplicates then there is no benefit to requiring a sort or partitioning. 
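A rough sketch of how that decision might look in a Cassandra-style sink, assuming the proposed RequiresClustering interface, assuming Spark hands the source row and distinct-key estimates somehow (which is exactly the open question here), and taking an empty list to mean "no requirement":

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.spark.sql.catalyst.expressions.Expression;
import static org.apache.spark.sql.functions.col;

class CassandraWriter implements RequiresClustering {
  private final long estimatedRows;          // assumed to come from Spark statistics
  private final long estimatedDistinctKeys;  // assumed estimate of distinct partition keys

  CassandraWriter(long estimatedRows, long estimatedDistinctKeys) {
    this.estimatedRows = estimatedRows;
    this.estimatedDistinctKeys = estimatedDistinctKeys;
  }

  public List<Expression> requiredClustering() {
    // Only ask for the shuffle when enough rows share a partition key for
    // batched inserts to pay for the exchange.
    double rowsPerKey = (double) estimatedRows / Math.max(1L, estimatedDistinctKeys);
    if (rowsPerKey < 2.0) {
      return Collections.emptyList();
    }
    return Arrays.asList(col("partition_key").expr());
  }
}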


Re: DataSourceV2 write input requirements

Ted Yu
bq. this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

I wonder if you meant higher in place of the last word above.

Cheers

On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <[hidden email]> wrote:
For added color, one thing that I may want to consider as a data source implementer is the cost / benefit of applying a particular clustering. For example, a dataset with low cardinality in the clustering key could benefit greatly from clustering on that key before writing to Cassandra since Cassandra can benefit from these sorts of batching. But the cost of performing this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

I imagine other sources might have similar benefit calculations. Doing a particular sort or clustering can provide increased throughput but only in certain situations based on some facts about the data. 


For a concrete example here.

Cassandra can insert records with the same partition-key faster if they arrive in the same payload. But this is only beneficial if the incoming dataset has multiple entries for the same partition key. If the incoming source does not have any duplicates then there is no benefit to requiring a sort or partitioning. 

On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody <[hidden email]> wrote:
Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn’t properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with “Requires”.

This was in reference to Russell's suggestion that the data source could have a required sort, but only a recommended partitioning. I don't have an immediate recommending use case that would come to mind though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.


Considering the second use case you mentioned first, I don’t think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.
For your first use case, an explicit global ordering, the problem is that there can’t be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.

This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be.

For the first part, I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark independently. Somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism for the sake of overhead.

For the second, I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.

Thanks
Pat

 

On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <[hidden email]> wrote:

How would Spark determine whether or not to apply a recommendation - a cost threshold?

Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn’t properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with “Requires”.

I’m not sure what the second half of your question means. What does Spark need to pass into the data source?

Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be for explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.

Considering the second use case you mentioned first, I don’t think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

That said, I think there is a related use case for sharding. But that’s really just a clustering by an expression with the shard calculation, e.g., hash(id_col, 64). The shards should be handled as a cluster, but it doesn’t matter how many tasks are used for it.

For your first use case, an explicit global ordering, the problem is that there can’t be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.


On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <[hidden email]> wrote:
How would Spark determine whether or not to apply a recommendation - a cost threshold? And yes, it would be good to flesh out what information we get from Spark in the datasource when providing these recommendations/requirements - I could see statistics and the existing outputPartitioning/Ordering of the child plan being used for providing the requirement.

Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be for explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.



On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <[hidden email]> wrote:
Thanks for the clarification, definitely would want to require Sort but only recommend partitioning ...  I think that would be useful to request based on details about the incoming dataset. 

On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <[hidden email]> wrote:
A required clustering would not, but a required sort would. Clustering is asking for the input dataframe's partitioning, and sorting would be how each partition is sorted.

On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <[hidden email]> wrote:
I forgot since it's been a while, but does Clustering support allow requesting that partitions contain elements in order as well? That would be a useful trick for me. IE
Request/Require(SortedOn(Col1))
Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))

On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <[hidden email]> wrote:
Thanks, it makes sense that the existing interface is for aggregation and not joins. Why are there requirements for the number of partitions that are returned then?

Does it makes sense to design the write-side `Requirement` classes and the read-side reporting separately?

On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <[hidden email]> wrote:
Hi Ryan, yea you are right that SupportsReportPartitioning doesn't expose hash function, so Join can't benefit from this interface, as Join doesn't require a general ClusteredDistribution, but a more specific one called HashClusteredDistribution.

So currently only Aggregate can benefit from SupportsReportPartitioning and save shuffle. We can add a new interface to expose the hash function to make it work for Join.

On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <[hidden email]> wrote:
I just took a look at SupportsReportPartitioning and I'm not sure that it will work for real use cases. It doesn't specify, as far as I can tell, a hash function for combining clusters into tasks or a way to provide Spark a hash function for the other side of a join. It seems unlikely to me that many data sources would have partitioning that happens to match the other side of a join. And, it looks like task order matters? Maybe I'm missing something?

I think that we should design the write side independently based on what data stores actually need, and take a look at the read side based on what data stores can actually provide. Wenchen, was there a design doc for partitioning on the read path?

I completely agree with your point about a global sort. We recommend to all of our data engineers to add a sort to most tables because it introduces the range partitioner and does a skew calculation, in addition to making data filtering much better when it is read. It's really common for tables to be skewed by partition values.

rb

On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <[hidden email]> wrote:
Hey Ryan, Ted, Wenchen

Thanks for the quick replies.

@Ryan - the sorting portion makes sense, but I think we'd have to ensure something similar to requiredChildDistribution in SparkPlan where we have the number of partitions as well if we'd want to further report to SupportsReportPartitioning, yeah?

Specifying an explicit global sort can also be useful for filtering purposes on Parquet row group stats if we have a time based/high cardinality ID field. If my datasource or catalog knows about previous queries on a table, it could be really useful to recommend more appropriate formatting for consumers on the next materialization. The same would be true of clustering on commonly joined fields.

Thanks again
Pat



On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <[hidden email]> wrote:
Hmm. Ryan seems to be right.

Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java :

import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
...
  Partitioning outputPartitioning();

On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <[hidden email]> wrote:
Actually clustering is already supported, please take a look at SupportsReportPartitioning

Ordering is not proposed yet, might be similar to what Ryan proposed.

On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <[hidden email]> wrote:
Interesting.

Should requiredClustering return a Set of Expression's ?
This way, we can determine the order of Expression's by looking at what requiredOrdering() returns.

On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <[hidden email]> wrote:

Hi Pat,

Thanks for starting the discussion on this, we’re really interested in it as well. I don’t think there is a proposed API yet, but I was thinking something like this:

interface RequiresClustering {
  List<Expression> requiredClustering();
}

interface RequiresSort {
  List<SortOrder> requiredOrdering();
}

The reason why RequiresClustering should provide Expression is that it needs to be able to customize the implementation. For example, writing to HTable would require building a key (or the data for a key) and that might use a hash function that differs from Spark’s built-ins. RequiresSort is fairly straightforward, but the interaction between the two requirements deserves some consideration. To make the two compatible, I think that RequiresSort must be interpreted as a sort within each partition of the clustering, but could possibly be used for a global sort when the two overlap.

For example, if I have a table partitioned by “day” and “category” then the RequiredClustering would be by day, category. A required sort might be day ASC, category DESC, name ASC. Because that sort satisfies the required clustering, it could be used for a global ordering. But, is that useful? How would the global ordering matter beyond a sort within each partition, i.e., how would the partition’s place in the global ordering be passed?

To your other questions, you might want to have a look at the recent SPIP I’m working on to consolidate and clean up logical plans. That proposes more specific uses for the DataSourceV2 API that should help clarify what validation needs to take place. As for custom catalyst rules, I’d like to hear about the use cases to see if we can build it into these improvements.

rb


On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <[hidden email]> wrote:
Hey all,

I saw in some of the discussions around DataSourceV2 writes that we might have the data source inform Spark of requirements for the input data's ordering and partitioning. Has there been a proposed API for that yet?

Even one level up it would be helpful to understand how I should be thinking about the responsibility of the data source writer, when I should be inserting a custom catalyst rule, and how I should handle validation/assumptions of the table before attempting the write.

Thanks!
Pat



--
Ryan Blue
Software Engineer
Netflix


Reply | Threaded
Open this post in threaded view
|

Re: DataSourceV2 write input requirements

RussS
Ah yeah, sorry - I got a bit mixed up.

On Wed, Mar 28, 2018 at 7:54 PM Ted Yu <[hidden email]> wrote:
bq. this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

I wonder if you meant higher in place of the last word above.

Cheers

On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <[hidden email]> wrote:
For added color, one thing that I may want to consider as a data source implementer is the cost/benefit of applying a particular clustering. For example, a dataset with low cardinality in the clustering key could benefit greatly from clustering on that key before writing to Cassandra, since Cassandra can take advantage of that kind of batching. But the cost of performing this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

I imagine other sources might have similar benefit calculations. Doing a particular sort or clustering can provide increased throughput, but only in certain situations, depending on facts about the data.


For a concrete example:

Cassandra can insert records with the same partition key faster if they arrive in the same payload, but this is only beneficial if the incoming dataset has multiple entries for the same partition key. If the incoming source does not have any duplicates, then there is no benefit to requiring a sort or partitioning.
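A rough sketch of the "recommend rather than require" idea this points at - every name here is hypothetical, and how Spark would act on the hint is exactly the open question in this thread:

import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;

// Hypothetical: the sink reports a clustering it would like, plus a cheap
// check that Spark (or the user) could weigh against the cost of a shuffle.
interface RecommendsClustering {
  List<Expression> recommendedClustering();

  // For the Cassandra case: given estimated incoming rows and distinct
  // partition keys, batching by key is only worth a shuffle if there are
  // many rows per key.
  boolean worthClustering(long estimatedNumRows, long estimatedDistinctKeys);
}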

On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody <[hidden email]> wrote:
Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn’t properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with “Requires”.

This was in reference to Russell's suggestion that the data source could have a required sort, but only a recommended partitioning. I can't think of an immediate use case for a recommendation off the top of my head, though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.


Considering the second use case you mentioned first, I don’t think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.
For your first use case, an explicit global ordering, the problem is that there can’t be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.

This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be.

For the first part, I would assume that, given the estimated data size from Spark and options passed in by the user, the data source could make a more intelligent requirement on the write layout than Spark could on its own. This is somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism to reduce per-task overhead.

For the second, I wouldn't assume that a data source requiring a certain write layout gives any guarantees when reading the same data back. In the case of a complete overwrite it would, but even for independent writes the layout could still be useful for statistics or compression.

Thanks
Pat

 

On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <[hidden email]> wrote:

How would Spark determine whether or not to apply a recommendation - a cost threshold?

Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn’t properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with “Requires”.

I’m not sure what the second half of your question means. What does Spark need to pass into the data source?

Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be for explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.

Considering the second use case you mentioned first, I don’t think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

That said, I think there is a related use case for sharding. But that's really just clustering by an expression with the shard calculation, e.g., hash(id_col, 64). Each shard should be handled as a cluster, but it doesn't matter how many tasks are used for it.
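As a sketch, the sharding case could be written against the RequiresClustering interface proposed earlier in the thread (a proposal, not an existing Spark API); id_col and the shard count are made up, and hash/pmod is just one way to build the shard expression:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.hash;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.pmod;

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;

// Hypothetical sink that asks Spark to cluster incoming rows into 64 shards.
class ShardedSink implements RequiresClustering {
  @Override
  public List<Expression> requiredClustering() {
    // hash(id_col) mod 64: all rows of a shard land in the same write task,
    // however many tasks Spark decides to use overall.
    return Collections.singletonList(pmod(hash(col("id_col")), lit(64)).expr());
  }
}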

For your first use case, an explicit global ordering, the problem is that there can’t be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.


On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <[hidden email]> wrote:
How would Spark determine whether or not to apply a recommendation - a cost threshold? And yes, it would be good to flesh out what information the data source gets from Spark when providing these recommendations/requirements - I could see statistics and the existing outputPartitioning/outputOrdering of the child plan being used to provide the requirement.

Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be for explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.



On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <[hidden email]> wrote:
Thanks for the clarification. I definitely would want to require a sort but only recommend partitioning... I think that would be useful to request based on details about the incoming dataset.

On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <[hidden email]> wrote:
A required clustering would not, but a required sort would. Clustering is asking for the input dataframe's partitioning, and sorting would be how each partition is sorted.

On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <[hidden email]> wrote:
I forgot since it's been a while, but does the clustering support allow requesting that partitions contain their elements in order as well? That would be a useful trick for me. For example:
Request/Require(SortedOn(Col1))
Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
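A sketch of how a sink could ask for exactly that layout by combining the two interfaces proposed earlier in the thread (RequiresClustering and RequiresSort are proposals, not existing Spark APIs); pulling the SortOrder out of a Column expression is just shorthand here:

import static org.apache.spark.sql.functions.col;

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.SortOrder;

// Hypothetical sink: rows are clustered by Col1 and each write task receives
// its rows sorted by Col1, e.g. (A,1), (A,2), (B,1), (B,2), ...
class SortedClusteredSink implements RequiresClustering, RequiresSort {
  @Override
  public List<Expression> requiredClustering() {
    return Collections.singletonList(col("Col1").expr());
  }

  @Override
  public List<SortOrder> requiredOrdering() {
    return Collections.singletonList((SortOrder) col("Col1").asc().expr());
  }
}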



12