SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

Ryan Blue

Over the last couple years, I’ve noticed a trend toward specialized logical plans and increasing use of RunnableCommand nodes. DataSourceV2 is currently on the same path, and I’d like to make the case that we should avoid these practices.

I think it’s helpful to consider an example I’ve been watching over time, InsertIntoTable. I might not get all the details exactly right here, but the overall story should be familiar.

When I first started looking at Spark’s SQL engine, here’s roughly how an insert was supposed to go (the plans here are simplified; a rough code sketch of the column-resolution step follows the list):

  • SQL is parsed and produces the parsed plan:
    InsertIntoTable(table=UnresolvedRelation(name), data=Project(a,b,d, from=...))
  • Analyzer rules: unresolved tables are looked up and replaced:
    InsertIntoTable(table=HiveRelation(name, schema=a,b,c), data=Project(a,b,d, from=...))
  • Analyzer rules: columns are checked against the table and type inconsistencies fixed:
    InsertIntoTable(table=HiveRelation(name, schema=a,b,c), data=Project(a,b,cast(d as string), from=Project(...)))

  • The analyzer reaches a fixed point

  • The planner asserts that the plan is resolved, which verifies that all references are satisfied, including a check in InsertIntoTable that the table’s schema is compatible with the logical plan that produces data.
  • The optimizer runs: among other changes, the double-project that was inserted by analysis is rewritten
  • The planner converts InsertIntoTable(HiveRelation, _) to what we’d now call InsertIntoHiveTableExec
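
For readers who haven’t dug into the analyzer, here is a minimal, self-contained sketch of the column-resolution step described above. It uses simplified stand-in classes rather than Spark’s real Catalyst types; every name here (Column, Relation, ResolveInsertColumns, the string-based data types) is illustrative only:

    // Simplified stand-ins for Catalyst concepts; not Spark's real API.
    case class Column(name: String, dataType: String)

    trait Plan { def output: Seq[Column] }

    case class Relation(name: String, schema: Seq[Column]) extends Plan {
      def output: Seq[Column] = schema
    }

    case class Project(columns: Seq[Column], child: Plan) extends Plan {
      def output: Seq[Column] = columns
    }

    case class InsertIntoTable(table: Relation, data: Plan) extends Plan {
      def output: Seq[Column] = Nil
      // "Resolved" means the incoming data lines up with the table's schema.
      def resolved: Boolean =
        data.output.map(_.dataType) == table.schema.map(_.dataType)
    }

    // The column-check step from the analysis above: match columns by position
    // and adjust types where a cast is needed (a,b,d becomes a,b,cast(d as string)).
    object ResolveInsertColumns {
      def apply(insert: InsertIntoTable): InsertIntoTable = {
        require(insert.data.output.size == insert.table.schema.size,
          s"expected ${insert.table.schema.size} columns, got ${insert.data.output.size}")
        val aligned = insert.table.schema.zip(insert.data.output).map {
          case (target, incoming) if incoming.dataType == target.dataType => incoming
          case (target, incoming) => incoming.copy(dataType = target.dataType) // stands in for a cast
        }
        insert.copy(data = Project(aligned, insert.data))
      }
    }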

Over time, this basic structure changed in small but important ways:

  • Data sources used a similar, but different pre-processing rule to line up the incoming data’s logical plan with a table
  • Because there were multiple definitions of “resolved” implemented in InsertIntoTable#resolved (data sources and Hive), it was hard to maintain the function
  • InsertIntoTable was replaced on the data source side with InsertIntoDataSourceCommand and the resolved problem went away
  • InsertIntoTable was changed so that resolved is never true. It must be replaced with a command node before the optimizer runs for all write paths.
  • InsertIntoDataSourceCommand never added a similar resolved check, so there is no validation that the pre-processing rules ran, or produced a correct plan

What we ended up doing, through a series of fairly reasonable changes, introduced little inconsistencies between different write paths in SQL, even though they start with the same logical plan. Although the plan is identical to begin with, specialized nodes are inserted early, and then analysis and optimizer rules are built around those specialized nodes. Basically, physical plan details leaked into the logical plan.

This has a negative unintended consequence for DataSourceV2. The new API uses specialized plan nodes from the start, when DataFrameWriter creates the plan. But no existing rules are ready to match DSv2 plans; we need to think of all the rules that should match them and then update each one so it runs on the new nodes. There is no validation before the optimizer runs that ensures the data frame that will be written has the same schema, or even the same number of columns, as the target table.
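
To make that concrete, here is a minimal sketch of the kind of pre-optimizer check I mean, in the spirit of the old InsertIntoTable#resolved behavior. The names (V2WriteCheck, canCast) and the string-based types are made up for illustration; this is not real or proposed Spark code:

    // Hypothetical analysis-time check: fail before the optimizer runs if the
    // incoming data frame cannot be written to the target table.
    object V2WriteCheck {
      case class Field(name: String, dataType: String)

      def check(tableSchema: Seq[Field], dataSchema: Seq[Field]): Unit = {
        require(tableSchema.size == dataSchema.size,
          s"cannot write ${dataSchema.size} columns to a table with ${tableSchema.size} columns")
        tableSchema.zip(dataSchema).foreach { case (t, d) =>
          require(t.dataType == d.dataType || canCast(d.dataType, t.dataType),
            s"cannot safely write ${d.name}: ${d.dataType} to ${t.name}: ${t.dataType}")
        }
      }

      // Placeholder for Spark's real cast-compatibility rules.
      private def canCast(from: String, to: String): Boolean = (from, to) match {
        case ("int", "long") | ("int", "string") | ("float", "double") => true
        case _ => false
      }
    }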

Another change was to put implementation in the RunnableCommand logical plan nodes, which are added to the physical plan using a generic ExecutedCommandExec. This also makes some sense: why have InsertIntoDataSourceCommand and InsertIntoDataSourceCommandExec when one is just a place-holder?

The unintended consequence of using RunnableCommand is that we can now accidentally embed logical plans inside physical plans. This is what’s happening when the Spark UI shows only a single node and no physical plan details. It broke the SQL shown in the UI, and it broke metrics, because they weren’t passed up the physical plan tree (the tree was split by a logical plan node).
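
A schematic sketch of the two shapes, to show why the generic wrapper hides the rest of the plan. These are made-up stand-in classes, not Spark’s actual RunnableCommand or ExecutedCommandExec:

    trait LogicalNode
    trait PhysicalNode { def children: Seq[PhysicalNode] }

    // Shape 1: the logical node carries the implementation...
    case class InsertCommand(doInsert: () => Unit) extends LogicalNode
    // ...and is wrapped by a single generic physical node. The query that
    // produces the data is planned and run inside doInsert, so the UI sees one
    // opaque node and metrics from the inner plan never reach this tree.
    case class GenericCommandExec(cmd: InsertCommand) extends PhysicalNode {
      def children: Seq[PhysicalNode] = Nil
    }

    // Shape 2: a dedicated physical node whose child is the physical plan for
    // the data being written, so the full tree (and its metrics) stays visible.
    case class InsertExec(write: Iterator[Seq[Any]] => Unit, child: PhysicalNode)
        extends PhysicalNode {
      def children: Seq[PhysicalNode] = Seq(child)
    }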

I can see why we’ve gotten to this place, but I think now is the time to fix this technical debt. To get an idea why, I’ll present our alternate history:

Back when the data source write path started using a different pre-processing rule, we started maintaining our own set of rules that used the same resolved method and the same InsertIntoTable plan. When we pick up new Spark versions, we change the analyzer rules so that the same rules apply to every InsertIntoTable, regardless of the target table’s write path. As a result, we have consistent behavior for matching table columns to data frame columns across all write paths. And when we backported DataSourceV2, all I needed to do was change the plan to InsertIntoTable(table=DataSourceV2Relation(), data=...). After that, column resolution worked just like it does for Hive or DataSource tables, because all of our rules that match InsertIntoTable started matching the new tables, too.
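
A hedged sketch of what that looks like structurally: one logical node for inserts, with specialization deferred to a planner strategy. The names below mimic Spark’s classes, but the code itself only illustrates the shape; it is not the actual rules or strategies:

    // One logical node for all inserts; only physical planning branches on the
    // kind of table being written. Illustrative stand-ins, not Spark internals.
    sealed trait Relation
    case class HiveRelation(name: String) extends Relation
    case class FileRelation(path: String) extends Relation
    case class DataSourceV2Relation(source: String) extends Relation

    case class LogicalQuery(description: String)
    case class InsertIntoTable(table: Relation, data: LogicalQuery)

    sealed trait PhysicalPlan
    case class InsertIntoHiveTableExec(name: String, data: LogicalQuery) extends PhysicalPlan
    case class InsertIntoFileExec(path: String, data: LogicalQuery) extends PhysicalPlan
    case class AppendDataExec(source: String, data: LogicalQuery) extends PhysicalPlan

    // Analyzer rules all match InsertIntoTable; this strategy is the only
    // place that cares which write path is used.
    object InsertStrategy {
      def plan(insert: InsertIntoTable): PhysicalPlan = insert.table match {
        case HiveRelation(name)        => InsertIntoHiveTableExec(name, insert.data)
        case FileRelation(path)        => InsertIntoFileExec(path, insert.data)
        case DataSourceV2Relation(src) => AppendDataExec(src, insert.data)
      }
    }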

I know that some of the fragmentation has gotten a lot better lately, but we still have to add rules by hand. Case in point: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L386-L395

So here are my recommendations for moving forward, with DataSourceV2 as a starting point:

  1. Use well-defined logical plan nodes for all high-level operations: insert, create, CTAS, overwrite table, etc.
  2. Use rules that match on these high-level plan nodes, so that it isn’t necessary to create rules to match each eventual code path individually
  3. Define Spark’s behavior for these logical plan nodes. Physical nodes should implement that behavior, but all CREATE TABLE OVERWRITE should (eventually) make the same guarantees.
  4. Specialize implementation when creating a physical plan, not logical plans.

I realize this is really long, but I’d like to hear thoughts about this. I’m sure I’ve left out some additional context, but I think the main idea here is solid: let’s standardize logical plans for more consistent behavior and easier maintenance.

rb

--
Ryan Blue
Software Engineer
Netflix

Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

Michael Armbrust

> So here are my recommendations for moving forward, with DataSourceV2 as a starting point:
>
>   1. Use well-defined logical plan nodes for all high-level operations: insert, create, CTAS, overwrite table, etc.
>   2. Use rules that match on these high-level plan nodes, so that it isn’t necessary to create rules to match each eventual code path individually
>   3. Define Spark’s behavior for these logical plan nodes. Physical nodes should implement that behavior, but all CREATE TABLE OVERWRITE should (eventually) make the same guarantees.
>   4. Specialize implementation when creating a physical plan, not logical plans.
>
> I realize this is really long, but I’d like to hear thoughts about this. I’m sure I’ve left out some additional context, but I think the main idea here is solid: let’s standardize logical plans for more consistent behavior and easier maintenance.

Context aside, I really like these rules! I think having query planning be the boundary for specialization makes a lot of sense.  

(RunnableCommand might also be my fault though.... sorry! :P) 

Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

Ryan Blue

Thanks for responding!

I’ve been coming up with a list of the high-level operations that are needed. I think all of them come down to 5 questions about what’s happening:

  • Does the target table exist?
  • If it does exist, should it be dropped?
  • If not, should it get created?
  • Should data get written to the table?
  • Should data get deleted from the table?

Using those, you can list out all the potential operations. Here’s a flow chart that makes it easier to think about:

Table exists?          No                                        Yes
                        |                                         |
Drop table?            N/A                    Yes <---------------+--------------> No
                        |                      |                                    |
Create table?    Yes <--+--> No          Yes <-+-> No                             Exists
                  |          Noop         |        DropTable                        |
Write data? Yes <-+-> No            Yes <-+-> No                     Yes <----------+---------> No
            CTAS      CreateTable   RTAS      ReplaceTable            |                         |
Delete data?                                                  Yes <---+---> No           Yes <--+--> No
                                                              ReplaceData   InsertInto   DeleteFrom  Noop
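
One way to read the chart is as a small, closed set of standard logical plan nodes, one per leaf. Here is a sketch in that spirit (the names follow the chart; none of them are existing Catalyst classes):

    // Candidate standard logical plan nodes; purely illustrative.
    case class Identifier(name: String)
    case class Schema(fields: Seq[(String, String)])
    case class Query(description: String)
    case class Filter(expr: String)

    sealed trait TableOperation
    case class CreateTable(table: Identifier, schema: Schema) extends TableOperation
    case class CreateTableAsSelect(table: Identifier, query: Query) extends TableOperation // CTAS
    case class ReplaceTable(table: Identifier, schema: Schema) extends TableOperation
    case class ReplaceTableAsSelect(table: Identifier, query: Query) extends TableOperation // RTAS
    case class DropTable(table: Identifier) extends TableOperation
    case class InsertInto(table: Identifier, query: Query) extends TableOperation
    case class ReplaceData(table: Identifier, delete: Filter, query: Query) extends TableOperation
    case class DeleteFrom(table: Identifier, filter: Filter) extends TableOperation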

Some of these can be broken down into other operations (replace table = drop & create), but I think it is valuable to consider each one and think about whether it should be atomic. CTAS is a create and an insert that guarantees the table exists only if the insert succeeded. Should we also support RTAS=ReplaceTableAsSelect (drop, create, insert) and make a similar guarantee that the original table will be dropped if and only if the write succeeds?

As a sanity check, most of these operations correspond to SQL statements. For tables:

  • CreateTable = CREATE TABLE t
  • DropTable = DROP TABLE t
  • ReplaceTable = DROP TABLE t; CREATE TABLE t (no transaction needed?)
  • CTAS = CREATE TABLE t AS SELECT ...
  • RTAS = ??? (we could add REPLACE TABLE t AS ...)

Or for data:

  • DeleteFrom = DELETE FROM t WHERE ...
  • InsertInto = INSERT INTO t SELECT ...
  • ReplaceData = INSERT OVERWRITE t PARTITION (p) SELECT ... or BEGIN; DELETE FROM t; INSERT INTO t SELECT ...; COMMIT;

The last one, ReplaceData, is interesting because only one specific case is currently supported, and it requires partitioning.

I think we need to consider all of these operations while building DataSourceV2. We still need to define what v2 sources should do.

Also, I would like to see a way to provide weak guarantees easily, and another way for v2 sources to implement stronger guarantees. For example, CTAS can be implemented as a create, then an insert, with a drop if the insert fails. That covers most cases and is easy to implement. But some table formats can provide stronger guarantees. Iceberg supports atomic create-and-insert, so that a table never exists unless its write succeeds, rather than relying on a rollback that only happens if the driver is still alive after a failure. If we implement the basics (create, insert, drop-on-failure) in Spark, I think we will end up with more data sources that have reliable behavior.
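
Here is a rough sketch of the “weak default, stronger opt-in” idea for CTAS. Everything in it (TableCatalog, AtomicCtasSupport, StagedWrite) is a made-up name for illustration, not a proposed or existing API:

    // Spark-side default CTAS (create, write, drop on failure), with an
    // optional capability a source can implement for an atomic version.
    trait TableCatalog {
      def createTable(name: String, schema: Seq[(String, String)]): Unit
      def dropTable(name: String): Unit
      def write(name: String, rows: Iterator[Seq[Any]]): Unit
    }

    // Hypothetical opt-in capability for sources (like Iceberg) that can commit
    // a create+write atomically: the table is visible only if commit succeeds.
    trait AtomicCtasSupport {
      def stageCreate(name: String, schema: Seq[(String, String)]): StagedWrite
    }
    trait StagedWrite {
      def write(rows: Iterator[Seq[Any]]): Unit
      def commit(): Unit
      def abort(): Unit
    }

    object Ctas {
      def run(catalog: TableCatalog,
              name: String,
              schema: Seq[(String, String)],
              rows: Iterator[Seq[Any]]): Unit = catalog match {
        case atomic: AtomicCtasSupport =>
          // Strong guarantee: the table exists only if the write committed.
          val staged = atomic.stageCreate(name, schema)
          try { staged.write(rows); staged.commit() }
          catch { case e: Throwable => staged.abort(); throw e }
        case basic =>
          // Weak default: best-effort rollback that only works while the driver is alive.
          basic.createTable(name, schema)
          try basic.write(name, rows)
          catch { case e: Throwable => basic.dropTable(name); throw e }
      }
    }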

Would anyone be interested in an improvement proposal for this? It would be great to document this and build consensus around Spark’s expected behavior. I can write it up.

rb


--
Ryan Blue
Software Engineer
Netflix

Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

cloud0fan
I think many advanced Spark users already have custom Catalyst rules that deal with the query plan directly, so it makes a lot of sense to standardize the logical plan. However, instead of exploring possible operations ourselves, I think we should follow the SQL standard.

ReplaceTable, RTAS:
Most of the mainstream databases don't support these two. I think a drop-all-data operation is dangerous, and we should only allow users to do it with DROP TABLE.

DeleteFrom, ReplaceData:
These two are covered by the SQL standard, but in a more general form: DELETE, UPDATE, and MERGE are the most common SQL statements for changing data.

There is more we need to take care of, like ALTER TABLE. I'm looking forward to a holistic SPIP about this; thanks for your contribution!

Wenchen




Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

Ryan Blue

> Instead of exploring possible operations ourselves, I think we should follow the SQL standard.

Most of these do. We should make conscious decisions with the standard in mind for the SQL API. But we also have the Scala API (and versions of it in other languages) and need to consider how these operations are invoked from there.

> There is more we need to take care of, like ALTER TABLE.

Yes, I was excluding the simple commands like this to focus on the commands that may need to make behavior guarantees. I think those commands are related to the 5 concerns I listed.

Another way to think about this is: Alter table isn’t something you could reasonably combine with a write for a single atomic operation.

> ReplaceTable, RTAS:
> Most of the mainstream databases don't support these two. I think a drop-all-data operation is dangerous, and we should only allow users to do it with DROP TABLE.

I don’t think it would be too confusing for users to have REPLACE TABLE commands. The fact that the old table is dropped is clear.

There’s a good use case for this. We have analysts that produce a report table every day. They can overwrite the entire table with new data each day, but they prefer to drop the previous table and create a new one with CTAS because they don’t want to care about schema evolution. They make no guarantees about the table schema, so they use an operation, CTAS, that doesn’t constrain their work. No need to alter a table that is getting replaced.

Given a reasonable use case for dropping and recreating a table with CTAS, I think there’s a good argument for an atomic REPLACE TABLE AS SELECT operation. My users don’t want to drop the previous report table until the new one is ready, and they never want a period of time when report data is unavailable.

This is why I think it is a good idea to consider each of these. Just because it didn’t make sense in another DB to support RTAS doesn’t mean there isn’t a reason to do it now.

As for REPLACE TABLE that isn’t RTAS, I don’t think there’s a good use case because it is unlikely that we need an atomic operation. Not much goes wrong in a table create, and we don’t want to confuse users, who should generally use ALTER TABLE for schema evolution.

> DeleteFrom, ReplaceData:
> These two are covered by the SQL standard, but in a more general form: DELETE, UPDATE, and MERGE are the most common SQL statements for changing data.

My point is that we should care what users are trying to do and whether we should support a combined atomic operation.

Replacing data is an operation that I see our data engineers using all the time. Spark already supports INSERT OVERWRITE ... PARTITION that replaces all data in a partition. We use this to continuously update summary tables as data arrives. Each hour of fact data gets added to the daily summary rollup and replaces the last summary written. Clearly, this should be an atomic operation, and it currently is.

The question for v2 is: how do we perform this same operation with the v2 API?

A transaction made from a delete and an insert would work. Is this what we want to use? How do we add it to v2?
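
For the sake of discussion, here is one hedged shape such an operation could take: a write-side capability that lets a source atomically replace the rows matching a filter with the newly written data. All of the names below are made up for this sketch; they are not the actual v2 interfaces:

    // Hypothetical only: a write-side capability for atomic "delete then insert".
    // A source that implements it promises the delete and the append become
    // visible together (or not at all), e.g. as one commit or transaction.
    trait Filter
    case class EqualTo(column: String, value: Any) extends Filter

    trait WriteBuilder {
      def buildAppend(): BatchWrite
    }

    // Opt-in: overwrite the rows matching `filters` with whatever is appended.
    trait SupportsOverwrite extends WriteBuilder {
      def overwrite(filters: Seq[Filter]): BatchWrite
    }

    trait BatchWrite {
      def write(rows: Iterator[Seq[Any]]): Unit
      def commit(): Unit   // the delete and the new data appear here, atomically
      def abort(): Unit
    }

    // Usage sketch: INSERT OVERWRITE t PARTITION (day = '2018-02-06') SELECT ...
    def replaceDay(builder: WriteBuilder, rows: Iterator[Seq[Any]]): Unit =
      builder match {
        case o: SupportsOverwrite =>
          val w = o.overwrite(Seq(EqualTo("day", "2018-02-06")))
          try { w.write(rows); w.commit() }
          catch { case e: Throwable => w.abort(); throw e }
        case _ =>
          throw new UnsupportedOperationException("source cannot atomically replace data")
      }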

rb

--
Ryan Blue
Software Engineer
Netflix