data source api v2 refactoring


data source api v2 refactoring

rxin
I spent some time last week looking at the current data source v2 apis, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions", to fit the use cases in Spark, from batch, to streaming.

Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed api:

0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.

1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.

2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the current committed code.

3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).


To illustrate with pseudocode what the different levels mean, a batch query would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
// run tasks on executors

A streaming micro-batch scan would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val stream = table.createStream(scanConfig)

while(true) {
  val scan = stream.createScan(startOffset)
  // run tasks on executors
}


Vs the current API, the above:

1. Creates an explicit Table abstraction, and an explicit Scan abstraction.

2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
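
As a rough illustration of how the four levels and the two pseudocode flows above could fit together, here is a minimal, self-contained Scala sketch. All names in it (DataSourceSketch, ScanConfig as a filter plus options, planTasks, RowScan, InMemoryTable, InMemoryFormat, BatchSize) are hypothetical and chosen for illustration; this is not the committed Spark API, the in-memory table stands in for a real format, and offsets are simplified to row indexes.

```scala
// Hypothetical names throughout; this is a sketch, not the Spark API.
object DataSourceSketch {
  val BatchSize = 4 // rows per micro-batch and per task split (arbitrary)

  // Pushdown result plus options, fixed before any scan or stream is created.
  final case class ScanConfig(filter: Int => Boolean, options: Map[String, String])

  // Level 3: a physical scan over a defined subset of the table.
  trait Scan {
    def planTasks(): Seq[() => Seq[Int]] // each task yields the rows of one split
  }

  // Level 2: a configured streaming read that produces one scan per micro-batch.
  trait Stream {
    def createScan(startOffset: Int): Scan
  }

  // Level 1: a logical dataset.
  trait Table {
    def createScan(config: ScanConfig): Scan
    def createStream(config: ScanConfig): Stream
  }

  // Level 0: a format that knows how to create tables.
  trait Format {
    def createTable(options: Map[String, String]): Table
  }

  final class RowScan(rows: Seq[Int], config: ScanConfig) extends Scan {
    def planTasks(): Seq[() => Seq[Int]] =
      rows.grouped(BatchSize).toSeq.map(split => () => split.filter(config.filter))
  }

  final class InMemoryTable(rows: Seq[Int]) extends Table {
    def createScan(config: ScanConfig): Scan = new RowScan(rows, config)

    def createStream(config: ScanConfig): Stream = new Stream {
      // The config is captured once; every scan of this stream reuses it.
      def createScan(startOffset: Int): Scan =
        new RowScan(rows.slice(startOffset, startOffset + BatchSize), config)
    }
  }

  object InMemoryFormat extends Format {
    def createTable(options: Map[String, String]): Table =
      new InMemoryTable(1 to options.getOrElse("rows", "10").toInt)
  }

  // Stand-in for "run tasks on executors".
  def run(scan: Scan): Seq[Int] = scan.planTasks().flatMap(task => task())

  def batchQuery(config: ScanConfig): Seq[Int] =
    run(InMemoryFormat.createTable(config.options).createScan(config))

  def microBatchQuery(config: ScanConfig, batches: Int): Seq[Seq[Int]] = {
    val stream = InMemoryFormat.createTable(config.options).createStream(config)
    (0 until batches).map(i => run(stream.createScan(i * BatchSize)))
  }
}
```

In this sketch, batchQuery creates a single scan from the table, while microBatchQuery creates the stream once and then asks it for a new scan per start offset, so the filter and options cannot change between micro-batches.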



This email is just a high level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.



Re: data source api v2 refactoring

Ryan Blue
Thanks, Reynold!

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

rb

On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:
Thanks, Reynold, for writing this and starting the discussion!

Data source v2 started out batch-only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.

I like this proposed abstraction and have successfully prototyped it to make sure it works.

During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, operator pushdown is applied only once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.

About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.

Thanks,
Wenchen









--
Ryan Blue
Software Engineer
Netflix

Re: data source api v2 refactoring

Jungtaek Lim
Nice suggestion, Reynold, and great news that Wenchen succeeded in prototyping it!

One thing I would like to confirm is how continuous mode works with this abstraction. Would continuous mode also be abstracted as a Stream, with createScan providing an unbounded Scan?

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: data source api v2 refactoring

Ryan Blue
In reply to this post by Ryan Blue
Thanks for clarifying, Wenchen. I think that's what I expected.

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
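
To make the factory idea a bit more concrete, here is a small Scala sketch of a streaming read that hands out a new continuous scan only when the source's partitioning changes. Everything in it is hypothetical (ContinuousSketch, StreamingRead, ContinuousScan, needsReconfiguration, nextScan, and the model of partitions as a list of integers are assumptions for illustration, and no Kafka client is involved). The ScanConfig is fixed when the streaming read is created and reused for every scan it produces.

```scala
// Hypothetical names throughout; this is a sketch, not the Spark API.
object ContinuousSketch {
  // Result of pushdown, fixed for the lifetime of the streaming read.
  final case class ScanConfig(columns: Seq[String])

  // One long-running task set: one task per source partition.
  final case class ContinuousScan(config: ScanConfig, partitions: Seq[Int]) {
    def taskCount: Int = partitions.size
  }

  // The streaming read: holds the config and produces scans on demand.
  // currentPartitions is a stand-in for asking the source about its layout.
  final class StreamingRead(config: ScanConfig, currentPartitions: () => Seq[Int]) {
    def createContinuousScan(): ContinuousScan =
      ContinuousScan(config, currentPartitions())

    // True when the running scan no longer matches the source layout.
    def needsReconfiguration(running: ContinuousScan): Boolean =
      running.partitions != currentPartitions()
  }

  // Driver step: keep the running scan unless the source changed underneath it.
  def nextScan(read: StreamingRead, running: ContinuousScan): ContinuousScan =
    if (read.needsReconfiguration(running)) read.createContinuousScan() else running
}
```

The point of the sketch is only that reconfiguration produces a new scan from the same streaming read, so the definition of what is read stays put while the task set is replaced.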

rb

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:
Hi Ryan,

Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not the table, stream, or scan, but something between them. The table creates a ScanConfigBuilder, and the table creates the stream/scan with the resulting ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result. For a batch source, it's the scan.

It's a little tricky because Stream is an abstraction for streaming sources only. Better ideas are welcome!
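
One possible reading of this flow, sketched in Scala: the table hands out a builder, the engine offers filters to it, and the immutable ScanConfig that comes out is what a scan or stream would be created with. The names and signatures below (PushdownSketch, Filter, pushFilter, newScanConfigBuilder, SimpleTable, plan) are hypothetical and only illustrate the idea; they are not the committed interfaces.

```scala
// Hypothetical names throughout; this is a sketch, not the Spark API.
object PushdownSketch {
  // A filter the engine would like the source to evaluate.
  sealed trait Filter
  final case class GreaterThan(column: String, value: Int) extends Filter
  final case class Contains(column: String, text: String) extends Filter

  // Immutable result of pushdown: what the source accepted, plus options.
  final case class ScanConfig(pushed: Seq[Filter], options: Map[String, String])

  // Created by the table, one per scan or stream being planned.
  final class ScanConfigBuilder(options: Map[String, String], supports: Filter => Boolean) {
    private var pushed = Vector.empty[Filter]

    // Returns true if the source takes over the filter;
    // false means the engine must still apply it itself.
    def pushFilter(f: Filter): Boolean = {
      val ok = supports(f)
      if (ok) pushed = pushed :+ f
      ok
    }

    def build(): ScanConfig = ScanConfig(pushed, options)
  }

  final class SimpleTable(options: Map[String, String]) {
    // This toy source only understands numeric comparisons.
    def newScanConfigBuilder(): ScanConfigBuilder =
      new ScanConfigBuilder(options, {
        case _: GreaterThan => true
        case _              => false
      })
  }

  // Pushes what it can; returns the config and the filters left for the engine.
  def plan(table: SimpleTable, filters: Seq[Filter]): (ScanConfig, Seq[Filter]) = {
    val builder = table.newScanConfigBuilder()
    val residual = filters.filterNot(f => builder.pushFilter(f))
    (builder.build(), residual)
  }
}
```

Because the builder is created by the table, the same pushdown rules apply whether the resulting ScanConfig ends up in a batch scan or in a stream.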



--
Ryan Blue
Software Engineer
Netflix

Re: data source api v2 refactoring

Mridul Muralidharan

Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan did :-) )
I did not see it in the mail thread I received or in archives ... [1] Wondering which other senders were getting dropped (if yes).

Regards 
Mridul 



Re: data source api v2 refactoring

Marcelo Vanzin-2
Same here, I don't see anything from Wenchen... just replies to him.
On Sat, Sep 1, 2018 at 9:31 PM Mridul Muralidharan <[hidden email]> wrote:

>
>
> Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan did :-) )
> I did not see it in the mail thread I received or in archives ... [1] Wondering which other senders were getting dropped (if yes).
>
> Regards
> Mridul
>
> [1] http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <[hidden email]> wrote:
>>
>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>
>> As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.
>>
>> To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
>>
>> rb
>>
>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:
>>>
>>> Hi Ryan,
>>>
>>> Sorry I may use a wrong wording. The pushdown is done with ScanConfig, which is not table/stream/scan, but something between them. The table creates ScanConfigBuilder, and table creates stream/scan with ScanConfig. For streaming source, stream is the one to take care of the pushdown result. For batch source, it's the scan.
>>>
>>> It's a little tricky because stream is an abstraction for streaming source only. Better ideas are welcome!
>>>
>>>
>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <[hidden email]> wrote:
>>>>
>>>> Thanks, Reynold!
>>>>
>>>> I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.
>>>>
>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!
>>>>
>>>> rb
>>>>
>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:
>>>>>
>>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>>
>>>>> Data source v2 started as batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.
>>>>>
>>>>> I like this proposed abstraction and have successfully prototyped it to make sure it works.
>>>>>
>>>>> During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, the operator pushdown is applied only once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.
>>>>>
>>>>> About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
>>>>>
>>>>> Thanks,
>>>>> Wenchen
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <[hidden email]> wrote:
>>>>>>
>>>>>> I spent some time last week looking at the current data source v2 apis, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions", to fit the use cases in Spark, from batch, to streaming.
>>>>>>
>>>>>> Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed api:
>>>>>>
>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.
>>>>>>
>>>>>> 1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.
>>>>>>
>>>>>> 2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the current committed code.
>>>>>>
>>>>>> 3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).
>>>>>>
>>>>>>
>>>>>> To illustrate with pseudocode what the different levels mean, a batch query would look like the following:
>>>>>>
>>>>>> val provider = reflection[Format]("parquet")
>>>>>> val table = provider.createTable(options)
>>>>>> val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
>>>>>> // run tasks on executors
>>>>>>
>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>
>>>>>> val provider = reflection[Format]("parquet")
>>>>>> val table = provider.createTable(options)
>>>>>> val stream = table.createStream(scanConfig)
>>>>>>
>>>>>> while(true) {
>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>   // run tasks on executors
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Vs the current API, the above:
>>>>>>
>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan abstraction.
>>>>>>
>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
>>>>>>
>>>>>>
>>>>>>
>>>>>> This email is just a high level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.
>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix



--
Marcelo


Fwd: data source api v2 refactoring

Ryan Blue
In reply to this post by Mridul Muralidharan
Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <[hidden email]>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <[hidden email]>
Cc: Ryan Blue <[hidden email]>, Reynold Xin <[hidden email]>, <[hidden email]>


Hi Mridul,

I'm not sure what's going on; my email was CC'ed to the dev list.


Hi Ryan,

The logical and physical scan idea sounds good. To add more color to Jungtaek's question, both micro-batch and continuous mode have a logical and a physical scan, but there is a difference: in micro-batch mode, a physical scan outputs data for exactly one epoch, which is not true for continuous mode.

I'm not sure if it's necessary to include streaming epoch in the API abstraction, for features like metrics reporting.
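
A minimal Scala sketch of the logical/physical split for the micro-batch case may help here. Every name in it (`Offset`, `Scan`, `Stream`, `InMemoryStream`, `runMicroBatches`) is hypothetical and chosen for illustration; none of it is a committed Spark API. It only shows a stream (the logical scan) acting as a factory that produces one physical scan per epoch.

```scala
object StreamScanSketch {
  // Hypothetical types for illustration; none of these are Spark APIs.
  final case class Offset(value: Int)

  // Physical scan: a bounded slice of the source that one job can read.
  trait Scan {
    def rows: Seq[String]
    def endOffset: Offset
  }

  // Logical scan: configured once, then used as a factory for physical scans.
  trait Stream {
    def createScan(start: Offset): Scan
  }

  // Toy stream over in-memory data that emits at most `batchSize` rows per epoch.
  final class InMemoryStream(data: Vector[String], batchSize: Int) extends Stream {
    def createScan(start: Offset): Scan = {
      val chunk = data.slice(start.value, start.value + batchSize)
      new Scan {
        val rows: Seq[String] = chunk
        val endOffset: Offset = Offset(start.value + chunk.size)
      }
    }
  }

  // Micro-batch driver: one physical scan per epoch until the source is drained.
  def runMicroBatches(stream: Stream): Vector[Seq[String]] = {
    @annotation.tailrec
    def loop(start: Offset, acc: Vector[Seq[String]]): Vector[Seq[String]] = {
      val scan = stream.createScan(start)
      if (scan.rows.isEmpty) acc
      else loop(scan.endOffset, acc :+ scan.rows)
    }
    loop(Offset(0), Vector.empty)
  }
}
```

In continuous mode a single physical scan would span many epochs, which this loop deliberately does not model; that is the difference described above.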

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <[hidden email]> wrote:

Is it only me, or are all others getting Wenchen’s mails? (Obviously Ryan did :-) )
I did not see it in the mail thread I received or in archives ... [1] Wondering which other senders were getting dropped (if yes).

Regards 
Mridul 



Re: data source api v2 refactoring

cloud0fan
I'm switching to another Gmail account of mine; let's see if it still gets dropped this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have logical and physical writes, and the table can create different logical writes depending on how the data is written, e.g., append, delete, replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like
trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like
trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}


It looks to me like the API is simpler without WriteConfig. What do you think?
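
To make the comparison concrete, here is a compilable Scala sketch of both shapes side by side. It is only an illustration under stated assumptions: `WriteConfig` and `LogicalWrite` are placeholder types, delete expressions are modeled as plain strings (`DeleteExpr`), and `DemoTable` is a made-up implementation, not anything from the prototype.

```scala
object WriteApiSketch {
  // Assumption: delete expressions are plain strings here; a real API would use filter expressions.
  type DeleteExpr = String

  // Placeholder config and write types, for illustration only.
  sealed trait WriteConfig
  case object AppendConfig extends WriteConfig
  final case class DeleteConfig(exprs: Seq[DeleteExpr]) extends WriteConfig

  final case class LogicalWrite(description: String)

  // Shape 1: the table hands out a WriteConfig, then turns it into a LogicalWrite.
  trait TableWithConfig {
    def newAppendWriteConfig(): WriteConfig
    def newDeleteWriteConfig(deleteExprs: Seq[DeleteExpr]): WriteConfig
    def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite
  }

  // Shape 2: the table creates the LogicalWrite directly.
  trait TableWithoutConfig {
    def newAppendWrite(): LogicalWrite
    def newDeleteWrite(deleteExprs: Seq[DeleteExpr]): LogicalWrite
  }

  // Made-up table that supports both shapes, so the two call paths can be compared.
  object DemoTable extends TableWithConfig with TableWithoutConfig {
    def newAppendWriteConfig(): WriteConfig = AppendConfig
    def newDeleteWriteConfig(deleteExprs: Seq[DeleteExpr]): WriteConfig =
      DeleteConfig(deleteExprs)
    def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite = writeConfig match {
      case AppendConfig        => LogicalWrite("append")
      case DeleteConfig(exprs) => LogicalWrite("delete where " + exprs.mkString(" AND "))
    }
    // The direct methods collapse the two-step path into a single call.
    def newAppendWrite(): LogicalWrite = newLogicalWrite(newAppendWriteConfig())
    def newDeleteWrite(deleteExprs: Seq[DeleteExpr]): LogicalWrite =
      newLogicalWrite(newDeleteWriteConfig(deleteExprs))
  }
}
```

In this toy version the caller gets the same `LogicalWrite` either way; the second shape just removes one intermediate object from the public surface.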

Thanks,
Wenchen


Re: data source api v2 refactoring

Ryan Blue
Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.
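
As a rough illustration of what a Write that carries its own commit and abort methods could look like, here is a small Scala sketch. It is an assumption-laden toy, not the API from the PR: `Write`, `InMemoryTable`, and `newAppendWrite` are hypothetical names, and rows are plain strings.

```scala
object WriteLifecycleSketch {
  // Hypothetical interface: a write that owns its own commit/abort lifecycle.
  trait Write {
    def write(rows: Seq[String]): Unit
    def commit(): Unit
    def abort(): Unit
  }

  // Toy table: staged rows become visible only when a write commits.
  final class InMemoryTable {
    private var committed = Vector.empty[String]
    def rows: Vector[String] = committed

    // The write is created directly from the table, with no intermediate WriteConfig.
    def newAppendWrite(): Write = new Write {
      private var staged = Vector.empty[String]
      def write(newRows: Seq[String]): Unit = { staged = staged ++ newRows }
      def commit(): Unit = { committed = committed ++ staged; staged = Vector.empty }
      def abort(): Unit = { staged = Vector.empty }
    }
  }
}
```

Once created, the write object does not need to be passed back to the table to finish the job, which is what would make a separate WriteConfig unnecessary in this shape.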

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <[hidden email]> wrote:
I'm switching to my another Gmail account, let's see if it still gets dropped this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have logical and physical writing. And the table can create different logical writing based on how to write. e.g., append, delete, replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like
trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like
trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}


It looks to me that the API is simpler without WriteConfig, what do you think?

Thanks,
Wenchen

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <[hidden email]> wrote:
Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <[hidden email]>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <[hidden email]>
Cc: Ryan Blue <[hidden email]>, Reynold Xin <[hidden email]>, <[hidden email]>


Hi Mridul,

I'm not sure what's going on, my email was CC'ed to the dev list.


Hi Ryan,

The logical and physical scan idea sounds good. To add more color to Jungtaek's question, both micro-batch and continuous mode have the logical and physical scan, but there is a difference: for micro-batch mode, a physical scan outputs data for one epoch, but it's not true for continuous mode.

I'm not sure if it's necessary to include streaming epoch in the API abstraction, for features like metrics reporting.

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <[hidden email]> wrote:

Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan did :-) )
I did not see it in the mail thread I received or in archives ... [1] Wondering which othersenderswere getting dropped (if yes).

Regards 
Mridul 


On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <[hidden email]> wrote:
Thanks for clarifying, Wenchen. I think that's what I expected.

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.

rb

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:
Hi Ryan,

Sorry I may use a wrong wording. The pushdown is done with ScanConfig, which is not table/stream/scan, but something between them. The table creates ScanConfigBuilder, and table creates stream/scan with ScanConfig. For streaming source, stream is the one to take care of the pushdown result. For batch source, it's the scan.

It's a little tricky because stream is an abstraction for streaming source only. Better ideas are welcome!

On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <[hidden email]> wrote:
Thanks, Reynold!

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

rb

On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:
Thank Reynold for writing this and starting the discussion!

Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just follow the v1 API. Now we are designing the streaming API and catalog integration, the abstraction becomes super important.

I like this proposed abstraction and have successfully prototyped it to make sure it works.

During prototyping, I have to work around the issue that the current streaming engine does query optimization/planning for each micro batch. With this abstraction, the operator pushdown is only applied once per-query. In my prototype, I do the physical planning up front to get the pushdown result, and
add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.

About pushdown, I think we should do it at the table level. The table should create a new pushdow handler to apply operator pushdowm for each scan/stream, and create the scan/stream with the pushdown result. The rationale is, a table should have the same pushdown behavior regardless the scan node.

Thanks,
Wenchen





On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <[hidden email]> wrote:
I spent some time last week looking at the current data source v2 apis, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions", to fit the use cases in Spark, from batch, to streaming.

Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed api:

0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.

1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.

2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents a an instance of a StreamingQuery. Pushdowns and options are handled at this layer. I.e. Spark guarnatees to data source implementation pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the current committed code.

3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport takes a ScanConfig).


To illustrate with pseudocode what the different levels mean, a batch query would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
// run tasks on executors

A streaming micro-batch scan would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val stream = table.createStream(scanConfig)

while(true) {
  val scan = stream.createScan(startOffset) // `stream` is the value created from the table above
  // run tasks on executors
}


Compared with the current API, the above:

1. Creates an explicit Table abstraction, and an explicit Scan abstraction.

2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
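
The four levels above can be sketched as plain Scala traits. This is only an illustration under stated assumptions: the trait and method names follow the pseudocode, while the fields of `ScanConfig`, the `Long` offsets, the toy `InMemoryFormat`, and the `FourLevelSketch` wrapper are hypothetical and not part of any committed Spark API.

```scala
object FourLevelSketch {
  // Hypothetical stand-in for the pushdown result plus options.
  final case class ScanConfig(minValue: Int, options: Map[String, String] = Map.empty)

  // Level 3: a physical scan over a defined subset of the table.
  trait Scan { def readAll(): Seq[Int] }

  // Level 2: a configured streaming read that acts as a factory for scans.
  trait Stream { def createScan(startOffset: Long): Scan }

  // Level 1: a logical dataset that can create batch scans and streams.
  trait Table {
    def createScan(config: ScanConfig): Scan
    def createStream(config: ScanConfig): Stream
  }

  // Level 0: a format that creates tables from options.
  trait Format { def createTable(options: Map[String, String]): Table }

  // Toy in-memory format, for illustration only.
  object InMemoryFormat extends Format {
    def createTable(options: Map[String, String]): Table = new Table {
      private val rows: Vector[Int] = (1 to 10).toVector

      def createScan(config: ScanConfig): Scan = new Scan {
        def readAll(): Seq[Int] = rows.filter(_ >= config.minValue)
      }

      // The config is captured once here, so it cannot change within the stream.
      def createStream(config: ScanConfig): Stream = new Stream {
        def createScan(startOffset: Long): Scan = new Scan {
          def readAll(): Seq[Int] =
            rows.drop(startOffset.toInt).filter(_ >= config.minValue)
        }
      }
    }
  }

  // Batch: format -> table -> scan.
  def batchRead(minValue: Int): Seq[Int] =
    InMemoryFormat.createTable(Map.empty).createScan(ScanConfig(minValue)).readAll()

  // Micro-batch: format -> table -> stream -> one scan per start offset.
  def microBatchRead(minValue: Int, startOffset: Long): Seq[Int] = {
    val stream = InMemoryFormat.createTable(Map.empty).createStream(ScanConfig(minValue))
    stream.createScan(startOffset).readAll()
  }
}
```

In this sketch the pushdown (`minValue`) is bound when the stream is created, and every scan the stream produces sees the same value, which is the guarantee item 2 describes.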



This email is just a high-level sketch. I've asked Wenchen to prototype this, to see whether it is actually feasible and how many hacks it removes or creates.




--
Ryan Blue
Software Engineer
Netflix


Re: data source api v2 refactoring

Hyukjin Kwon
BTW, just to clarify: do we hold Datasource V2 related PRs for now, until this refactoring is finished?

On Fri, Sep 7, 2018 at 12:52 AM Ryan Blue <[hidden email]> wrote:
Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <[hidden email]> wrote:
I'm switching to another Gmail account of mine. Let's see if this message still gets dropped.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have logical and physical writes, and the table can create different logical writes depending on how the data is written, e.g. append, delete, replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like:
trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like
trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}


The API looks simpler to me without WriteConfig. What do you think?
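
As a rough illustration of the variant without WriteConfig, here is a minimal Scala sketch in which the write object itself carries commit and abort. Everything beyond the names `Table`, `LogicalWrite`, `newAppendWrite` and `newDeleteWrite` is an assumption made for the example: the `write` method, the `Int => Boolean` predicate standing in for `deleteExprs`, and the toy `InMemoryTable`.

```scala
object WriteSketch {
  // Hypothetical logical write that carries its own commit and abort.
  trait LogicalWrite {
    def write(rows: Seq[Int]): Unit
    def commit(): Unit
    def abort(): Unit
  }

  // Variant without WriteConfig: the table creates each kind of write directly.
  trait Table {
    def newAppendWrite(): LogicalWrite
    def newDeleteWrite(deleteExpr: Int => Boolean): LogicalWrite
  }

  // Toy in-memory table; changes become visible only on commit.
  final class InMemoryTable(initial: Seq[Int]) extends Table {
    private var committed: Vector[Int] = initial.toVector

    def snapshot: Seq[Int] = committed

    def newAppendWrite(): LogicalWrite = new LogicalWrite {
      private var staged = Vector.empty[Int]
      def write(rows: Seq[Int]): Unit = { staged = staged ++ rows }
      def commit(): Unit = { committed = committed ++ staged; staged = Vector.empty }
      def abort(): Unit = { staged = Vector.empty }
    }

    def newDeleteWrite(deleteExpr: Int => Boolean): LogicalWrite = new LogicalWrite {
      def write(rows: Seq[Int]): Unit = () // a delete carries no new rows
      def commit(): Unit = { committed = committed.filterNot(deleteExpr) }
      def abort(): Unit = ()
    }
  }
}
```

With this shape the caller never handles a separate config object: `table.newAppendWrite()` returns something that can be written to and then committed or aborted.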

Thanks,
Wenchen

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <[hidden email]> wrote:
Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <[hidden email]>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <[hidden email]>
Cc: Ryan Blue <[hidden email]>, Reynold Xin <[hidden email]>, <[hidden email]>


Hi Mridul,

I'm not sure what's going on, my email was CC'ed to the dev list.


Hi Ryan,

The logical and physical scan idea sounds good. To add more color to Jungtaek's question, both micro-batch and continuous mode have the logical and physical scan, but there is a difference: in micro-batch mode, a physical scan outputs data for one epoch, whereas that is not true for continuous mode.

I'm not sure if it's necessary to include streaming epoch in the API abstraction, for features like metrics reporting.

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <[hidden email]> wrote:

Is it only me, or are all the others getting Wenchen's mails? (Obviously Ryan did :-) )
I did not see it in the mail thread I received or in the archives ... [1] Wondering which other senders were getting dropped (if any).

Regards
Mridul


On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <[hidden email]> wrote:
Thanks for clarifying, Wenchen. I think that's what I expected.

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
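
The "factory for scans" idea in continuous mode can be sketched as follows. This is a hedged toy model, not Spark's API: `Scan`, `Stream`, `scanFor` and `scanCount` are hypothetical names, and a partition count stands in for the Kafka topic partitioning mentioned above.

```scala
object ContinuousSketch {
  // Hypothetical scan: one long-running task per source partition.
  final case class Scan(partitions: Seq[Int])

  // Hypothetical streaming read: a fixed definition of what is read,
  // acting as a factory for scans.
  final class Stream {
    private var current: Option[Scan] = None
    private var scansCreated = 0

    def scanCount: Int = scansCreated

    // Returns the running scan, or creates a new one when the
    // partitioning of the underlying source has changed.
    def scanFor(partitionCount: Int): Scan = current match {
      case Some(scan) if scan.partitions.size == partitionCount => scan
      case _ =>
        val scan = Scan(0 until partitionCount)
        current = Some(scan)
        scansCreated += 1
        scan
    }
  }
}
```

As long as the partitioning is stable the same scan keeps running; a change produces a second scan from the same stream, so the stream-level configuration survives the reconfiguration.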

rb

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:
Hi Ryan,

Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not a table/stream/scan, but something between them. The table creates a ScanConfigBuilder, and the table creates the stream/scan with the resulting ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result. For a batch source, it's the scan.

It's a little tricky because the stream is an abstraction for streaming sources only. Better ideas are welcome!

On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <[hidden email]> wrote:
Thanks, Reynold!

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

rb

On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:
Thanks, Reynold, for writing this and starting the discussion!

Data source v2 started as batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes very important.

I like this proposed abstraction and have successfully prototyped it to make sure it works.

During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, the operator pushdown is applied only once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.

About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
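
To make the table-level pushdown idea concrete, here is a small Scala sketch. It assumes a hypothetical `ScanConfigBuilder` as the pushdown handler, and the `Filter` types, `pushFilter`, and `newScanConfigBuilder` are invented for the example rather than taken from the committed API.

```scala
object PushdownSketch {
  sealed trait Filter
  final case class GreaterThan(value: Int) extends Filter
  final case class Contains(text: String) extends Filter // not supported by this toy source

  // Result of pushdown, shared by the batch scan and the stream.
  final case class ScanConfig(pushed: Seq[Filter], remaining: Seq[Filter])

  // Hypothetical pushdown handler created by the table.
  final class ScanConfigBuilder {
    private var pushed = Vector.empty[Filter]
    private var remaining = Vector.empty[Filter]

    // Accepts the filters the source can evaluate; the rest stay with Spark.
    def pushFilter(f: Filter): ScanConfigBuilder = {
      f match {
        case g: GreaterThan => pushed = pushed :+ g
        case other          => remaining = remaining :+ other
      }
      this
    }

    def build(): ScanConfig = ScanConfig(pushed, remaining)
  }

  final case class Scan(config: ScanConfig)
  final case class Stream(config: ScanConfig)

  // The table owns the pushdown logic, so a scan and a stream behave the same.
  final class Table {
    def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder
    def createScan(config: ScanConfig): Scan = Scan(config)
    def createStream(config: ScanConfig): Stream = Stream(config)
  }
}
```

Because the same builder produces the config for both paths, a batch scan and a stream created from one table cannot disagree about which filters were pushed.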

Thanks,
Wenchen





Re: data source api v2 refactoring

cloud0fan
Hi Ryan,

You are right that the `LogicalWrite` mirrors the read side API. I just don't have a good name for it yet, and the write side changes will be a different PR.


Hi Hyukjin,

That's my expectation; otherwise we will keep rebasing the refactor PR and never get it done.


Re: data source api v2 refactoring

Ryan Blue
There are a few v2-related changes that we can work on in parallel, at least for reviews:

* SPARK-25006, #21978: Add catalog to TableIdentifier - this proposes how to incrementally add multi-catalog support without breaking existing code paths
* SPARK-24253, #21308: Add DeleteSupport API - this is a small API addition, which doesn't affect the refactor
* SPARK-24252, #21306: Add v2 Catalog API - this is a different way to create v2 tables, also doesn't affect the refactor

I agree that the PR for adding SQL support should probably wait on the refactor. I have also been meaning to share our implementation, which isn't based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and AlterTable from both SQL and the other methods in the DF API, saveAsTable and insertInto. It follows the structure that I proposed on the SQL support PR to convert SQL plans to v2 plans and uses the new TableCatalog to implement CTAS and RTAS.

rb


On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan <[hidden email]> wrote:
Hi Ryan,

You are right that the `LogicalWrite` mirrors the read side API. I just don't have a good naming yet, and write side changes will be a different PR.


Hi Hyukjin,

That's my expectation, otherwise we keep rebasing the refactor PR and never get it done.

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <[hidden email]> wrote:
BTW, do we hold Datasource V2 related PRs for now until we finish this refactoring just for clarification?

2018년 9월 7일 (금) 오전 12:52, Ryan Blue <[hidden email]>님이 작성:
Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <[hidden email]> wrote:
I'm switching to my another Gmail account, let's see if it still gets dropped this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have logical and physical writing. And the table can create different logical writing based on how to write. e.g., append, delete, replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like
trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like
trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}


It looks to me that the API is simpler without WriteConfig, what do you think?

Thanks,
Wenchen

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <[hidden email]> wrote:
Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <[hidden email]>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <[hidden email]>
Cc: Ryan Blue <[hidden email]>, Reynold Xin <[hidden email]>, <[hidden email]>


Hi Mridul,

I'm not sure what's going on, my email was CC'ed to the dev list.


Hi Ryan,

The logical and physical scan idea sounds good. To add more color to Jungtaek's question, both micro-batch and continuous mode have the logical and physical scan, but there is a difference: for micro-batch mode, a physical scan outputs data for one epoch, but it's not true for continuous mode.

I'm not sure if it's necessary to include streaming epoch in the API abstraction, for features like metrics reporting.

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <[hidden email]> wrote:

Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan did :-) )
I did not see it in the mail thread I received or in archives ... [1] Wondering which othersenderswere getting dropped (if yes).

Regards 
Mridul 


On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <[hidden email]> wrote:
Thanks for clarifying, Wenchen. I think that's what I expected.

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.

rb

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:
Hi Ryan,

Sorry I may use a wrong wording. The pushdown is done with ScanConfig, which is not table/stream/scan, but something between them. The table creates ScanConfigBuilder, and table creates stream/scan with ScanConfig. For streaming source, stream is the one to take care of the pushdown result. For batch source, it's the scan.

It's a little tricky because stream is an abstraction for streaming source only. Better ideas are welcome!

On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <[hidden email]> wrote:
Thanks, Reynold!

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

rb

On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:
Thank Reynold for writing this and starting the discussion!

Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just follow the v1 API. Now we are designing the streaming API and catalog integration, the abstraction becomes super important.

I like this proposed abstraction and have successfully prototyped it to make sure it works.

During prototyping, I have to work around the issue that the current streaming engine does query optimization/planning for each micro batch. With this abstraction, the operator pushdown is only applied once per-query. In my prototype, I do the physical planning up front to get the pushdown result, and
add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.

About pushdown, I think we should do it at the table level. The table should create a new pushdow handler to apply operator pushdowm for each scan/stream, and create the scan/stream with the pushdown result. The rationale is, a table should have the same pushdown behavior regardless the scan node.

Thanks,
Wenchen





On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <[hidden email]> wrote:
I spent some time last week looking at the current data source v2 apis, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions", to fit the use cases in Spark, from batch, to streaming.

Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed api:

0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.

1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.

2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents a an instance of a StreamingQuery. Pushdowns and options are handled at this layer. I.e. Spark guarnatees to data source implementation pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the current committed code.

3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport takes a ScanConfig).


To illustrate with pseudocode what the different levels mean, a batch query would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
// run tasks on executors

A streaming micro-batch scan would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val stream = table.createStream(scanConfig)

while (true) {
  val scan = stream.createScan(startOffset)
  // run tasks on executors
}


Vs the current API, the above:

1. Creates an explicit Table abstraction, and an explicit Scan abstraction.

2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
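
To make the levels concrete, here is a minimal Scala sketch of how they could fit together. It is only an illustration under stated assumptions: none of these traits are Spark's committed API, the names (Format, Table, Stream, Scan, ScanConfig) are borrowed from the pseudocode above, offsets are simplified to Long, rows to Int, and the in-memory source is hypothetical.

```scala
// Hypothetical types mirroring the proposed levels; not Spark's actual API.
final case class ScanConfig(minValue: Int, options: Map[String, String] = Map.empty)

trait Scan { def planTasks(): Seq[() => Seq[Int]] }

trait Stream { def createScan(startOffset: Long): Scan } // one scan per micro-batch

trait Table {
  def createScan(config: ScanConfig): Scan     // batch
  def createStream(config: ScanConfig): Stream // streaming
}

trait Format { def createTable(options: Map[String, String]): Table }

object LevelsSketch {
  // Toy in-memory format: the "table" is a fixed sequence of integers.
  object InMemoryFormat extends Format {
    def createTable(options: Map[String, String]): Table = new Table {
      private val data: Vector[Int] = (1 to 10).toVector

      private def scanOver(rows: Vector[Int], config: ScanConfig): Scan = new Scan {
        // The pushed-down filter is applied inside the source; one task per two rows.
        def planTasks(): Seq[() => Seq[Int]] =
          rows.filter(_ >= config.minValue).grouped(2).map(g => () => g).toSeq
      }

      def createScan(config: ScanConfig): Scan = scanOver(data, config)

      def createStream(config: ScanConfig): Stream = new Stream {
        // config is fixed for the lifetime of the stream; only the offset varies.
        def createScan(startOffset: Long): Scan =
          scanOver(data.drop(startOffset.toInt).take(4), config)
      }
    }
  }

  def runBatch(config: ScanConfig): Seq[Int] =
    InMemoryFormat.createTable(Map.empty).createScan(config).planTasks().flatMap(_.apply())

  def runMicroBatches(config: ScanConfig, batches: Int): Seq[Seq[Int]] = {
    val stream = InMemoryFormat.createTable(Map.empty).createStream(config)
    (0 until batches).map { i =>
      stream.createScan(startOffset = i * 4L).planTasks().flatMap(_.apply())
    }
  }
}
```

In this sketch the ScanConfig is handed over exactly once per stream, so the per-batch createScan call only sees a new offset, which is the guarantee described in point 2.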



This email is just a high-level sketch. I've asked Wenchen to prototype this, to see whether it is actually feasible and how many hacks it removes or creates.




--
Ryan Blue
Software Engineer
Netflix



Re: data source api v2 refactoring

Thakrar, Jayesh

Ryan et al,

 

I'm wondering whether the existing Spark-based data sources (e.g. for HDFS, Kafka) have been ported to V2.

I remember reading threads that discussed the inefficiency/overhead of converting from Row to InternalRow, which was preventing certain porting efforts.

 

I ask because those are the most widely used data sources and have a lot of effort and thinking behind them; if they have been ported over to V2, they can serve as excellent production examples of the V2 API.

 

Thanks,

Jayesh

 

From: Ryan Blue <[hidden email]>
Reply-To: <[hidden email]>
Date: Friday, September 7, 2018 at 2:19 PM
To: Wenchen Fan <[hidden email]>
Cc: Hyukjin Kwon <[hidden email]>, Spark Dev List <[hidden email]>
Subject: Re: data source api v2 refactoring

 

There are a few v2-related changes that we can work on in parallel, at least for reviews:

 

* SPARK-25006, #21978: Add catalog to TableIdentifier - this proposes how to incrementally add multi-catalog support without breaking existing code paths

* SPARK-24253, #21308: Add DeleteSupport API - this is a small API addition, which doesn't affect the refactor

* SPARK-24252, #21306: Add v2 Catalog API - this is a different way to create v2 tables, also doesn't affect the refactor

 

I agree that the PR for adding SQL support should probably wait on the refactor. I have also been meaning to share our implementation, which isn't based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and AlterTable from both SQL and the other methods in the DF API, saveAsTable and insertInto. It follows the structure that I proposed on the SQL support PR to convert SQL plans to v2 plans and uses the new TableCatalog to implement CTAS and RTAS.

 

rb

 

 

On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan <[hidden email]> wrote:

Hi Ryan,

 

You are right that the `LogicalWrite` mirrors the read side API. I just don't have a good name for it yet, and the write side changes will be a different PR.

 

 

Hi Hyukjin,

 

That's my expectation, otherwise we keep rebasing the refactor PR and never get it done.

 

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <[hidden email]> wrote:

BTW, just for clarification: do we hold Datasource V2-related PRs for now, until we finish this refactoring?

 

On Fri, Sep 7, 2018 at 12:52 AM Ryan Blue <[hidden email]> wrote:

Wenchen,

 

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?

 

I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.

 

rb

 

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <[hidden email]> wrote:

I'm switching to another Gmail account of mine; let's see if it still gets dropped this time.

 

Hi Ryan,

 

I'm thinking about the write path and feel the abstraction should be the same.

 

We still have logical and physical writes, and the table can create different logical writes depending on how the data is written, e.g., append, delete, replaceWhere, etc.

 

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like

trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

 

Without WriteConfig, the API looks like

trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}

 

 

It looks to me like the API is simpler without WriteConfig. What do you think?
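
As a rough illustration of the variant without WriteConfig, here is a small Scala sketch. It is not Spark's API: LogicalWrite with commit/abort follows the discussion in this thread, while WritableTable, InMemoryTable, the write method and the Int rows are hypothetical stand-ins chosen to keep the example self-contained.

```scala
// Hypothetical sketch of the "no WriteConfig" variant; names are illustrative only.
trait LogicalWrite {
  def write(batch: Seq[Int]): Unit // stage rows (stands in for the executor-side tasks)
  def commit(): Unit               // make staged changes visible
  def abort(): Unit                // discard staged changes
}

trait WritableTable {
  def newAppendWrite(): LogicalWrite
  def newDeleteWrite(deletePredicate: Int => Boolean): LogicalWrite
}

final class InMemoryTable(initial: Seq[Int]) extends WritableTable {
  private var rows: Vector[Int] = initial.toVector
  def snapshot: Vector[Int] = rows

  def newAppendWrite(): LogicalWrite = new LogicalWrite {
    private var staged = Vector.empty[Int]
    def write(batch: Seq[Int]): Unit = { staged ++= batch }
    def commit(): Unit = { rows ++= staged; staged = Vector.empty }
    def abort(): Unit = { staged = Vector.empty }
  }

  def newDeleteWrite(deletePredicate: Int => Boolean): LogicalWrite = new LogicalWrite {
    def write(batch: Seq[Int]): Unit = () // a delete carries no rows
    def commit(): Unit = { rows = rows.filterNot(deletePredicate) }
    def abort(): Unit = ()
  }
}
```

Each write object here carries its own commit and abort, so the table can hand it out directly without an intermediate config object.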

 

Thanks,

Wenchen

 

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <[hidden email]> wrote:

Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <[hidden email]>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <[hidden email]>
Cc: Ryan Blue <[hidden email]>, Reynold Xin <[hidden email]>, <[hidden email]>

 

Hi Mridul,

 

I'm not sure what's going on, my email was CC'ed to the dev list.

 

 

Hi Ryan,

 

The logical and physical scan idea sounds good. To add more color to Jungtaek's question, both micro-batch and continuous mode have the logical and physical scan, but there is a difference: in micro-batch mode, a physical scan outputs data for one epoch, but that's not true for continuous mode.

 

I'm not sure if it's necessary to include streaming epoch in the API abstraction, for features like metrics reporting.

 

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <[hidden email]> wrote:

 

Is it only me, or are all the others getting Wenchen’s mails? (Obviously Ryan did :-) )

I did not see it in the mail thread I received or in the archives ... [1] Wondering which other senders were getting dropped (if any).

 

Regards 

Mridul 

 

 

On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <[hidden email]> wrote:

Thanks for clarifying, Wenchen. I think that's what I expected.

 

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

 

To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
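
A small Scala sketch of that idea, under loud assumptions: StreamingRead, ContinuousScan and onPartitionsObserved are hypothetical names invented for this illustration, and partitions are reduced to integer ids. It only shows the streaming read acting as a factory that hands out a new continuous scan when the source layout changes.

```scala
object ContinuousSketch {
  // One long-running task per partition (hypothetical model).
  final case class ContinuousScan(partitions: Set[Int])

  // Configured once; produces a new scan only when the source layout changes.
  final class StreamingRead(initialPartitions: Set[Int]) {
    private var current = ContinuousScan(initialPartitions)

    def currentScan: ContinuousScan = current

    // Some(newScan) if the engine must reconfigure, None if the running scan is still valid.
    def onPartitionsObserved(observed: Set[Int]): Option[ContinuousScan] =
      if (observed == current.partitions) None
      else {
        current = ContinuousScan(observed)
        Some(current)
      }
  }
}
```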

 

rb

 

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:

Hi Ryan,

 

Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not the table/stream/scan, but something between them. The table creates a ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result. For a batch source, it's the scan.

 

It's a little tricky because the stream is an abstraction for streaming sources only. Better ideas are welcome!
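
To illustrate the builder step, here is a minimal Scala sketch. The Filter classes, the pushFilters method and the pushed/residual split are simplified, hypothetical stand-ins rather than the committed interfaces; the toy source is assumed to understand only GreaterThan.

```scala
object PushdownSketch {
  sealed trait Filter
  final case class GreaterThan(column: String, value: Int) extends Filter
  final case class StartsWith(column: String, prefix: String) extends Filter

  // Immutable result of pushdown; it is what a scan or stream would be created from.
  final case class ScanConfig(pushed: Seq[Filter], residual: Seq[Filter])

  // The source accepts the filters it can evaluate; the engine must re-apply the rest.
  final class ScanConfigBuilder {
    private var pushed = Vector.empty[Filter]
    private var residual = Vector.empty[Filter]

    def pushFilters(filters: Seq[Filter]): this.type = {
      val (accepted, rest) = filters.partition {
        case _: GreaterThan => true // this toy source only understands comparisons
        case _              => false
      }
      pushed ++= accepted
      residual ++= rest
      this
    }

    def build(): ScanConfig = ScanConfig(pushed, residual)
  }
}
```

Because build() returns an immutable value, the same pushdown result can back either a batch scan or a stream.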

 

On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <[hidden email]> wrote:

Thanks, Reynold!

 

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

 

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

 

rb

 

On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:

Thanks, Reynold, for writing this and starting the discussion!

 

Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.

 

I like this proposed abstraction and have successfully prototyped it to make sure it works.

 

During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, the operator pushdown is only applied once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.

 

About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.

 

Thanks,

Wenchen

 

 

 

 

 

On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <[hidden email]> wrote:


Re: data source api v2 refactoring

Ryan Blue
Hi Jayesh,

The existing sources haven't been ported to v2 yet. That is going to be tricky because the existing sources implement behaviors that we need to keep for now.

I wrote up an SPIP to standardize logical plans while moving to the v2 sources. The reason why we need this is that too much is delegated to sources today. For example, sources are handed a SaveMode to overwrite data, but what exactly gets overwritten isn't defined and it varies by the source that gets used. That's not a good thing and we want to clean up what happens so that users know that a query behaves the same way across all v2 sources. CTAS shouldn't succeed for one source but fail for another if the table already exists.
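
As a sketch of what standardized behavior could look like for CTAS, assuming a hypothetical minimal catalog (Catalog, createTableAsSelect and the Int rows below are invented for illustration and are not the API proposed in the SPIP): the existence check lives in one shared code path, so the outcome does not depend on which source backs the table.

```scala
import scala.collection.mutable

object CtasSketch {
  // Hypothetical minimal catalog; each source only stores tables.
  final class Catalog {
    private val tables = mutable.Map.empty[String, Seq[Int]]
    def exists(name: String): Boolean = tables.contains(name)
    def create(name: String, rows: Seq[Int]): Unit = { tables(name) = rows }
    def read(name: String): Option[Seq[Int]] = tables.get(name)
  }

  // The check is made by the shared plan, not by the source, so every catalog behaves the same way.
  def createTableAsSelect(catalog: Catalog, name: String, query: => Seq[Int]): Either[String, Unit] =
    if (catalog.exists(name)) Left(s"Table $name already exists")
    else Right(catalog.create(name, query))
}
```

The query argument is passed by name, so it is not evaluated when the table already exists.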

Standardizing plans makes it difficult to port the existing sources to v2 because we need to implement the behavior of the v2 plans, which may not be the existing v1 behavior. I think what we should do is keep the existing v1 sources working as they do today, and add a way to opt in for v2 behavior. One good way to do this is to use a new write API that is more clear; I proposed one in the SPIP I mentioned earlier. SQL is a bit easier because the behavior for SQL is fairly well-defined. The problem is mostly with the existing DF write API, DataFrameWriter.

It would be great to open a discussion about the compatibility between v1 and v2 and come up with a plan on this list.

rb

On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh <[hidden email]> wrote:

Ryan et al,

 

I'm wondering whether the existing Spark-based data sources (e.g. for HDFS, Kafka) have been ported to V2.

I remember reading threads that discussed the inefficiency/overhead of converting from Row to InternalRow, which was preventing certain porting efforts.

 

I ask because those are the most widely used data sources and have a lot of effort and thinking behind them; if they have been ported over to V2, they can serve as excellent production examples of the V2 API.

 

Thanks,

Jayesh

 




--
Ryan Blue
Software Engineer
Netflix

Re: data source api v2 refactoring

Thakrar, Jayesh

Thanks for the info Ryan – very helpful!

 

From: Ryan Blue <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Wednesday, September 19, 2018 at 3:17 PM
To: "Thakrar, Jayesh" <[hidden email]>
Cc: Wenchen Fan <[hidden email]>, Hyukjin Kwon <[hidden email]>, Spark Dev List <[hidden email]>
Subject: Re: data source api v2 refactoring

 

Hi Jayesh,

 

The existing sources haven't been ported to v2 yet. That is going to be tricky because the existing sources implement behaviors that we need to keep for now.

 

I wrote up an SPIP to standardize logical plans while moving to the v2 sources. The reason why we need this is that too much is delegated to sources today. For example, sources are handed a SaveMode to overwrite data, but what exactly gets overwritten isn't defined and it varies by the source that gets used. That's not a good thing and we want to clean up what happens so that users know that a query behaves the same way across all v2 sources. CTAS shouldn't succeed for one source but fail for another if the table already exists.

 

Standardizing plans makes it difficult to port the existing sources to v2 because we need to implement the behavior of the v2 plans, which may not be the existing v1 behavior. I think what we should do is keep the existing v1 sources working as they do today, and add a way to opt in for v2 behavior. One good way to do this is to use a new write API that is more clear; I proposed one in the SPIP I mentioned earlier. SQL is a bit easier because the behavior for SQL is fairly well-defined. The problem is mostly with the existing DF write API, DataFrameWriter.

 

It would be great to open a discussion about the compatibility between v1 and v2 and come up with a plan on this list.

 

rb

 

On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh <[hidden email]> wrote:

Ryan et al,

 

Wondering if the existing Spark based data sources (e.g. for HDFS, Kafka) have been ported to V2.

I remember reading threads where there were discussions about the inefficiency/overhead of converting from Row to InternalRow that was preventing certain porting effort etc.

 

I ask because those are the most widely used data sources and have a lot of effort and thinking behind them, and if they have ported over to V2, then they can serve as excellent production examples of V2 API.

 

Thanks,

Jayesh

 

From: Ryan Blue <[hidden email]>
Reply-To: <[hidden email]>
Date: Friday, September 7, 2018 at 2:19 PM
To: Wenchen Fan <[hidden email]>
Cc: Hyukjin Kwon <[hidden email]>, Spark Dev List <[hidden email]>
Subject: Re: data source api v2 refactoring

 

There are a few v2-related changes that we can work in parallel, at least for reviews:

 

* SPARK-25006, #21978: Add catalog to TableIdentifier - this proposes how to incrementally add multi-catalog support without breaking existing code paths

* SPARK-24253, #21308: Add DeleteSupport API - this is a small API addition, which doesn't affect the refactor

* SPARK-24252, #21306: Add v2 Catalog API - this is a different way to create v2 tables, also doesn't affect the refactor

 

I agree that the PR for adding SQL support should probably wait on the refactor. I have also been meaning to share our implementation, which isn't based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and AlterTable from both SQL and the other methods in the DF API, saveAsTable and insertInto. It follows the structure that I proposed on the SQL support PR to convert SQL plans to v2 plans and uses the new TableCatalog to implement CTAS and RTAS.

 

rb

 

 

On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan <[hidden email]> wrote:

Hi Ryan,

 

You are right that the `LogicalWrite` mirrors the read side API. I just don't have a good naming yet, and write side changes will be a different PR.

 

 

Hi Hyukjin,

 

That's my expectation, otherwise we keep rebasing the refactor PR and never get it done.

 

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <[hidden email]> wrote:

BTW, do we hold Datasource V2 related PRs for now until we finish this refactoring just for clarification?

 

2018 9 7 () 오전 12:52, Ryan Blue <[hidden email]>님이 작성:

Wenchen,

 

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?

 

I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.

 

rb

 

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <[hidden email]> wrote:

I'm switching to another Gmail account of mine; let's see if it still gets dropped this time.

 

Hi Ryan,

 

I'm thinking about the write path and feel the abstraction should be the same.

 

We still have logical and physical writes, and the table can create different logical writes depending on how the data is written, e.g., append, delete, replaceWhere, etc.

 

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like

trait Table {
  WriteConfig newAppendWriteConfig();
  WriteConfig newDeleteWriteConfig(deleteExprs);
  LogicalWrite newLogicalWrite(writeConfig);
}

 

Without WriteConfig, the API looks like

trait Table {
  LogicalWrite newAppendWrite();
  LogicalWrite newDeleteWrite(deleteExprs);
}

 

 

It looks to me that the API is simpler without WriteConfig, what do you think?
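
To make the comparison concrete, here is a minimal Scala sketch of the two shapes side by side. It is only an illustration: `DeleteExpr`, `AppendConfig`, `DeleteConfig`, `SimpleWrite`, `TableWithConfig`, and `TableDirect` are hypothetical names invented for this example, and the parameter type of `deleteExprs` is an assumption, since the message does not specify it.

```scala
// Hypothetical stand-ins for illustration; none of these are the committed Spark API.
final case class DeleteExpr(sql: String)

sealed trait WriteConfig
case object AppendConfig extends WriteConfig
final case class DeleteConfig(exprs: Seq[DeleteExpr]) extends WriteConfig

// A logical write; commit and abort are no-ops in this sketch.
trait LogicalWrite {
  def describe: String
  def commit(): Unit = ()
  def abort(): Unit = ()
}

final case class SimpleWrite(describe: String) extends LogicalWrite

// Variant 1: build a WriteConfig first, then ask the table for the write.
trait TableWithConfig {
  def newAppendWriteConfig(): WriteConfig
  def newDeleteWriteConfig(deleteExprs: Seq[DeleteExpr]): WriteConfig
  def newLogicalWrite(config: WriteConfig): LogicalWrite
}

// Variant 2: the table hands out the logical write directly.
trait TableDirect {
  def newAppendWrite(): LogicalWrite
  def newDeleteWrite(deleteExprs: Seq[DeleteExpr]): LogicalWrite
}

object WriteApiDemo {
  private def deleteText(exprs: Seq[DeleteExpr]): String =
    "delete where " + exprs.map(_.sql).mkString(" AND ")

  val withConfig: TableWithConfig = new TableWithConfig {
    def newAppendWriteConfig(): WriteConfig = AppendConfig
    def newDeleteWriteConfig(deleteExprs: Seq[DeleteExpr]): WriteConfig =
      DeleteConfig(deleteExprs)
    def newLogicalWrite(config: WriteConfig): LogicalWrite = config match {
      case AppendConfig        => SimpleWrite("append")
      case DeleteConfig(exprs) => SimpleWrite(deleteText(exprs))
    }
  }

  val direct: TableDirect = new TableDirect {
    def newAppendWrite(): LogicalWrite = SimpleWrite("append")
    def newDeleteWrite(deleteExprs: Seq[DeleteExpr]): LogicalWrite =
      SimpleWrite(deleteText(deleteExprs))
  }

  def main(args: Array[String]): Unit = {
    val exprs = Seq(DeleteExpr("day = '2018-09-04'"))
    // Two calls with WriteConfig, one call without it; same resulting write.
    println(withConfig.newLogicalWrite(withConfig.newDeleteWriteConfig(exprs)).describe)
    println(direct.newDeleteWrite(exprs).describe)
  }
}
```

In this toy version both variants produce the same logical write; the only difference is the extra hop through the config object, which is the simplification being asked about.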

 

Thanks,

Wenchen

 

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <[hidden email]> wrote:

Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <[hidden email]>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <[hidden email]>
Cc: Ryan Blue <[hidden email]>, Reynold Xin <[hidden email]>, <[hidden email]>

 

Hi Mridul,

 

I'm not sure what's going on; my email was CC'ed to the dev list.

 

 

Hi Ryan,

 

The logical and physical scan idea sounds good. To add more color to Jungtaek's question, both micro-batch and continuous mode have the logical and physical scan, but there is a difference: for micro-batch mode, a physical scan outputs data for one epoch, but it's not true for continuous mode.

 

I'm not sure if it's necessary to include streaming epoch in the API abstraction, for features like metrics reporting.

 

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <[hidden email]> wrote:

 

Is it only me, or are all others getting Wenchen’s mails? (Obviously Ryan did :-) )

I did not see it in the mail thread I received or in the archives ... [1] Wondering which other senders were getting dropped (if any).

 

Regards 

Mridul 

 

 

On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <[hidden email]> wrote:

Thanks for clarifying, Wenchen. I think that's what I expected.

 

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

 

To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
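
The "streaming read as a factory for scans" idea can be sketched roughly as follows. This is a toy model under stated assumptions: `ReadDefinition`, `StreamingRead`, `ReadTask`, and the offset-range signature of `createScan` are hypothetical names chosen for the example, not part of any committed API.

```scala
// Hypothetical names for illustration only.
final case class ReadTask(partition: Int, start: Long, end: Long)

// A scan is one task set that Spark can run.
final case class Scan(tasks: Seq[ReadTask])

// The definition of what will be read; it stays fixed for the life of the stream.
final case class ReadDefinition(source: String, columns: Seq[String])

// The streaming read holds the definition and acts as a factory for scans.
final class StreamingRead(val definition: ReadDefinition, numPartitions: Int) {
  def createScan(start: Long, end: Long): Scan =
    Scan((0 until numPartitions).map(p => ReadTask(p, start, end)))
}

object StreamFactoryDemo {
  // Produce three micro-batch scans of 100 offsets each from a single definition.
  def microBatches(read: StreamingRead): Seq[Scan] =
    (0 until 3).map(i => read.createScan(i * 100L, (i + 1) * 100L))

  def main(args: Array[String]): Unit = {
    val read = new StreamingRead(ReadDefinition("events", Seq("id", "ts")), numPartitions = 2)
    microBatches(read).foreach(println)
  }
}
```

In batch there would be exactly one scan per definition, so the two concepts can live in one object; in this sketch a continuous source that needs to reconfigure would simply call `createScan` again on the same `StreamingRead`.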

 

rb

 

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:

Hi Ryan,

 

Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not table/stream/scan, but something between them. The table creates the ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result. For a batch source, it's the scan.

 

It's a little tricky because stream is an abstraction for streaming source only. Better ideas are welcome!

 

On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <[hidden email]> wrote:

Thanks, Reynold!

 

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

 

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

 

rb

 

On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:

Thanks, Reynold, for writing this and starting the discussion!

 

Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.

 

I like this proposed abstraction and have successfully prototyped it to make sure it works.

 

During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, operator pushdown is applied only once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.

 

About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
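
A rough sketch of table-level pushdown, assuming a simple filter model: the table creates the handler, and the same result feeds either a batch scan or a stream. `Filter`, `PushdownHandler`, `BatchScan`, `TableStream`, and the rule "only filters on partition columns can be pushed" are all hypothetical choices made for this example.

```scala
// Hypothetical types; not the committed API.
final case class Filter(column: String, value: String)
final case class ScanConfig(pushed: Seq[Filter], residual: Seq[Filter])

// The handler is created by the table, so the same pushdown rules apply
// whether the result feeds a batch scan or a stream.
final class PushdownHandler(supportedColumns: Set[String]) {
  def push(filters: Seq[Filter]): ScanConfig = {
    val (pushed, residual) = filters.partition(f => supportedColumns.contains(f.column))
    ScanConfig(pushed, residual)
  }
}

final case class BatchScan(config: ScanConfig)
final case class TableStream(config: ScanConfig)

final class Table(partitionColumns: Set[String]) {
  def newPushdownHandler(): PushdownHandler = new PushdownHandler(partitionColumns)
  def createScan(config: ScanConfig): BatchScan = BatchScan(config)
  def createStream(config: ScanConfig): TableStream = TableStream(config)
}

object PushdownDemo {
  def main(args: Array[String]): Unit = {
    val table = new Table(Set("day"))
    val filters = Seq(Filter("day", "2018-08-31"), Filter("user", "rb"))
    // Pushdown happens once; the result is reused for both read paths.
    val config = table.newPushdownHandler().push(filters)
    println(table.createScan(config))
    println(table.createStream(config))
  }
}
```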

 

Thanks,

Wenchen

 

 

 

 

 

On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <[hidden email]> wrote:

I spent some time last week looking at the current data source v2 apis, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions", to fit the use cases in Spark, from batch, to streaming.

 

Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed api:

 

0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.

 

1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.

 

2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the current committed code.

 

3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).

 

 

To illustrate with pseudocode what the different levels mean, a batch query would look like the following:

 

val provider = reflection[Format]("parquet")

val table = provider.createTable(options)

val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
// run tasks on executors

 

A streaming micro-batch scan would look like the following:

 

val provider = reflection[Format]("parquet")

val table = provider.createTable(options)

val stream = table.createStream(scanConfig)

 

while(true) {

  val scan = stream.createScan(startOffset)

  // run tasks on executors

}

 

 

Vs the current API, the above:

 

1. Creates an explicit Table abstraction, and an explicit Scan abstraction.

 

2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
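
For readers who want something that compiles, the four levels and the two pseudocode flows above could be modeled along these lines. Everything here is an assumption made for illustration: the class shapes, the `lookupFormat` stand-in for the reflective lookup, the fixed schema, and the bounded loop that replaces `while(true)`.

```scala
// Hypothetical API shapes following the four levels; not the committed API.
final case class ScanConfig(options: Map[String, String], pushedFilters: Seq[String])

// Level 3: a physical scan over a defined subset of the table.
// `range` is None for a batch scan and an offset range for a micro-batch.
final case class Scan(config: ScanConfig, range: Option[(Long, Long)])

// Level 2: a stream fixes pushdowns and options once, then produces scans.
final class TableStream(val config: ScanConfig) {
  def createScan(startOffset: Long, endOffset: Long): Scan =
    Scan(config, Some((startOffset, endOffset)))
}

// Level 1: a logical dataset with a schema.
final class Table(val schema: Seq[String], val options: Map[String, String]) {
  def createScan(config: ScanConfig): Scan = Scan(config, None)
  def createStream(config: ScanConfig): TableStream = new TableStream(config)
}

// Level 0: a format such as Parquet or ORC.
final class Format(val name: String) {
  def createTable(options: Map[String, String]): Table =
    new Table(Seq("id", "value"), options)
}

object LevelsDemo {
  // Stand-in for the reflective lookup in the pseudocode.
  def lookupFormat(name: String): Format = new Format(name)

  // Bounded version of the micro-batch loop: one scan per batch, same config.
  def runMicroBatches(stream: TableStream, batches: Int, step: Long): Seq[Scan] =
    (0 until batches).map(i => stream.createScan(i * step, (i + 1) * step))

  def main(args: Array[String]): Unit = {
    val table = lookupFormat("parquet").createTable(Map("path" -> "/data/events"))
    val config = ScanConfig(table.options, Seq("day = '2018-08-31'"))
    println(table.createScan(config))                                   // batch
    runMicroBatches(table.createStream(config), 3, 10L).foreach(println) // streaming
  }
}
```

The point the sketch tries to show is that the `ScanConfig` is bound once at the stream level, so every scan the stream creates sees the same pushdowns and options.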

 

 

 

This email is just a high level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.

 

 


 

--

Ryan Blue

Software Engineer

Netflix


 



Re: data source api v2 refactoring

cloud0fan
I want to bring back the discussion of data source v2 abstraction.

There is a problem discovered by Hyukjin recently. A write-only data source may accept any input and not have a schema of its own. The table abstraction then doesn't fit it, as a table must provide a schema.

Personally I think this is a corner case. Who would develop a data source like that? If users do have this requirement, maybe they can just implement a table with an empty schema, and in Spark the append operator skips input schema validation if the table schema is empty.
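
The empty-schema workaround could look roughly like this. It is a toy model of the proposal rather than Spark's actual append operator; `Field`, `AppendValidation`, and the exact-match rule for non-empty schemas are assumptions made for the example.

```scala
// Hypothetical model of the proposal; Spark's real append operator differs.
final case class Field(name: String, dataType: String)

object AppendValidation {
  // Returns None when the write is allowed, or Some(reason) when it is rejected.
  def validate(tableSchema: Seq[Field], inputSchema: Seq[Field]): Option[String] =
    if (tableSchema.isEmpty) None // write-only source: accept any input
    else if (tableSchema == inputSchema) None
    else Some(s"input schema $inputSchema does not match table schema $tableSchema")

  def main(args: Array[String]): Unit = {
    val input = Seq(Field("id", "int"), Field("payload", "string"))
    println(validate(Seq.empty, input))                  // allowed
    println(validate(Seq(Field("id", "int")), input))    // rejected
  }
}
```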

Any thoughts?

Thanks,
Wenchen

On Thu, Sep 20, 2018 at 4:51 AM Thakrar, Jayesh <[hidden email]> wrote:

Thanks for the info Ryan – very helpful!

 

From: Ryan Blue <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Wednesday, September 19, 2018 at 3:17 PM
To: "Thakrar, Jayesh" <[hidden email]>
Cc: Wenchen Fan <[hidden email]>, Hyukjin Kwon <[hidden email]>, Spark Dev List <[hidden email]>
Subject: Re: data source api v2 refactoring

 

Hi Jayesh,

 

The existing sources haven't been ported to v2 yet. That is going to be tricky because the existing sources implement behaviors that we need to keep for now.

 

I wrote up an SPIP to standardize logical plans while moving to the v2 sources. The reason why we need this is that too much is delegated to sources today. For example, sources are handed a SaveMode to overwrite data, but what exactly gets overwritten isn't defined and it varies by the source that gets used. That's not a good thing and we want to clean up what happens so that users know that a query behaves the same way across all v2 sources. CTAS shouldn't succeed for one source but fail for another if the table already exists.
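
As a rough illustration of what "standardized" means here, the sketch below defines CTAS once, outside any particular source, so that the "table already exists" outcome cannot vary by source. `InMemoryCatalog` and `StandardPlans` are hypothetical names invented for the example and do not reflect the SPIP's actual interfaces.

```scala
import scala.collection.mutable

// Hypothetical catalog; names are illustrative only.
final class InMemoryCatalog {
  private val tables = mutable.Map.empty[String, Vector[String]]
  def tableExists(name: String): Boolean = tables.contains(name)
  def create(name: String): Unit = tables.update(name, Vector.empty[String])
  def append(name: String, rows: Seq[String]): Unit =
    tables.update(name, tables(name) ++ rows)
  def rows(name: String): Vector[String] = tables(name)
}

object StandardPlans {
  // CTAS is defined once, in the engine: it fails whenever the table exists,
  // no matter which source backs the catalog.
  def createTableAsSelect(
      catalog: InMemoryCatalog,
      name: String,
      rows: Seq[String]): Either[String, Unit] =
    if (catalog.tableExists(name)) Left(s"table $name already exists")
    else {
      catalog.create(name)
      catalog.append(name, rows)
      Right(())
    }

  def main(args: Array[String]): Unit = {
    val catalog = new InMemoryCatalog
    println(createTableAsSelect(catalog, "t", Seq("a", "b"))) // succeeds
    println(createTableAsSelect(catalog, "t", Seq("c")))      // fails consistently
  }
}
```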

 

Standardizing plans makes it difficult to port the existing sources to v2 because we need to implement the behavior of the v2 plans, which may not be the existing v1 behavior. I think what we should do is keep the existing v1 sources working as they do today, and add a way to opt in for v2 behavior. One good way to do this is to use a new write API that is more clear; I proposed one in the SPIP I mentioned earlier. SQL is a bit easier because the behavior for SQL is fairly well-defined. The problem is mostly with the existing DF write API, DataFrameWriter.

 

It would be great to open a discussion about the compatibility between v1 and v2 and come up with a plan on this list.

 

rb

 

On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh <[hidden email]> wrote:

Ryan et al,

 

Wondering if the existing Spark based data sources (e.g. for HDFS, Kafka) have been ported to V2.

I remember reading threads that discussed the inefficiency/overhead of converting from Row to InternalRow, which was preventing certain porting efforts.

 

I ask because those are the most widely used data sources and have a lot of effort and thinking behind them, and if they have been ported over to V2, then they can serve as excellent production examples of the V2 API.

 

Thanks,

Jayesh

 



RE: data source api v2 refactoring

assaf.mendelson

Hi,

 

I actually encountered this corner case and I think it is not that uncommon.

 

In my case, I was writing a write-only source which used a library to write to a database. I didn’t want to have to write a reader; however, even if I had written one, it wouldn’t have worked, because I wouldn’t have been able to “read” using the correct schema.

 

The issue occurs because the assumption is that the schema we get from “read” is the same as the schema for “write” and this is incorrect.

 

Consider, for example, a data source that converts everything to a JSON string. When reading we might have a single string column; however, we can write any dataframe to this source.

 

I am not sure how it affects the greater discussion, however. For this specific case we could simply provide the ability to customize the validation (or better yet, read trying to enforce the schema from the dataframe).
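
A minimal sketch of what "customize the validation" might mean, using the JSON example: the default rule requires the input to match the read schema, and the JSON source overrides it. `WritableTable`, `acceptsInput`, `JsonStringTable`, and `StrictTable` are hypothetical names for this example, and the JSON rendering is deliberately naive.

```scala
// Hypothetical sketch; not a Spark API.
final case class Field(name: String, dataType: String)

trait WritableTable {
  def readSchema: Seq[Field]
  // Default rule: the input must match the read schema exactly.
  def acceptsInput(input: Seq[Field]): Boolean = input == readSchema
}

// Reads back a single string column, but accepts any dataframe on write.
object JsonStringTable extends WritableTable {
  val readSchema: Seq[Field] = Seq(Field("value", "string"))
  override def acceptsInput(input: Seq[Field]): Boolean = true

  // Naive row-to-JSON rendering; values are assumed to need no escaping.
  def toJson(row: Seq[(String, String)]): String =
    row.map { case (k, v) => "\"" + k + "\":\"" + v + "\"" }.mkString("{", ",", "}")
}

// A source that keeps the default, strict validation.
object StrictTable extends WritableTable {
  val readSchema: Seq[Field] = Seq(Field("id", "int"))
}

object ValidationDemo {
  def main(args: Array[String]): Unit = {
    val input = Seq(Field("id", "int"), Field("name", "string"))
    println(JsonStringTable.acceptsInput(input)) // write schema differs from read schema
    println(StrictTable.acceptsInput(input))
    println(JsonStringTable.toJson(Seq("id" -> "1", "name" -> "x")))
  }
}
```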

 

 

Thanks,

        Assaf

 

From: Wenchen Fan [mailto:[hidden email]]
Sent: Thursday, October 18, 2018 5:26 PM
To: Reynold Xin
Cc: Ryan Blue; Hyukjin Kwon; Spark dev list
Subject: Re: data source api v2 refactoring

 


I want to bring back the discussion of data source v2 abstraction.

 

There is a problem discovered by Hyukjin recently. A write-only data source may accept any input and not have a schema of its own. The table abstraction then doesn't fit it, as a table must provide a schema.

 

Personally I think this is a corner case. Who would develop a data source like that? If users do have this requirement, maybe they can just implement a table with an empty schema, and in Spark the append operator skips input schema validation if the table schema is empty.

 

Any thoughts?

 

Thanks,

Wenchen

 

On Thu, Sep 20, 2018 at 4:51 AM Thakrar, Jayesh <[hidden email]> wrote:

Thanks for the info Ryan – very helpful!

 

From: Ryan Blue <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Wednesday, September 19, 2018 at 3:17 PM
To: "Thakrar, Jayesh" <[hidden email]>
Cc: Wenchen Fan <[hidden email]>, Hyukjin Kwon <[hidden email]>, Spark Dev List <[hidden email]>
Subject: Re: data source api v2 refactoring

 

Hi Jayesh,

 

The existing sources haven't been ported to v2 yet. That is going to be tricky because the existing sources implement behaviors that we need to keep for now.

 

I wrote up an SPIP to standardize logical plans while moving to the v2 sources. The reason why we need this is that too much is delegated to sources today. For example, sources are handed a SaveMode to overwrite data, but what exactly gets overwritten isn't defined and it varies by the source that gets used. That's not a good thing and we want to clean up what happens so that users know that a query behaves the same way across all v2 sources. CTAS shouldn't succeed for one source but fail for another if the table already exists.

 

Standardizing plans makes it difficult to port the existing sources to v2 because we need to implement the behavior of the v2 plans, which may not be the existing v1 behavior. I think what we should do is keep the existing v1 sources working as they do today, and add a way to opt in for v2 behavior. One good way to do this is to use a new write API that is more clear; I proposed one in the SPIP I mentioned earlier. SQL is a bit easier because the behavior for SQL is fairly well-defined. The problem is mostly with the existing DF write API, DataFrameWriter.

 

It would be great to open a discussion about the compatibility between v1 and v2 and come up with a plan on this list.

 

rb

 

On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh <[hidden email]> wrote:

Ryan et al,

 

Wondering if the existing Spark based data sources (e.g. for HDFS, Kafka) have been ported to V2.

I remember reading threads that discussed the inefficiency/overhead of converting from Row to InternalRow, which was preventing certain porting efforts.

 

I ask because those are the most widely used data sources and have a lot of effort and thinking behind them, and if they have been ported over to V2, then they can serve as excellent production examples of the V2 API.

 

Thanks,

Jayesh

 

From: Ryan Blue <[hidden email]>
Reply-To: <[hidden email]>
Date: Friday, September 7, 2018 at 2:19 PM
To: Wenchen Fan <[hidden email]>
Cc: Hyukjin Kwon <[hidden email]>, Spark Dev List <[hidden email]>
Subject: Re: data source api v2 refactoring

 

There are a few v2-related changes that we can work on in parallel, at least for reviews:

 

* SPARK-25006, #21978: Add catalog to TableIdentifier - this proposes how to incrementally add multi-catalog support without breaking existing code paths

* SPARK-24253, #21308: Add DeleteSupport API - this is a small API addition, which doesn't affect the refactor

* SPARK-24252, #21306: Add v2 Catalog API - this is a different way to create v2 tables, also doesn't affect the refactor

 

I agree that the PR for adding SQL support should probably wait on the refactor. I have also been meaning to share our implementation, which isn't based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and AlterTable from both SQL and the other methods in the DF API, saveAsTable and insertInto. It follows the structure that I proposed on the SQL support PR to convert SQL plans to v2 plans and uses the new TableCatalog to implement CTAS and RTAS.

 

rb

 

 

On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan <[hidden email]> wrote:

Hi Ryan,

 

You are right that the `LogicalWrite` mirrors the read side API. I just don't have a good name for it yet, and the write side changes will be a different PR.

 

 

Hi Hyukjin,

 

That's my expectation, otherwise we keep rebasing the refactor PR and never get it done.

 

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <[hidden email]> wrote:

BTW, just for clarification: do we hold Datasource V2 related PRs for now until we finish this refactoring?

 

On Sep 7, 2018 at 12:52 AM, Ryan Blue <[hidden email]> wrote:

Wenchen,

 

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?

 

I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.

 

rb

 

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <[hidden email]> wrote:

I'm switching to another Gmail account of mine; let's see if it still gets dropped this time.

 

Hi Ryan,

 

I'm thinking about the write path and feel the abstraction should be the same.

 

We still have logical and physical writes, and the table can create different logical writes depending on how the data is written, e.g., append, delete, replaceWhere, etc.

 

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like

trait Table {
  WriteConfig newAppendWriteConfig();
  WriteConfig newDeleteWriteConfig(deleteExprs);
  LogicalWrite newLogicalWrite(writeConfig);
}

 

Without WriteConfig, the API looks like

trait Table {
  LogicalWrite newAppendWrite();
  LogicalWrite newDeleteWrite(deleteExprs);
}

 

 

It looks to me that the API is simpler without WriteConfig, what do you think?
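To make the comparison concrete, here is a minimal, self-contained Scala sketch of both shapes. Only the method names come from the mail above. The `DeleteExpr` alias, the `commit`/`abort` signatures, and the toy table objects are assumptions made for illustration, not the proposed API.

```scala
object WriteApiSketch {
  // Hypothetical stand-in: the thread does not say what type deleteExprs has.
  type DeleteExpr = String

  // Assumed shape of a logical write. commit/abort are mentioned in the thread,
  // but their signatures here are invented.
  trait LogicalWrite {
    def describe: String
    def commit(): Unit = ()
    def abort(): Unit = ()
  }

  // Variant 1: a WriteConfig is created first, then turned into a LogicalWrite.
  sealed trait WriteConfig
  case object AppendConfig extends WriteConfig
  final case class DeleteConfig(deleteExprs: Seq[DeleteExpr]) extends WriteConfig

  trait TableWithConfig {
    def newAppendWriteConfig(): WriteConfig
    def newDeleteWriteConfig(deleteExprs: Seq[DeleteExpr]): WriteConfig
    def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite
  }

  // Variant 2: the table creates the LogicalWrite directly.
  trait TableWithoutConfig {
    def newAppendWrite(): LogicalWrite
    def newDeleteWrite(deleteExprs: Seq[DeleteExpr]): LogicalWrite
  }

  private final case class SimpleWrite(describe: String) extends LogicalWrite

  object ConfigTable extends TableWithConfig {
    def newAppendWriteConfig(): WriteConfig = AppendConfig
    def newDeleteWriteConfig(deleteExprs: Seq[DeleteExpr]): WriteConfig =
      DeleteConfig(deleteExprs)
    def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite = writeConfig match {
      case AppendConfig        => SimpleWrite("append")
      case DeleteConfig(exprs) => SimpleWrite("delete where " + exprs.mkString(" AND "))
    }
  }

  object DirectTable extends TableWithoutConfig {
    def newAppendWrite(): LogicalWrite = SimpleWrite("append")
    def newDeleteWrite(deleteExprs: Seq[DeleteExpr]): LogicalWrite =
      SimpleWrite("delete where " + deleteExprs.mkString(" AND "))
  }
}
```

In this toy model both variants yield the same logical write; the second simply skips the intermediate object and the dispatch on its kind.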

 

Thanks,

Wenchen

 

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <[hidden email]> wrote:

Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <[hidden email]>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <[hidden email]>
Cc: Ryan Blue <[hidden email]>, Reynold Xin <[hidden email]>, <[hidden email]>

 

Hi Mridul,

 

I'm not sure what's going on, my email was CC'ed to the dev list.

 

 

Hi Ryan,

 

The logical and physical scan idea sounds good. To add more color to Jungtaek's question, both micro-batch and continuous mode have the logical and physical scan, but there is a difference: for micro-batch mode, a physical scan outputs data for one epoch, but it's not true for continuous mode.

 

I'm not sure if it's necessary to include streaming epoch in the API abstraction, for features like metrics reporting.

 

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <[hidden email]> wrote:

 

Is it only me, or is everyone else getting Wenchen’s mails? (Obviously Ryan did :-) )

I did not see it in the mail thread I received or in the archives ... [1] Wondering which other senders were getting dropped (if any).

 

Regards 

Mridul 

 

 

On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <[hidden email]> wrote:

Thanks for clarifying, Wenchen. I think that's what I expected.

 

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and task sets that actually perform the read. In batch, there's one definition of the scan and one task set so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

 

To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
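The split described above (one definition of what is read, many task sets) can be sketched roughly as follows. All type and method names here are hypothetical and chosen only for illustration; offsets are modeled as plain `Long` values, which is an assumption.

```scala
object ScanFactorySketch {
  // Hypothetical types; none of these names come from the committed API.
  // The definition of what will be read.
  final case class ScanDefinition(filters: Seq[String], columns: Seq[String])

  // A scan is one task set over a fixed slice of the source.
  final case class Scan(definition: ScanDefinition, start: Long, end: Long) {
    // Toy model: one task per offset in [start, end).
    def tasks: Seq[Long] = start until end
  }

  // The streaming read holds the definition once and acts as a factory for scans.
  final class StreamingRead(val definition: ScanDefinition) {
    def createScan(start: Long, end: Long): Scan = Scan(definition, start, end)
  }

  // Batch: one definition and one task set, so a single Scan covers both concepts.
  def batchScan(definition: ScanDefinition, rows: Long): Scan =
    Scan(definition, 0L, rows)
}
```

In this model, a reconfiguration in continuous mode (for example after a partitioning change) would correspond to calling `createScan` again on the same `StreamingRead`, so the definition is carried over unchanged.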

 

rb

 

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <[hidden email]> wrote:

Hi Ryan,

 

Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not a table, stream, or scan, but something in between. The table creates a ScanConfigBuilder, and the table then creates the stream/scan with the resulting ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result; for a batch source, it's the scan.
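A rough Scala sketch of that flow, under simplifying assumptions: filters are plain strings, the builder only handles filter pushdown, and every signature below is invented for illustration. The real ScanConfig and ScanConfigBuilder look different.

```scala
object PushdownSketch {
  // Hypothetical, simplified types; the real ScanConfig/ScanConfigBuilder differ.
  final case class ScanConfig(pushedFilters: Seq[String])

  final class ScanConfigBuilder(supported: Set[String]) {
    private var pushed = Vector.empty[String]

    // Accept a filter only if the source supports it; report whether it was pushed.
    def pushFilter(filter: String): Boolean = {
      val ok = supported.contains(filter)
      if (ok) pushed = pushed :+ filter
      ok
    }

    def build(): ScanConfig = ScanConfig(pushed)
  }

  final case class Scan(config: ScanConfig)
  final case class Stream(config: ScanConfig)

  final class Table(supported: Set[String]) {
    // The table hands out the builder ...
    def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder(supported)
    // ... and creates the scan or stream from the built config.
    def createScan(config: ScanConfig): Scan = Scan(config)       // batch: the scan holds the result
    def createStream(config: ScanConfig): Stream = Stream(config) // streaming: the stream holds it
  }
}
```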

 

It's a little tricky because the stream is an abstraction for streaming sources only. Better ideas are welcome!

 

On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <[hidden email]> wrote:

Thanks, Reynold!

 

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

 

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

 

rb

 

On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <[hidden email]> wrote:

Thanks, Reynold, for writing this and starting the discussion!

 

Data source v2 started as batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.

 

I like this proposed abstraction and have successfully prototyped it to make sure it works.

 

During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, operator pushdown is applied only once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.
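The workaround amounts to "plan the source once, reuse the result in every batch". The toy Scala sketch below illustrates only that idea. It does not use Spark's planner, and `LinkedSource`, `planPushdown`, and the call counter are hypothetical names introduced for this example.

```scala
object PlanOnceSketch {
  final case class PushdownResult(pushedFilters: Seq[String])

  // Counts how often the pushdown negotiation runs (for demonstration only).
  var planningCalls = 0

  def planPushdown(filters: Seq[String]): PushdownResult = {
    planningCalls += 1
    PushdownResult(filters)
  }

  // Hypothetical "linking node": wraps the pushdown result computed up front.
  final case class LinkedSource(result: PushdownResult)

  // Returns the source node used by each micro-batch plan.
  def runBatches(filters: Seq[String], numBatches: Int): Seq[LinkedSource] = {
    val linked = LinkedSource(planPushdown(filters)) // planned once per query
    Seq.fill(numBatches)(linked)                     // same node swapped into every batch
  }
}
```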

 

About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.

 

Thanks,

Wenchen

 

 

 

 

 

On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <[hidden email]> wrote:

I spent some time last week looking at the current data source v2 apis, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions", to fit the use cases in Spark, from batch, to streaming.

 

Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed api:

 

0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.

 

1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.

 

2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to data source implementations that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the current committed code.

 

3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).

 

 

To illustrate with pseudocode what the different levels mean, a batch query would look like the following:

 

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
// run tasks on executors
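A self-contained version of the batch pseudocode might look like the sketch below. It is only an illustration of the Format, Table, and Scan levels: a plain map lookup stands in for the reflection step, tasks are represented as strings, and the `path` option and every signature are assumptions.

```scala
object BatchSketch {
  // All names below are hypothetical stand-ins for the levels in the mail.
  final case class ScanConfig(filters: Seq[String], options: Map[String, String])

  // Level 3: enough information to run tasks over a defined subset of the table.
  final case class Scan(path: String, config: ScanConfig) {
    def planTasks(): Seq[String] = {
      val fs = config.filters.mkString(", ")
      Seq(s"read $path with filters [$fs]")
    }
  }

  // Level 1: a logical dataset.
  final case class Table(path: String) {
    def createScan(config: ScanConfig): Scan = Scan(path, config)
  }

  // Level 0: a format that can create tables from options.
  trait Format {
    def createTable(options: Map[String, String]): Table
  }

  object ParquetLike extends Format {
    def createTable(options: Map[String, String]): Table = Table(options("path"))
  }

  // A plain map lookup replaces the reflection step of the pseudocode.
  val formats: Map[String, Format] = Map("parquet" -> ParquetLike)

  def run(): Seq[String] = {
    val provider = formats("parquet")
    val table = provider.createTable(Map("path" -> "/tmp/t"))
    val scan = table.createScan(ScanConfig(Seq("a > 1"), Map.empty))
    scan.planTasks() // a real engine would run these tasks on executors
  }
}
```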

 

A streaming micro-batch scan would look like the following:

 

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val stream = table.createStream(scanConfig)

while (true) {
  val scan = stream.createScan(startOffset)
  // run tasks on executors
}
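The micro-batch loop can be made runnable with a bounded, in-memory stand-in for the source. In the sketch below, the `Vector[Int]` data, the `Long` offsets, `latestOffset`, and the explicit end offset passed to `createScan` are all assumptions. The point is only that the `ScanConfig` is fixed when the stream is created and shared by every scan.

```scala
object MicroBatchSketch {
  final case class ScanConfig(filters: Seq[String])
  final case class Scan(config: ScanConfig, start: Long, end: Long)

  // The stream fixes the ScanConfig once; every scan it creates shares it.
  final class Stream(val config: ScanConfig, data: Vector[Int]) {
    def latestOffset: Long = data.length.toLong
    def createScan(start: Long, end: Long): Scan = Scan(config, start, end)
    // Stand-in for "run tasks on executors": read the slice covered by the scan.
    def read(scan: Scan): Vector[Int] = data.slice(scan.start.toInt, scan.end.toInt)
  }

  // Bounded stand-in for the `while (true)` loop: at most `batchSize` records per batch.
  def run(stream: Stream, batchSize: Int): Vector[Vector[Int]] = {
    var start = 0L
    var batches = Vector.empty[Vector[Int]]
    while (start < stream.latestOffset) {
      val end = math.min(start + batchSize, stream.latestOffset)
      val scan = stream.createScan(start, end)
      batches = batches :+ stream.read(scan)
      start = end
    }
    batches
  }
}
```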

 

 

Vs the current API, the above:

 

1. Creates an explicit Table abstraction, and an explicit Scan abstraction.

 

2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.

 

 

 

This email is just a high level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.

 

 


 

--

Ryan Blue

Software Engineer

Netflix


 



Re: data source api v2 refactoring

JackyLee
In reply to this post by Ryan Blue
I have pushed a patch for SQLStreaming, which resolves the problem just
discussed.
The Jira:
    https://issues.apache.org/jira/browse/SPARK-24630
The patch:
    https://github.com/apache/spark/pull/22575

SQLStreaming simply defines the table API for Structured Streaming, and the Table
APIs for streaming and batch are fully compatible.

With SQLStreaming, we can create a stream just like this:
val table = spark.createTable()
spark.table(temp)



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
