DataSourceV2 capability API


DataSourceV2 capability API

Ryan Blue

Hi everyone,

I’d like to propose an addition to DataSourceV2 tables, a capability API. This API would allow Spark to query a table to determine whether it supports a capability or not:

val table = catalog.load(identifier)
val supportsContinuous = table.isSupported("continuous-streaming")

There are a couple of use cases for this. First, we want to be able to fail fast when a user tries to stream a table that doesn’t support it. The design of our read implementation doesn’t necessarily support this. If we want to share the same “scan” across streaming and batch, then we need to “branch” in the API after that point, but that is at odds with failing fast. We could use capabilities to fail fast and not worry about that concern in the read design.

I also want to use capabilities to change the behavior of some validation rules. The rule that validates appends, for example, doesn’t allow a write that is missing an optional column. That’s because the current v1 sources don’t support reading when columns are missing. But Iceberg does support reading a missing column as nulls, so that users can add a column to a table without breaking a scheduled job that populates the table. To fix this problem, I would use a table capability, like read-missing-columns-as-null.
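To make the second use case concrete, here is a minimal Scala sketch of how the append-validation check could branch on the capability. The Table trait, capability string, and rule shape below are illustrative stand-ins, not the actual DataSourceV2 interfaces or analyzer rule:

// Hypothetical sketch only; not the real DataSourceV2 API.
trait Table {
  def name: String
  def isSupported(capability: String): Boolean
}

object ValidateAppend {
  // missingOptionalColumns: table columns the write query doesn't provide.
  def validate(table: Table, missingOptionalColumns: Seq[String]): Unit = {
    if (missingOptionalColumns.nonEmpty &&
        !table.isSupported("read-missing-columns-as-null")) {
      throw new IllegalArgumentException(
        s"Cannot write to ${table.name}: missing columns " +
          missingOptionalColumns.mkString(", "))
    }
    // Otherwise the append is allowed; the source reads the missing columns as nulls.
  }
}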

Any comments on this approach?

rb

--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 capability API

rxin
This is currently accomplished by having traits that data sources can extend, as well as runtime exceptions, right? It's hard to argue one way or the other without knowing how things will evolve (e.g., how many different capabilities there will be).



Re: DataSourceV2 capability API

Ryan Blue

Yes, we currently use traits that have methods, but a capability like “supports reading missing columns” doesn’t need to deliver any methods. The other example is one where we don’t have an object to test for a trait (scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown done. That could be expensive, so we can use a capability to fail faster.
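To make the cost difference concrete, here is a hedged sketch (hypothetical trait and capability names; the pushdown calls are elided):

trait Scan
trait SupportsBatch extends Scan
trait ScanBuilder { def build(): Scan }
trait Table {
  def newScanBuilder(): ScanBuilder
  def isSupported(capability: String): Boolean
}

// Trait-based check: we only learn the answer after building a Scan,
// so pushdown work has to happen first.
def supportsBatchViaTrait(table: Table): Boolean = {
  val builder = table.newScanBuilder()
  // ... push filters and projection here ...
  builder.build().isInstanceOf[SupportsBatch]
}

// Capability-based check: answered directly by the table, so we can fail fast.
def supportsBatchViaCapability(table: Table): Boolean =
  table.isSupported("batch-scan")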



--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 capability API

Felix Cheung
One question: where will the list of capability strings be defined?

 


Re: DataSourceV2 capability API

Ryan Blue
I'd have two places. First, a class that defines the capability strings supported and recognized by Spark, like the SQLConf definitions. Second, in the documentation for the v2 table API.
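For example, something along these lines (a sketch only; the object name and the set of strings are placeholders, not an agreed list):

object TableCapabilities {
  // Capability strings recognized by Spark; sources report the ones they support.
  val CONTINUOUS_STREAMING = "continuous-streaming"
  val MICRO_BATCH_STREAMING = "micro-batch-streaming"
  val READ_MISSING_COLUMNS_AS_NULL = "read-missing-columns-as-null"
}

A table would then be checked against those constants, e.g. table.isSupported(TableCapabilities.CONTINUOUS_STREAMING).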


--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 capability API

rxin
How do we deal with forward compatibility? Consider this: Spark adds a new "property". An existing data source actually supports that property, but because it never explicitly declared it, the new version of Spark would consider the source as not supporting that property and thus throw an exception.



Re: DataSourceV2 capability API

Ryan Blue
Do you have an example in mind where we might add a capability and break old versions of data sources?

These are really for being able to tell what features a data source has. If there is no way to report a feature (e.g., able to read missing as null) then there is no way for Spark to take advantage of it in the first place. For the uses I've proposed, forward compatibility isn't a concern. When we add a capability, we add handling for it that old versions wouldn't be able to use anyway. The advantage is that we don't have to treat all sources the same.


--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 capability API

rxin
"If there is no way to report a feature (e.g., able to read missing as null) then there is no way for Spark to take advantage of it in the first place"

Consider this (just a hypothetical scenario): we add "supports-decimal" in the future, because we see that a lot of data sources don't support decimal and we want more graceful error handling. That'd break all existing data sources.

You can say we would never add any "existing" features to the feature list in the future, as a requirement for the list. But then I'm wondering how much it really gives you, beyond telling data sources to throw exceptions when they don't support a specific operation.



Re: DataSourceV2 capability API

Ryan Blue
For that case, I think we would have a property that defines whether supports-decimal is assumed or checked with the capability.

Wouldn't we have this problem no matter what the capability API is? If we used a trait to signal decimal support, then we would have to deal with sources that were written before the trait was introduced. That doesn't change the need for some way to signal support for specific capabilities like the ones I've suggested.


--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 capability API

Ryan Blue
Another solution to the decimal case is using the capability API: use a capability to signal that the table knows about `supports-decimal`. So before the decimal support check, it would check `table.isSupported("type-capabilities")`.
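In code, the guarded check might look like this (a sketch with hypothetical capability strings, reusing the proposed isSupported method):

trait Table { def isSupported(capability: String): Boolean }

def supportsDecimal(table: Table): Boolean =
  if (!table.isSupported("type-capabilities")) {
    // The source predates type capabilities: assume decimal support, as today.
    true
  } else {
    table.isSupported("supports-decimal")
  }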


--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 capability API

cloud0fan
I think this works, but there are also other solutions, e.g. mixin traits and runtime exceptions.

Assuming the general abstraction is: table -> scan builder -> scan -> batch/batches (see alternative #2 in the doc).

For example, if we want to tell whether a table supports continuous streaming, we can define 3 traits (interfaces):
interface SupportsBatchScan extends Table {
  ScanBuilder newScanBuilder();
}
interface SupportsMicroBatchScan extends Table {
  ScanBuilder newScanBuilder();
}
interface SupportsContinuousScan extends Table {
  ScanBuilder newScanBuilder();
}

Spark would then check whether the given table implements SupportsContinuousScan. Note that Java allows a class to implement different interfaces with the same method(s).
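For instance, Spark's side of the check could be a simple pattern match (a hedged Scala sketch against the interfaces above):

// Table, ScanBuilder and SupportsContinuousScan as sketched above.
def continuousScanBuilder(table: Table): ScanBuilder = table match {
  case c: SupportsContinuousScan => c.newScanBuilder()
  case _ => throw new UnsupportedOperationException(
    "table does not support continuous streaming")
}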

Or put everything in the table interface:
interface Table {
  default ScanBuilder newSingleBatchScanBuilder() { throw exception }
  default ScanBuilder newMicroBatchScanBuilder() { throw exception }
  default ScanBuilder newContinuousScanBuilder() { throw exception }
}

And Spark just calls the method corresponding to the scan mode.


Another problem is how much type safety we want. Better type safety usually means more complicated interfaces.

For example, if we want strong type safety, we need to do the branching between batch, micro-batch and continuous modes at the table level. Then the table interface becomes (assuming we pick the mixin-trait solution):
interface SupportsBatchScan extends Table {
  SingleBatchScanBuilder newScanBuilder();
}
interface SupportsMicroBatchScan extends Table {
  MicroBatchScanBuilder newScanBuilder();
}
interface SupportsContinuousScan extends Table {
  ContinuousScanBuilder newScanBuilder();
}

The drawback is that we end up with a lot of interfaces, i.e. ContinuousScanBuilder -> ContinuousScan -> ContinuousBatch, and the corresponding versions for the other two scan modes.


If we don't care much about type safety, we can do the branching at the scan level:
interface Scan {
  // batch and micro-batch modes share the same Batch interface
  default Batch newSingleBatch() { throw exception }
  default Batch newMicroBatch() { throw exception }
  default ContinuousBatch newContinuousBatch() { throw exception }
}

Now we have ScanBuilder and Scan interfaces shared for all the scan modes, and only have different batch interfaces. We can delay the branching further, but that needs some refactoring of the continuous streaming data source APIs.


I think the capability API is not a must-have at the current stage, but it's worth investigating further to see which use cases it can help with.

Thanks,
Wenchen


Re: DataSourceV2 capability API

JackyLee
I don't know if it is right to make the table API ContinuousScanBuilder -> ContinuousScan -> ContinuousBatch; it makes batch/micro-batch/continuous too different from each other. In my opinion, these are basically similar at the table level. So is it possible to design an API like this?

ScanBuilder -> Scan -> ContinuousBatch/MicroBatch/SingleBatch
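A minimal sketch of that shape (hypothetical Scala names, just to illustrate the suggestion; each mode becomes an optional conversion on a shared Scan):

trait SingleBatch
trait MicroBatchStream
trait ContinuousStream

trait Scan {
  // One shared Scan; each mode is an optional refinement instead of a separate builder chain.
  def toBatch: SingleBatch =
    throw new UnsupportedOperationException("single batch not supported")
  def toMicroBatchStream: MicroBatchStream =
    throw new UnsupportedOperationException("micro-batch not supported")
  def toContinuousStream: ContinuousStream =
    throw new UnsupportedOperationException("continuous not supported")
}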


