[DatasourceV2] Default Mode for DataFrameWriter not Dependent on DataSource Version


RussS

While the ScalaDocs for DataFrameWriter say

/**
* Specifies the behavior when data or table already exists. Options include:
* <ul>
* <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
* <li>`SaveMode.Append`: append the data.</li>
* <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
* <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
* </ul>
* <p>
* When writing to data source v1, the default option is `ErrorIfExists`. When writing to data
* source v2, the default option is `Append`.
*
* @since 1.4.0
*/

As far as I can tell, using DataFrameWriter with a TableProvider-based DataSource V2 will still default to ErrorIfExists, which breaks existing code, since DSV2 cannot support ErrorIfExists mode. I noticed in the history
of DataFrameWriter that there were versions which differentiated between DSV2 and DSV1 and set the mode accordingly, but this no longer seems to be the case. Was this intentional? I feel that if the default
were based on the source, upgrading code from DSV1 -> DSV2 would be much easier for users.

I'm currently testing this on RC2.


Any thoughts?

Thanks for your time as usual,
Russ

Re: [DatasourceV2] Default Mode for DataFrameWriter not Dependent on DataSource Version

Burak Yavuz
Hey Russell,

Great catch on the documentation; it seems out of date. Honestly, I am against different DataSources having different default SaveModes. Users will have no clue whether a DataSource implementation is V1 or V2, and it seems weird that the default can change based on something they have no visibility into. Especially for connectors like the Cassandra Connector or Delta Lake, which have been V1 DataSources for a long time and may continue to have both code paths for a while, this would cause even more confusion.

What is the problem you're having right now that makes you prefer different defaults?

Best,
Burak


Re: [DatasourceV2] Default Mode for DataFrameWriter not Dependent on DataSource Version

Ryan Blue
The context on this is that it was confusing that the mode changed, which introduced different behaviors for the same user code when moving from v1 to v2. Burak pointed this out and I agree that it's weird that if your dependency changes from v1 to v2, your compiled Spark job starts appending instead of erroring out when the table exists.

The workaround is to implement a new trait, SupportsCatalogOptions, which allows you to extract a table identifier and catalog name from the options in the DataFrameReader. That way, you can re-route to your catalog so that Spark correctly uses a CreateTableAsSelect statement for ErrorIfExists mode. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
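For reference, a minimal sketch of what implementing the trait could look like. This is illustrative, not the actual Cassandra connector code; the "keyspace"/"table" option names and the "cassandra" catalog name are assumptions:

import java.util
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions, Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CassandraSource extends TableProvider with SupportsCatalogOptions {

  // TableProvider methods stubbed out here; a real connector resolves
  // the schema and table against the external system.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    throw new UnsupportedOperationException("resolved through the catalog instead")

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table =
    throw new UnsupportedOperationException("resolved through the catalog instead")

  // Pull the table identifier out of the DataFrameReader/Writer options.
  override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
    Identifier.of(Array(options.get("keyspace")), options.get("table"))

  // Name of the catalog plugin Spark should route the operation through;
  // assumes spark.sql.catalog.cassandra is configured.
  override def extractCatalog(options: CaseInsensitiveStringMap): String =
    "cassandra"
}

With those two methods, an ErrorIfExists save can be planned as a CreateTableAsSelect against the catalog instead of failing in the plain V2 save path.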



--
Ryan Blue
Software Engineer
Netflix

Re: [DatasourceV2] Default Mode for DataFrameWriter not Dependent on DataSource Version

RussS
I think those are fair concerns. I was mostly just updating tests for RC2 and adding "append" everywhere.

Code like

spark.sql(s"SELECT a, b FROM $ks.test1")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "test_insert1", "keyspace" -> ks))
  .save()
now fails at runtime, while it would have succeeded before. This is probably not a huge issue, since the majority of actual usages aren't writing to empty tables.
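The fix in our tests is just making the mode explicit; a minimal sketch of the updated call (same hypothetical table names as above):

spark.sql(s"SELECT a, b FROM $ks.test1")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "test_insert1", "keyspace" -> ks))
  .mode("append") // explicit, since the DSV2 default of ErrorIfExists fails on an existing table
  .save()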

I think my main concern here is that a lot of our old demos and tutorials follow this pattern:

* Make the table outside of Spark
* Write to the table with Spark

Obviously, these two steps can now be done in a single operation in Spark (sketch below), so that's probably the best path forward. The old pathway is pretty awkward; I just didn't want it to break if it didn't have to. But I think having different defaults is definitely not intuitive.
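A minimal sketch of that single-operation path using the new DataFrameWriterV2 API in 3.0. The "cassandra" catalog name is an assumption; it would have to be configured via spark.sql.catalog.cassandra:

spark.sql(s"SELECT a, b FROM $ks.test1")
  .writeTo(s"cassandra.$ks.test_insert1")
  .create() // CREATE TABLE AS SELECT through the catalog; errors if the table already exists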

I think the majority of other use cases are "append" anyway, so it's not a big pain for users beyond demos and just trying things out.

Thanks for commenting,
Russ



Re: [DatasourceV2] Default Mode for DataFrameWriter not Dependent on DataSource Version

RussS
Another related issue for backwards compatibility: the check in DataSource.scala at
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L415-L416

will get triggered even when the class is a valid DataSource V2 that is being used in a DataSource V1 context.

For example, if I run


val createDDL =
  s"""CREATE TABLE IF NOT EXISTS $registerName
     |USING org.apache.spark.sql.cassandra
     |OPTIONS (
     |  $optionString
     |)""".stripMargin

spark.sql(createDDL)

on the default catalog, I will get an exception:

org.apache.spark.sql.AnalysisException: org.apache.spark.sql.cassandra is not a valid Spark SQL Data Source.;

I can understand that perhaps we no longer want DSV2 sources to be able to create session catalog entries, but I think, at a bare minimum, we should change the error message in the case of a V2 source to recommend the DSV2 API.
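Something like this minimal sketch would already help. This is not actual Spark code, but it assumes providingClass and className are in scope, as they are around the linked lines:

// If the resolved class is actually a DSV2 TableProvider, say so and point
// the user at the V2 path instead of the generic V1 error.
if (classOf[org.apache.spark.sql.connector.catalog.TableProvider]
    .isAssignableFrom(providingClass)) {
  throw new AnalysisException(
    s"$className is a DataSource V2 implementation and cannot be used in a " +
    "DataSource V1 context; create the table through a DataSource V2 catalog instead.")
} else {
  throw new AnalysisException(s"$className is not a valid Spark SQL Data Source.")
}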
