[DISCUSS] SPIP: APIs for Table Metadata Operations


Ryan Blue

The recently adopted SPIP to standardize logical plans requires a way to plug in providers for table metadata operations, so that the new plans can create and drop tables. I proposed an API to do this in a follow-up SPIP on APIs for Table Metadata Operations. This thread is to discuss that proposal.

There are two main parts:

  • A public-facing API for creating, altering, and dropping tables
  • An API for catalog implementations to provide the underlying table operations

The main need is for the plug-in API, but I included the public one because there isn’t currently a friendly public API to create tables and I think it helps to see how both would work together.

Here’s a sample of the proposed public API:

catalog.createTable("db.table")
    .addColumn("id", LongType)
    .addColumn("data", StringType, nullable=true)
    .addColumn("ts", TimestampType)
    .partitionBy(day($"ts"))
    .config("prop", "val")
    .commit()

And here’s a sample of the catalog plug-in API:

Table createTable(
    TableIdentifier ident,
    StructType schema,
    List<Expression> partitions,
    Optional<List<SortOrder>> sortOrder,
    Map<String, String> properties)

Note that this API passes both bucketing and column-based partitioning as Expressions. This is a generalization that makes it possible for the table to use the relationship between columns and partitions. In the example above, data is partitioned by the day of the timestamp field. Because the expression is passed to the table, the table can use predicates on the timestamp to filter out partitions without an explicit partition predicate. There’s more detail in the proposal on this.
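
As a rough illustration of how a source could use that relationship, here is a sketch with simplified stand-in types (not the Catalyst Expression API, and not part of the proposal) that derives the day partitions touched by a timestamp range predicate:

import java.time.{Instant, LocalDate, ZoneOffset}

// Hypothetical, simplified form of a day(ts) partition expression.
case class DayOf(column: String)

object PartitionPruning {
  // For a table partitioned by day(ts) and a predicate ts >= lower AND ts < upper
  // (timestamps in microseconds), compute the day partitions that can contain matches.
  def daysFor(expr: DayOf, lowerMicros: Long, upperMicros: Long): Seq[LocalDate] = {
    def toDay(micros: Long): LocalDate =
      Instant.ofEpochMilli(micros / 1000).atZone(ZoneOffset.UTC).toLocalDate
    val first = toDay(lowerMicros)
    val last  = toDay(upperMicros - 1)
    Iterator.iterate(first)(_.plusDays(1)).takeWhile(!_.isAfter(last)).toSeq
  }
}

// daysFor(DayOf("ts"), 1534316400000000L, 1534402800000000L)
// returns the days 2018-08-15 and 2018-08-16 (the range crosses a UTC day boundary)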

The SPIP is for the APIs and does not cover how multiple catalogs would be exposed. I started a separate discussion thread on how to access multiple catalogs and maintain compatibility with Spark’s current behavior (how to get the catalog instance in the above example).

Please use this thread to discuss the proposed APIs. Thanks, everyone!

rb

--
Ryan Blue
Software Engineer
Netflix

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

rxin
Seems reasonable at a high level. I don't think we can use Expression and SortOrder in public APIs, though. Those are not meant to be public and can break easily across versions.


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

Ryan Blue

I don’t think that we want to block this work until we have a public and stable Expression. Like our decision to expose InternalRow, I think that while this option isn’t great, it at least allows us to move forward. We can hopefully replace it later.

Also note that the use of Expression is in the plug-in API, not in the public API. I think that it is easier to expect data source implementations to handle some instability here. We already use Expression as an option for push-down in DSv2 so there’s precedent for it. Plus, we need to be able to pass more complex expressions between the sources and Spark for sorting and clustering data when it’s written to DSv2 (SPARK-23889).

Simple expressions for bucketing and column-based partitions would almost certainly be stable. We can probably find a trade-off solution to not use Expression in the TableCatalog API, but we definitely need expressions for SPARK-23889.

SortOrder would be easier to replace with a stricter class based only on column data rather than expressions. For #21306, I just left it out entirely. What if I remove it from the proposal and we add it later?
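
For reference, a column-only replacement could be as small as the following (an illustrative sketch, not a class from the proposal or the PR):

// Sketch of a sort description limited to column data, with no dependency on Expression.
sealed trait SortDirection
case object Ascending extends SortDirection
case object Descending extends SortDirection

case class ColumnSort(column: String, direction: SortDirection = Ascending)

// e.g. sortOrder = Seq(ColumnSort("ts"), ColumnSort("id", Descending))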


--
Ryan Blue
Software Engineer
Netflix

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

Ryan Blue
Reynold, did you get a chance to look at my response about using `Expression`? I think that it's okay since it is already exposed in the v2 data source API. Plus, I wouldn't want to block this on building a public expression API that is more stable.

I think that's the only objection to this SPIP. Anyone else want to raise an issue with the proposal, or is it about time to bring up a vote thread?

rb

--
Ryan Blue
Software Engineer
Netflix

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

rxin
Sorry, I completely disagree with using Expression in critical public APIs that we expect a lot of developers to use. There's a huge difference between exposing InternalRow and Expression. InternalRow is a relatively small surface (still quite large) that I can see us getting to a stable point on within a version, while Expression is everything in Spark SQL, including all the internal implementations, and it references logical and physical plans (due to subqueries). They weren't designed as public APIs, and it is simply not feasible to make them public APIs without breaking things all the time. I can, however, see us creating a smaller-scope, parallel public expressions API, similar to what we did for dsv1.

If we are already depending on Expression in the more common DSv2 APIs, we should revisit that.

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

Ryan Blue
I agree that it would be great to have a stable public expression API that corresponds to what is parsed, not the implementations. But I worry that it will get out of date, and a data source that needs to support a new expression would have to wait up to 6 months for a public release that includes it.

Another, lesser problem is that this currently blocks CTAS (for creating partitioned tables) and DeleteSupport (for deleting data by expression) in the v2 API. Because the new logical plans depend on the CreateTable and DeleteSupport APIs that need expressions, they will be delayed.

That's not a bad thing -- I'd much rather get this right -- but I'm concerned about the rush to move over to v2. With the new plans, **there's no need to use SaveMode, which would introduce unpredictable behavior in v2**, but others have suggested supporting SaveMode until the new plans are finished. I think it is fine to support reads and appends to new tables for now, but not having an expression API is making that pressure to compromise the v2 API worse. It's also not clear how this support would be cleanly removed. (See this thread for context.)

I'll start working on an alternative expression API. I think for table creation we just need a small one for now; we'll need to extend it for DeleteSupport, though. And we will need to remove the support in v2 for pushing Expression, hopefully sooner rather than later.

rb

--
Ryan Blue
Software Engineer
Netflix

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

Ryan Blue

I think I found a good solution to the problem of using Expression in the TableCatalog API and in the DeleteSupport API.

For DeleteSupport, there is already a stable and public subset of Expression, named Filter, that can be used to pass filters. The reason why DeleteSupport would use Expression is to support more complex expressions like to_date(ts) = '2018-08-15', which are translated to ts >= 1534316400000000 AND ts < 1534402800000000. But that translation can be done in Spark instead of in the data sources, so I think DeleteSupport should use Filter instead. I updated the DeleteSupport PR #21308 with these changes.
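
For example, with the existing public Filter classes, the rewritten predicate above could be handed to a source roughly as follows (illustrative only; the DeleteSupport method itself is defined in the PR):

import org.apache.spark.sql.sources.{And, GreaterThanOrEqual, LessThan}

// to_date(ts) = '2018-08-15' after Spark rewrites it into a range on ts
// (values in microseconds, matching the example above)
val tsRange = And(
  GreaterThanOrEqual("ts", 1534316400000000L),
  LessThan("ts", 1534402800000000L))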

Also, I agree that the DataSourceV2 API should also not expose Expression, so I opened SPARK-25127 to track removing SupportsPushDownCatalystFilter.

For TableCatalog, I took a similar approach rather than introducing a parallel Expression API: I created a PartitionTransform API (like Filter) that communicates the transformation function, function parameters such as the number of buckets, and column references. I updated the TableCatalog PR #21306 to use PartitionTransform instead of Expression, and I updated the text of the SPIP doc.

I also raised a concern about needing to wait for Spark to add support for new expressions (now partition transforms). To get around this, I added an apply transform that passes the name of a function and an input column. That way, users can still pass transforms that Spark doesn’t know about to data sources by name: apply("source_function", "colName").
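
As a rough illustration of the shape of such an API (the names and details here are made up, not the exact classes in PR #21306):

// Sketch only: transforms carry a function, its parameters, and column references.
sealed trait PartitionTransform { def references: Seq[String] }

case class Identity(column: String) extends PartitionTransform {
  def references: Seq[String] = Seq(column)
}
case class Day(column: String) extends PartitionTransform {
  def references: Seq[String] = Seq(column)
}
case class Bucket(numBuckets: Int, column: String) extends PartitionTransform {
  def references: Seq[String] = Seq(column)
}
// Escape hatch: a named, source-specific transform that Spark doesn't know about.
case class Apply(function: String, column: String) extends PartitionTransform {
  def references: Seq[String] = Seq(column)
}

// e.g. partitions = Seq(Day("ts"), Bucket(16, "id"), Apply("source_function", "colName"))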

Please have a look at the updated pull requests and SPIP doc and comment!

rb


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

rxin
Ryan, Michael and I discussed this offline today. Some notes here:

His use case is to support partitioning data by derived columns, rather than physical columns, because he didn't want his users to keep adding the "date" column when in reality it is purely derived from some timestamp column. We reached consensus that this is a great use case and something we should support.

We are still debating how to do this at the API level. Two options:

Option 1. Create a smaller-surface, parallel Expression library, and use that for specifying partition columns. The bare minimum class hierarchy would look like:

trait Expression

class NamedFunction(name: String, args: Seq[Expression]) extends Expression

class Literal(value: Any) extends Expression

class ColumnReference(name: String) extends Expression

These classes don't define how the expressions are evaluated, and it'd be up to the data sources to interpret them. As an example, for a table partitioned by date(ts), Spark would pass the following to the underlying ds:

NamedFunction("date", ColumnReference("ts") :: Nil)
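
For instance, a bucketing transform could be expressed in the same shape, with the bucket count passed as a Literal:

NamedFunction("bucket", Literal(16) :: ColumnReference("id") :: Nil)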


Option 2. Spark passes strings over to the data sources. For the above example, Spark would simply pass the string "date(ts)".


The pros and cons of 1 vs. 2 are basically the inverse of each other. Option 1 creates a more rigid structure, with extra complexity in API design. Option 2 is less structured but more flexible. Option 1 gives Spark the opportunity to enforce that column references are valid (but not the actual function names), whereas with option 2 validation would be entirely up to the data sources.

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

Ryan Blue
Thanks for posting the summary. I'm strongly in favor of option 1.

I think that API footprint is fairly small, but worth it. Not only does it make sources easier to implement by handling parsing, it also makes sources more reliable because Spark handles validation the same way across sources.

A good example is making sure that the referenced columns exist in the table, which should be done using the analyzer's case sensitivity setting. Spark would pass normalized column names that match the case of the declared columns, so that there isn't a problem if Spark is case insensitive but the source doesn't implement that. And the source wouldn't have to know about Spark's case sensitivity settings at all.
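
A minimal sketch of that normalization (a simplified helper for illustration, not Spark's actual analyzer code):

import org.apache.spark.sql.types.StructType

// Resolve referenced column names against the table schema and return the declared
// names, so the source never needs to know Spark's case sensitivity setting.
def normalizeColumns(schema: StructType, referenced: Seq[String], caseSensitive: Boolean): Seq[String] =
  referenced.map { name =>
    schema.fields.map(_.name)
      .find(declared => if (caseSensitive) declared == name else declared.equalsIgnoreCase(name))
      .getOrElse(throw new IllegalArgumentException(s"Column not found in table schema: $name"))
  }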

--
Ryan Blue
Software Engineer
Netflix

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

RussS
I'm a big fan of 1 as well. I had to implement something similar using custom expressions, and it was a bit more work than it should have been. In particular, our use case is that columns have certain metadata (ttl, writetime) that exists not as separate columns but as special values that can be surfaced.

I still don't have a good solution for the same thing at write time, though, since the problem is a bit asymmetric for us: while you can read the metadata from any particular cell, on write you specify it for the whole row.


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

rxin
Russell, your special columns wouldn't actually work with option 1, because Spark would have to fail them in analysis without an actual physical column.


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

RussS
They are based on a physical column; the column is real. The function itself only exists in the data source.

For example:

SELECT ttl(a), ttl(b) FROM ks.tab
