[VOTE] [SPIP] SPARK-15689: Data Source API V2


[VOTE] [SPIP] SPARK-15689: Data Source API V2

cloud0fan
Hi all,

It has been almost two weeks since I proposed Data Source API V2 for discussion, and we have already received some feedback on the JIRA ticket and the prototype PR, so I'd like to call for a vote.

The full document of the Data Source API V2 is:
https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit

Note that this vote should focus on the high-level design/framework, not specific APIs, as we can always change/improve specific APIs during development.

The vote will be up for the next 72 hours. Please reply with your vote:

+1: Yeah, let's go forward and implement the SPIP.
+0: Don't really care.
-1: I don't think this is a good idea because of the following technical reasons.

Thanks!

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

RussS
+1 (Non-binding) 

The clustering approach covers most of my requirements for saving shuffles. We kind of left the "should the user be allowed to provide a full partitioner" discussion on the table. I understand that would require exposing a lot of internals, so this is perhaps a good compromise.

On Mon, Aug 28, 2017 at 12:20 PM Wenchen Fan <[hidden email]> wrote:
Hi all,

It has been almost 2 weeks since I proposed the data source V2 for discussion, and we already got some feedbacks on the JIRA ticket and the prototype PR, so I'd like to call for a vote.

The full document of the Data Source API V2 is:

Note that, this vote should focus on high-level design/framework, not specified APIs, as we can always change/improve specified APIs during development.

The vote will be up for the next 72 hours. Please reply with your vote:

+1: Yeah, let's go forward and implement the SPIP.
+0: Don't really care.
-1: I don't think this is a good idea because of the following technical reasons.

Thanks!

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

Cody Koeninger-2
In reply to this post by cloud0fan
Just wanted to point out that because the JIRA isn't labeled SPIP, it
won't have shown up as linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:

> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!



Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

Xiao Li
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!




Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

Jiang Xingbo
+1 (Non-binding) 

Xiao Li <[hidden email]>于2017年8月28日 周一下午5:38写道:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!




Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

James Baker
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

The context here is that I've spent some time implementing a Spark data source and have had some issues with the current API that are made worse in V2.

The general conclusion I've come to is that this is very hard to actually implement correctly (in a similar way to Data Source V1, but more acutely, because of the extra methods and dimensions we get in V2).

In Data Source V1's PrunedFilteredScan, the issue is that you are passed the filters in the buildScan method, and then passed them again in the unhandledFilters method.
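
(For reference, the V1 contract I'm describing has roughly this shape - a Java rendering for illustration only; the real API is the Scala PrunedFilteredScan trait plus BaseRelation.unhandledFilters, so don't read these as the exact signatures:)

import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.Filter;

// Illustrative rendering of the two V1 calls discussed above. Spark first asks
// which of the candidate filters the source cannot handle...
interface V1ScanShape {
    Filter[] unhandledFilters(Filter[] filters);

    // ...and then separately asks the source to build the scan, passing the
    // same filters again, so a data-dependent answer has to be computed twice
    // (or cached between the two calls).
    RDD<Row> buildScan(String[] requiredColumns, Filter[] filters);
}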

However, the filters that you can't handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I'm passed both, then either A and B are unhandled, or only A, or only B, or neither. The work I have to do to figure this out is essentially the same as the work I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps more so. In PrunedFilteredScan there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. Here, however, we have many degrees of freedom: sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, so computing my support one by one can easily end up being more expensive than computing it all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really, what would be good here is to pass all of the filters, sorts, etc. at once, and have me return the parts I can't handle.

I'd prefer in general that this be implemented by passing some kind of query plan to the data source which enables this kind of replacement. I explicitly don't want to be given the whole query plan - that sounds painful - I'd prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don't think we can guarantee the properties we want without a two-phase thing - I'd really love to be able to just define a straightforward union type of our supported pushdown operations, which the user can then transform and return.
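
(Roughly the shape I have in mind - a sketch with hypothetical names, not anything from the proposal:)

import java.util.List;
import org.apache.spark.sql.sources.Filter;

// Hypothetical "union type" of the supported pushdowns, handed to the source
// all at once; the source returns the subset it will not handle.
final class PushdownSpec {
    final List<Filter> filters;      // all candidate filters
    final List<String> sortColumns;  // requested sort order, if any
    final Integer limit;             // requested limit, or null if none

    PushdownSpec(List<Filter> filters, List<String> sortColumns, Integer limit) {
        this.filters = filters;
        this.sortColumns = sortColumns;
        this.limit = limit;
    }
}

interface SupportsHolisticPushdown {
    // Inspect the whole request with knowledge of the data layout, decide what
    // to accept, and return the parts Spark must still evaluate itself.
    PushdownSpec pushDown(PushdownSpec requested);
}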

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:
+1 (Non-binding) 

Xiao Li <[hidden email]>于2017年8月28日 周一下午5:38写道:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!




Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

rxin
In reply to this post by James Baker
James,

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility, and evolvability. For maximum expressiveness, we'd want the ability to expose full query plans and let the data source decide which parts of the query plan can be pushed down.

The downsides to that (full query plan pushdown) are:

1. It is extremely difficult to design a stable representation for a logical/physical plan. It is doable, but we'd be the first to do it. I'm not aware of any mainstream database having been able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or we could offer an experimental trait, without compatibility guarantees, that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You argued that v2 supports broader pushdown and as a result is harder to implement. That's how it is supposed to be. If a data source simply implements one of the traits, it'd be logically identical to v1. I don't see why it would be worse or better, other than that v2 provides much stronger forward compatibility guarantees than v1.
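
(To illustrate what I mean by a simple data source: something that implements only a basic read interface and none of the pushdown mix-ins, along the lines of the hypothetical sketch below - illustrative names, not the actual proposed interfaces.)

import java.util.Iterator;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

// Hypothetical minimal reader: no pushdown mix-ins are implemented, so Spark
// applies all filters/sorts itself, which is logically the V1 behavior.
interface SimpleReadSupport {
    StructType schema();      // shape of the data offered
    Iterator<Row> readAll();  // just hand the rows over
}

final class InMemorySource implements SimpleReadSupport {
    private final StructType schema;
    private final List<Row> rows;

    InMemorySource(StructType schema, List<Row> rows) {
        this.schema = schema;
        this.rows = rows;
    }

    @Override public StructType schema() { return schema; }
    @Override public Iterator<Row> readAll() { return rows.iterator(); }
}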


On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps moreso. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom; sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:
+1 (Non-binding) 

Xiao Li <[hidden email]>于2017年8月28日 周一下午5:38写道:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!





Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

James Baker
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty intractable; it restricts the modifications you can make in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the Data Source V2 API (which, after all, need to be translatable properly into the future to support the proposed mix-ins). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be OK, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly, what I'm looking for here is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and to return information about which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. I'm not at all married to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally rather than all at once, you end up having to reject pushdowns and filters that you actually could handle, which unnecessarily increases overhead.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case where I can push down two equality filters iff I am given them at the same time, whilst not being able to handle them one at a time.

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision; Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my data source - as defined, the APIs don't let me do that; they're strictly more restrictive than the V1 APIs in this way.

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do to buildScan, so I want to cache it. This means that I end up with code that looks like:

import static java.util.Collections.emptyList;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.List;
import org.apache.spark.sql.sources.Filter;

public final class CachingFoo implements Foo {
    private final Foo delegate;

    // The most recently seen filters, and a memoized computation keyed on them.
    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        // Recompute only when the filters change; otherwise reuse the result
        // already computed for the earlier call (e.g. from unhandledFilters).
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required in unhandledFilters, on the expectation that Spark will call buildScan afterwards and get to use the result.

This kind of cache becomes more prominent, but harder to deal with in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc, so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).
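
(For example, one hypothetical way the class structure itself could imply the contract - made-up names, purely a sketch - is a staged API where each pushdown step returns a narrower type, so the compiler fixes the call order rather than documentation:)

import org.apache.spark.sql.sources.Filter;

// Hypothetical staged API: filters can only be pushed before the scan is
// planned, and nothing can be pushed afterwards, because the types say so.
interface ScanBuilder {
    // Accept or reject filters; returns the next stage, so filter pushdown
    // cannot be revisited once planning starts.
    PlannedScan pushFilters(Filter[] filters);
}

interface PlannedScan {
    Filter[] unhandledFilters();  // what Spark still has to evaluate
    ReadTask[] planReadTasks();   // the final, immutable plan of the scan
}

// Placeholder for whatever unit of work Spark hands to executors.
interface ReadTask {}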

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:
James,

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.

The downside to that (full query plan push down) are:

1. It is extremely difficult to design a stable representation for logical / physical plan. It is doable, but we'd be the first to do it. I'm not sure of any mainstream databases being able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or offer an experimental trait without compatibility guarantee that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there are more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the trait, it'd be logically identical to v1. I don't see why it would be worse or better, other than v2 provides much stronger forward compatibility guarantees than v1.


On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps moreso. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom; sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:
+1 (Non-binding) 

Xiao Li <[hidden email]>于2017年8月28日 周一下午5:38写道:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!





Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

cloud0fan
Hi James,

Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments 

The problems with this approach are: 1) if we want to add more arguments in the future, it's really hard to do so without changing the existing interface; and 2) if a user wants to implement a very simple data source, they have to look at all the arguments and understand them, which may be a burden.
I don't have a solution to these two problems; comments are welcome.


There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

This is true: the current framework applies pushdowns one by one, incrementally. If a data source wants to go back and accept a sort pushdown after it has accepted a filter pushdown, that's impossible with the current Data Source V2.
Fortunately, we have a possible solution for this problem. On the Spark side, we actually do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know the best order in which to push them into the data source. Spark can try every combination and ask the data source to report a cost for each, then pick the combination with the lowest cost. This could be implemented as a cost-reporting interface, so that advanced data sources can implement it for optimal performance, while simple data sources don't need to care about it and can stay simple.
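
(Roughly, such a cost-reporting hook might look like the sketch below - the interface and names are hypothetical, not part of the current proposal:)

import org.apache.spark.sql.sources.Filter;

// Hypothetical container for one combination of pushdowns Spark is considering.
final class PushdownCandidate {
    final Filter[] filters;
    final String[] sortColumns;

    PushdownCandidate(Filter[] filters, String[] sortColumns) {
        this.filters = filters;
        this.sortColumns = sortColumns;
    }
}

// Hypothetical cost-reporting mix-in: Spark proposes candidate combinations of
// pushdowns and the source returns an estimated cost for each, letting Spark
// pick the cheapest one instead of pushing operators down one by one.
interface SupportsCostReport {
    // Smaller is better; the unit only needs to be consistent for one source
    // (e.g. estimated bytes scanned).
    double estimateCost(PushdownCandidate candidate);
}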


The current design is very friendly to simple data sources, and it has the potential to support complex data sources as well, so I prefer the current design over the plan pushdown one. What do you think?


On Wed, Aug 30, 2017 at 5:53 AM, James Baker <[hidden email]> wrote:
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty intractable, it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which after all, need to be translateable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case I see where I can push down two equality filters iff I am given them at the same time, whilst not being able to one at a time.

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do to buildScan, so I want to cache it. This means that I end up with code that looks like:

public final class CachingFoo implements Foo {
    private final Foo delegate;

    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result..

This kind of cache becomes more prominent, but harder to deal with in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc, so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:
James,

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.

The downside to that (full query plan push down) are:

1. It is extremely difficult to design a stable representation for logical / physical plan. It is doable, but we'd be the first to do it. I'm not sure of any mainstream databases being able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or offer an experimental trait without compatibility guarantee that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there are more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the trait, it'd be logically identical to v1. I don't see why it would be worse or better, other than v2 provides much stronger forward compatibility guarantees than v1.


On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps moreso. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom; sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:
+1 (Non-binding) 

Xiao Li <[hidden email]>于2017年8月28日 周一下午5:38写道:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]




Reply | Threaded
Open this post in threaded view
|

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

James Baker
I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.

I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading that much data.

If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).

There's also a thing where our typical use cases have many filters (20+ is common). So, it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, then I know what optimal looks like - so why can't I just pick that myself?

The current design is friendly to simple datasources, but does not have the potential to support this.

So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.

My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write simple mode in terms of it very easily. So we could actually provide two APIs, one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?
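
As a rough illustration of that layering (hypothetical names only, not a concrete proposal), the "simple" API could be just a trivial implementation of the "advanced" one:

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter

trait AdvancedScan {
  // Receive every candidate filter at once; return the ones this source will NOT handle.
  def pushFilters(filters: Seq[Filter]): Seq[Filter]
  def rows(): Iterator[Row]
}

trait SimpleScan extends AdvancedScan {
  // A simple source opts out of pushdown entirely; Spark re-applies everything on top.
  final override def pushFilters(filters: Seq[Filter]): Seq[Filter] = filters
}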

James

On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <[hidden email]> wrote:
Hi James,

Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments 

The problems with this approach are: 1) if we want to add more arguments in the future, it's really hard to do so without changing the existing interface; 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him.
I don't have a solution to these 2 problems; comments are welcome.
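
To make the first concern concrete, a hypothetical illustration (none of these are real or proposed interfaces):

import org.apache.spark.sql.sources.Filter

// A single "everything at once" method has to change its signature whenever a new pushdown
// kind (say, a limit) is added, breaking every existing implementation...
trait BulkPushdown {
  def push(filters: Seq[Filter], sortColumns: Seq[String]): Unit
}

// ...whereas with optional mix-in traits, a new capability is just a new trait that
// existing sources never have to know about.
trait SupportsLimitPushdown {
  def pushLimit(limit: Int): Unit
}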


There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

This is true: the current framework applies pushdowns one by one, incrementally. If a data source wants to go back and accept a sort pushdown after it has accepted a filter pushdown, that's impossible with the current data source V2.
Fortunately, we have a possible solution for this problem. On the Spark side, we actually do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know the best order in which to push them into the data source. Spark can try every combination and ask the data source to report a cost, then pick the combination with the lowest cost. This could also be implemented as a cost-report interface, so that an advanced data source can implement it for optimal performance, while a simple data source doesn't need to care about it and stays simple.
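
One possible shape for that cost-report interface, purely as a hypothetical sketch (nothing like this exists in the current proposal):

import org.apache.spark.sql.sources.Filter

trait SupportsCostReport {
  // Estimate the relative cost of scanning with one candidate combination of pushdowns
  // (simplified here to a filter set plus an optional sort order). Spark could call this for
  // each combination it is willing to try and pick the cheapest before doing the real pushdown.
  def estimateCost(filters: Seq[Filter], sortColumns: Seq[String]): Double
}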


The current design is very friendly to simple data sources, and has the potential to support complex data sources, so I prefer the current design over the plan pushdown one. What do you think?


On Wed, Aug 30, 2017 at 5:53 AM, James Baker <[hidden email]> wrote:
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty intractable, it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which, after all, need to be translatable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case I see where I can push down two equality filters iff I am given them at the same time, whilst not being able to push them down one at a time.
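
Expressed against an ordinary DataFrame, the query shape under discussion is roughly the following (a self-contained snippet; the column names match the toy data above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("pushdown-example").getOrCreate()
import spark.implicits._

// After filter(a = 1) the remaining rows could be emitted already sorted by (b, c), so the
// source could satisfy the sort; without that filter it generally cannot.
val df = Seq((1, 2, 3), (1, 3, 3), (1, 3, 4), (2, 1, 1), (2, 0, 1)).toDF("a", "b", "c")
df.filter($"a" === 1).sort($"b", $"c").show()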

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do to buildScan, so I want to cache it. This means that I end up with code that looks like:

import java.util.List;
import static java.util.Collections.emptyList;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public final class CachingFoo implements Foo {
    private final Foo delegate;

    // Last filter set we were asked about, plus a memoized computation for it.
    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        // Recompute only when the filter set changes; otherwise reuse the memoized result.
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required in unhandledFilters, on the expectation that Spark will call buildScan afterwards and get to use the result.

This kind of cache becomes more prominent, but harder to deal with in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc, so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:
James,

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.

The downsides to that (full query plan push down) are:

1. It is extremely difficult to design a stable representation for the logical / physical plan. It is doable, but we'd be the first to do it; I'm not aware of any mainstream database that has been able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or to offer an experimental trait without compatibility guarantees that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there is more support for broader pushdown and, as a result, it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the traits, it'd be logically identical to v1. I don't see why it would be worse or better, other than that v2 provides much stronger forward compatibility guarantees than v1.




Reply | Threaded
Open this post in threaded view
|

RE: [VOTE] [SPIP] SPARK-15689: Data Source API V2

assaf.mendelson
This post has NOT been accepted by the mailing list yet.

Why not have two interfaces and allow implementing whichever?

After all, we are already using several traits, and the data source implementer decides which to use. We could have two ways to implement pushdown: one which receives all the pushdowns at once and one which receives them one at a time.

Most people would probably not implement filter pushdown at all (the simplest sources). Of those who would, most would probably use the basic one, as in the current suggestion.

Lastly, there are very few people who would need the complex full-plan interface. That said, most of those would probably be providers of complex data sources which we want to support as efficiently as possible. For those developers we could add an experimental interface which provides the full plan. Since these would need to be more knowledgeable developers anyway (as they would need an understanding of the Spark side in order to decide what is efficient and what is not efficient to push down), we can use a much lighter solution (e.g. give the current, internal plan) and mark it as experimental or even developer API.
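
A hypothetical sketch of this two-level idea - a stable, simple trait for most sources plus an explicitly experimental hook that hands over the current internal plan (the names, and the use of Catalyst's internal LogicalPlan, are illustrative only):

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.Filter

trait SimplePushdown {
  def pushFilters(filters: Seq[Filter]): Seq[Filter]
}

@Experimental
trait FullPlanPushdown {
  // Receive the plan fragment above the scan; return the part the source cannot absorb.
  def pushPlan(plan: LogicalPlan): Option[LogicalPlan]
}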

 

Sure, managing two interfaces kinda sucks but having it as an available internal interface wouldn’t be much of a commitment.

 

 

Thanks,

        Assaf

 

From: James Baker [via Apache Spark Developers List] [mailto:ml+[hidden email]]
Sent: Wednesday, August 30, 2017 5:54 AM
To: Mendelson, Assaf <[hidden email]>
Subject: Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

 

I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.

 

I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading >>that much data.

 

If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).

 

There's also a thing where our typical use cases have many filters (20+ is common). So, it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, why can't I just pick that myself?

 

The current design is friendly to simple datasources, but does not have the potential to support this.

 

So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.

 

My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write simple mode in terms of it very easily. So we could actually provide two APIs, one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?

 

James

 

On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <[hidden email]> wrote:

Hi James,

 

Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.

 

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments 

 

The problem with this approach is: 1) if we wanna add more arguments in the future, it's really hard to do without changing the existing interface. 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him.

I don't have a solution to solve these 2 problems, comments are welcome.

 

 

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

 

This is true, the current framework applies push downs one by one, incrementally. If a data source wanna go back to accept a sort push down after it accepts a filter push down, it's impossible with the current data source V2.

Fortunately, we have a solution for this problem. At Spark side, actually we do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know which is the best order to push them into data source. Spark can try every combination and ask the data source to report a cost, then Spark can pick the best combination with the lowest cost. This can also be implemented as a cost report interface, so that advanced data source can implement it for optimal performance, and simple data source doesn't need to care about it and keep simple.

 

 

The current design is very friendly to simple data source, and has the potential to support complex data source, I prefer the current design over the plan push down one. What do you think?

 

 

On Wed, Aug 30, 2017 at 5:53 AM, James Baker <[hidden email]> wrote:

Yeah, for sure.

 

With the stable representation - agree that in the general case this is pretty intractable, it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which after all, need to be translateable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?

 

That said, what would also be fine for us is a place to plug into an unstable query plan.

 

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.

 

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

 

To your second question:

 

The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.

 

For example, the dataset

 

a b c

1 2 3

1 3 3

1 3 4

2 1 1

2 0 1

 

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case I see where I can push down two equality filters iff I am given them at the same time, whilst not being able to one at a time.

 

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

 

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

 

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do to buildScan, so I want to cache it. This means that I end up with code that looks like:

 

public final class CachingFoo implements Foo {

    private final Foo delegate;

 

    private List<Filter> currentFilters = emptyList();

    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

 

    public CachingFoo(Foo delegate) {

        this.delegate = delegate;

    }

 

    private Supplier<Bar> newSupplier(List<Filter> filters) {

        return Suppliers.memoize(() -> delegate.computeBar(filters));

    }

 

    @Override

    public Bar computeBar(List<Filter> filters) {

        if (!filters.equals(currentFilters)) {

            currentFilters = filters;

            barSupplier = newSupplier(filters);

        }

 

        return barSupplier.get();

    }

}

 

which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result..

 

This kind of cache becomes more prominent, but harder to deal with in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc, so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.

 

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).

 

James

 

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:

James,

 

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.

 

The downside to that (full query plan push down) are:

 

1. It is extremely difficult to design a stable representation for logical / physical plan. It is doable, but we'd be the first to do it. I'm not sure of any mainstream databases being able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or offer an experimental trait without compatibility guarantee that uses the current Catalyst internal logical plan.

 

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.

 


Re: your point about the proposed v2 being worse than v1 for your use case.

 

Can you say more? You used the argument that in v2 there are more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the trait, it'd be logically identical to v1. I don't see why it would be worse or better, other than v2 provides much stronger forward compatibility guarantees than v1.

 

 

On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:

Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

 

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps moreso. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom; sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James

 

On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:

+1 (Non-binding) 

 

Xiao Li <[hidden email]> wrote on Mon, 28 Aug 2017 at 5:38 PM:

+1

 

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:

Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html


On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

 

 

 

 



Reply | Threaded
Open this post in threaded view
|

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

cloud0fan
In reply to this post by James Baker
OK, I agree. How about we add a new interface to push down the query plan, based on the current framework? We can mark the query-plan-push-down interface as unstable, to save the effort of designing a stable representation of the query plan and maintaining forward compatibility.
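
To illustrate how small such an escape hatch could be, here is a rough sketch (the name and shape are hypothetical, not a concrete proposal): it hands the source the Catalyst plan Spark would otherwise run on top of it, and the source returns whatever part it cannot absorb.

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

// Hypothetical sketch only: an optional mix-in layered on the current framework. Because it
// exposes the internal Catalyst LogicalPlan directly, it is marked unstable and may break
// across releases; sources that want stability stay on the regular pushdown mix-ins.
@InterfaceStability.Unstable
public interface SupportsPushDownCatalystPlan {

    // Returns the remaining plan the source cannot handle (possibly the input unchanged);
    // Spark executes whatever comes back as usual.
    LogicalPlan pushPlan(LogicalPlan plan);
}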

On Wed, Aug 30, 2017 at 10:53 AM, James Baker <[hidden email]> wrote:
I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.

I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading >>that much data.

If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).

There's also a thing where our typical use cases have many filters (20+ is common). So, it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, why can't I just pick that myself?

The current design is friendly to simple datasources, but does not have the potential to support this.

So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.

My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write simple mode in terms of it very easily. So we could actually provide two APIs, one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?

James

On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <[hidden email]> wrote:
Hi James,

Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments 

The problem with this approach is: 1) if we wanna add more arguments in the future, it's really hard to do without changing the existing interface. 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him.
I don't have a solution to solve these 2 problems, comments are welcome.


There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

This is true, the current framework applies push downs one by one, incrementally. If a data source wanna go back to accept a sort push down after it accepts a filter push down, it's impossible with the current data source V2.
Fortunately, we have a solution for this problem. At Spark side, actually we do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know which is the best order to push them into data source. Spark can try every combination and ask the data source to report a cost, then Spark can pick the best combination with the lowest cost. This can also be implemented as a cost report interface, so that advanced data source can implement it for optimal performance, and simple data source doesn't need to care about it and keep simple.


The current design is very friendly to simple data source, and has the potential to support complex data source, I prefer the current design over the plan push down one. What do you think?


On Wed, Aug 30, 2017 at 5:53 AM, James Baker <[hidden email]> wrote:
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty intractable, it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which after all, need to be translateable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case I see where I can push down two equality filters iff I am given them at the same time, whilst not being able to one at a time.

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do to buildScan, so I want to cache it. This means that I end up with code that looks like:

public final class CachingFoo implements Foo {
    private final Foo delegate;

    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result.

This kind of cache becomes more prominent, but harder to deal with in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc, so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:
James,

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.

The downside to that (full query plan push down) are:

1. It is extremely difficult to design a stable representation for logical / physical plan. It is doable, but we'd be the first to do it. I'm not sure of any mainstream databases being able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or offer an experimental trait without compatibility guarantee that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there are more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the trait, it'd be logically identical to v1. I don't see why it would be worse or better, other than v2 provides much stronger forward compatibility guarantees than v1.


On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps moreso. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom; sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:
+1 (Non-binding) 

Xiao Li <[hidden email]> wrote on Mon, 28 Aug 2017 at 5:38 PM:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]





Reply | Threaded
Open this post in threaded view
|

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

rxin
That might be good to do, but it seems orthogonal to this effort itself. It would be a completely different interface.

On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <[hidden email]> wrote:
OK I agree with it, how about we add a new interface to push down the query plan, based on the current framework? We can mark the query-plan-push-down interface as unstable, to save the effort of designing a stable representation of query plan and maintaining forward compatibility.

On Wed, Aug 30, 2017 at 10:53 AM, James Baker <[hidden email]> wrote:
I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.

I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading >>that much data.

If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).

There's also a thing where our typical use cases have many filters (20+ is common). So, it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, why can't I just pick that myself?

The current design is friendly to simple datasources, but does not have the potential to support this.

So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.

My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write simple mode in terms of it very easily. So we could actually provide two APIs, one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?

James

On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <[hidden email]> wrote:
Hi James,

Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments 

The problem with this approach is: 1) if we wanna add more arguments in the future, it's really hard to do without changing the existing interface. 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him.
I don't have a solution to solve these 2 problems, comments are welcome.


There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

This is true, the current framework applies push downs one by one, incrementally. If a data source wanna go back to accept a sort push down after it accepts a filter push down, it's impossible with the current data source V2.
Fortunately, we have a solution for this problem. At Spark side, actually we do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know which is the best order to push them into data source. Spark can try every combination and ask the data source to report a cost, then Spark can pick the best combination with the lowest cost. This can also be implemented as a cost report interface, so that advanced data source can implement it for optimal performance, and simple data source doesn't need to care about it and keep simple.


The current design is very friendly to simple data source, and has the potential to support complex data source, I prefer the current design over the plan push down one. What do you think?


On Wed, Aug 30, 2017 at 5:53 AM, James Baker <[hidden email]> wrote:
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty intractable, it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which after all, need to be translateable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case I see where I can push down two equality filters iff I am given them at the same time, whilst not being able to one at a time.

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do to buildScan, so I want to cache it. This means that I end up with code that looks like:

public final class CachingFoo implements Foo {
    private final Foo delegate;

    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result.

This kind of cache becomes more prominent, but harder to deal with in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc, so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:
James,

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.

The downside to that (full query plan push down) are:

1. It is extremely difficult to design a stable representation for logical / physical plan. It is doable, but we'd be the first to do it. I'm not sure of any mainstream databases being able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or offer an experimental trait without compatibility guarantee that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there are more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the trait, it'd be logically identical to v1. I don't see why it would be worse or better, other than v2 provides much stronger forward compatibility guarantees than v1.


On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps moreso. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom; sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:
+1 (Non-binding) 

Xiao Li <[hidden email]> wrote on Mon, 28 Aug 2017 at 5:38 PM:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]





Reply | Threaded
Open this post in threaded view
|

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

Ryan Blue

-1 (non-binding)

Sometimes it takes a VOTE thread to get people to actually read and comment, so thanks for starting this one… but there’s still discussion happening on the prototype API, which hasn’t been updated. I’d like to see the proposal shaped by the ongoing discussion so that we have a better, more concrete plan. I think that’s going to produce a better SPIP.

The second reason for -1 is that I think the read- and write-side proposals should be separated. The PR currently has “write path” listed as a TODO item and most of the discussion I’ve seen is on the read side. I think it would be better to separate the read and write APIs so we can focus on them individually.

An example of why we should focus on the write path separately is that the proposal says this:

Ideally partitioning/bucketing concept should not be exposed in the Data Source API V2, because they are just techniques for data skipping and pre-partitioning. However, these 2 concepts are already widely used in Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be consistent, we need to add partitioning/bucketing to Data Source V2 . . .

Essentially, some of the APIs mix DDL and DML operations. I’d like to consider ways to fix that problem instead of carrying it forward to Data Source V2. We can solve this by adding a high-level API for DDL and a better write/insert API that works well with it. Clearly, that discussion is independent of the read path, which is why I think separating the two proposals would be a win.
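
To make the split concrete, here is a rough sketch of the kind of separation I mean (all names below are made up for illustration, not a concrete proposal): table metadata such as partitioning and bucketing is managed through a catalog-style DDL API, and the write/insert API only targets a table that already exists.

import java.util.Map;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch: DDL operations live on the catalog, DML lives on a writer scoped to an
// existing table, and partitioning/bucketing are table properties set via DDL rather than
// options passed on every write.
public interface TableCatalog {

    Table createTable(String name, StructType schema, Map<String, String> properties);

    Table loadTable(String name);

    void dropTable(String name);

    interface Table {
        StructType schema();

        Map<String, String> properties();   // partitioning/bucketing recorded here

        Writer newWriter(SaveMode mode);    // the DML side: append/overwrite into this table
    }

    interface Writer {
        void write(Row row);

        void commit();

        void abort();
    }
}

Whatever shape that takes, it can be designed without blocking the read-side work, which is the point of splitting the proposals.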

rb


On Wed, Aug 30, 2017 at 4:28 AM, Reynold Xin <[hidden email]> wrote:
That might be good to do, but it seems orthogonal to this effort itself. It would be a completely different interface.

On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <[hidden email]> wrote:
OK I agree with it, how about we add a new interface to push down the query plan, based on the current framework? We can mark the query-plan-push-down interface as unstable, to save the effort of designing a stable representation of query plan and maintaining forward compatibility.

On Wed, Aug 30, 2017 at 10:53 AM, James Baker <[hidden email]> wrote:
I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.

I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading >>that much data.

If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).

There's also a thing where our typical use cases have many filters (20+ is common). So, it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, why can't I just pick that myself?

The current design is friendly to simple datasources, but does not have the potential to support this.

So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.

My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write simple mode in terms of it very easily. So we could actually provide two APIs, one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?

James

On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <[hidden email]> wrote:
Hi James,

Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments 

The problem with this approach is: 1) if we wanna add more arguments in the future, it's really hard to do without changing the existing interface. 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him.
I don't have a solution to solve these 2 problems, comments are welcome.


There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

This is true, the current framework applies push downs one by one, incrementally. If a data source wanna go back to accept a sort push down after it accepts a filter push down, it's impossible with the current data source V2.
Fortunately, we have a solution for this problem. At Spark side, actually we do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know which is the best order to push them into data source. Spark can try every combination and ask the data source to report a cost, then Spark can pick the best combination with the lowest cost. This can also be implemented as a cost report interface, so that advanced data source can implement it for optimal performance, and simple data source doesn't need to care about it and keep simple.


The current design is very friendly to simple data source, and has the potential to support complex data source, I prefer the current design over the plan push down one. What do you think?


On Wed, Aug 30, 2017 at 5:53 AM, James Baker <[hidden email]> wrote:
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty intractable, it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which after all, need to be translateable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case I see where I can push down two equality filters iff I am given them at the same time, whilst not being able to one at a time.

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do to buildScan, so I want to cache it. This means that I end up with code that looks like:

public final class CachingFoo implements Foo {
    private final Foo delegate;

    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result.

This kind of cache becomes more prominent, but harder to deal with in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc, so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:
James,

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.

The downside to that (full query plan push down) are:

1. It is extremely difficult to design a stable representation for logical / physical plan. It is doable, but we'd be the first to do it. I'm not sure of any mainstream databases being able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or offer an experimental trait without compatibility guarantee that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there are more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the trait, it'd be logically identical to v1. I don't see why it would be worse or better, other than v2 provides much stronger forward compatibility guarantees than v1.


On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps moreso. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom; sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


On Mon, 28 Aug 2017 at 18:00 蒋星博 <[hidden email]> wrote:
+1 (Non-binding) 

Xiao Li <[hidden email]> wrote on Mon, 28 Aug 2017 at 5:38 PM:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger <[hidden email]>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <[hidden email]> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]








--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

rxin
So we seem to be getting into a cycle of discussing the details of the APIs more than the high-level proposal. The details of the APIs are important to debate, but those discussions belong more in code reviews.

One other important thing is that we should avoid API design by committee. While it is extremely useful to get feedback and understand the use cases, we cannot do API design by incorporating the union of everybody's feedback verbatim. API design is largely a tradeoff game. The most expressive API would also be harder to use, or sacrifice backward/forward compatibility. It is as important to decide what to exclude as what to include.

Unlike the v1 API, the way Wenchen's high-level V2 framework is proposed makes it very easy to add new features (e.g. clustering properties) in the future without breaking any APIs. I'd rather we ship something useful that might not be the most comprehensive set than debate every single feature we should add and end up creating something super complicated with unclear value.
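
To spell out the evolvability argument (the interface names below are made up for illustration, not the proposed API): a new capability ships as an optional mix-in interface, Spark probes for it, and every source written before the capability existed keeps compiling and running unchanged.

import java.util.List;

// Hypothetical names throughout; this only illustrates the mix-in evolution story.
public interface DataReader {
    List<ReadTask> planReadTasks();

    interface ReadTask {}                       // placeholder for the per-partition work
}

// Added in a later release as an optional capability; DataReader itself never changes.
interface SupportsReportClustering {
    List<String> clusteredColumns();
}

final class ScanPlanner {
    // Spark-side logic probes for the capability; sources written before it existed simply
    // skip the branch and behave exactly as they did before.
    static void plan(DataReader reader) {
        if (reader instanceof SupportsReportClustering) {
            List<String> cols = ((SupportsReportClustering) reader).clusteredColumns();
            System.out.println("Source clustered by " + cols + "; a shuffle may be avoidable.");
        }
        reader.planReadTasks();
    }
}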



On Wed, Aug 30, 2017 at 6:37 PM, Ryan Blue <[hidden email]> wrote:

-1 (non-binding)

Sometimes it takes a VOTE thread to get people to actually read and comment, so thanks for starting this one… but there’s still discussion happening on the prototype API, which hasn’t been updated. I’d like to see the proposal shaped by the ongoing discussion so that we have a better, more concrete plan. I think that’s going to produce a better SPIP.

The second reason for -1 is that I think the read- and write-side proposals should be separated. The PR currently has “write path” listed as a TODO item and most of the discussion I’ve seen is on the read side. I think it would be better to separate the read and write APIs so we can focus on them individually.

An example of why we should focus on the write path separately is that the proposal says this:

Ideally partitioning/bucketing concept should not be exposed in the Data Source API V2, because they are just techniques for data skipping and pre-partitioning. However, these 2 concepts are already widely used in Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be consistent, we need to add partitioning/bucketing to Data Source V2 . . .

Essentially, some of the APIs mix DDL and DML operations. I’d like to consider ways to fix that problem instead of carrying it forward to Data Source V2. We can solve this by adding a high-level API for DDL and a better write/insert API that works well with it. Clearly, that discussion is independent of the read path, which is why I think separating the two proposals would be a win.

rb


On Wed, Aug 30, 2017 at 4:28 AM, Reynold Xin <[hidden email]> wrote:
That might be good to do, but it seems orthogonal to this effort itself. It would be a completely different interface.

On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <[hidden email]> wrote:
OK I agree with it, how about we add a new interface to push down the query plan, based on the current framework? We can mark the query-plan-push-down interface as unstable, to save the effort of designing a stable representation of query plan and maintaining forward compatibility.

On Wed, Aug 30, 2017 at 10:53 AM, James Baker <[hidden email]> wrote:
I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.

I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading >>that much data.

If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).

There's also a thing where our typical use cases have many filters (20+ is common). So, it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, why can't I just pick that myself?

The current design is friendly to simple datasources, but does not have the potential to support this.

So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.

My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write simple mode in terms of it very easily. So we could actually provide two APIs, one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?

James

On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <[hidden email]> wrote:
Hi James,

Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments 

The problem with this approach is: 1) if we wanna add more arguments in the future, it's really hard to do without changing the existing interface. 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him.
I don't have a solution to solve these 2 problems, comments are welcome.


There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

This is true: the current framework applies pushdowns one by one, incrementally. If a data source wants to go back and accept a sort pushdown after it has accepted a filter pushdown, that's impossible with the current data source V2.
Fortunately, we have a solution for this problem. On the Spark side, we actually do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know the best order in which to push them into the data source. Spark can try every combination and ask the data source to report a cost, then pick the combination with the lowest cost. This can be implemented as a cost-report interface, so that an advanced data source can implement it for optimal performance, while a simple data source doesn't need to care about it and can stay simple.
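
For example (names below are placeholders, just to make the idea concrete), the cost-report interface could be as small as:

import java.util.List;

import org.apache.spark.sql.sources.Filter;

public interface SupportsCostReport {
  /**
   * Estimated cost of scanning with the given filters (and a sort on the given
   * columns) pushed down. Lower is cheaper; Double.POSITIVE_INFINITY means the
   * data source cannot execute this combination at all.
   */
  double estimateCost(List<Filter> pushedFilters, List<String> pushedSortColumns);
}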


The current design is very friendly to simple data sources and has the potential to support complex ones, so I prefer the current design over the plan-pushdown one. What do you think?


On Wed, Aug 30, 2017 at 5:53 AM, James Baker <[hidden email]> wrote:
Yeah, for sure.

With the stable representation - I agree that in the general case this is pretty intractable; it restricts the modifications you can make in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the data source V2 API (which, after all, need to be translatable properly into the future to support the proposed mixins). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be OK, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you could actually handle, which unnecessarily increases overhead.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case where I can push down two equality filters iff I am given them at the same time, whilst not being able to handle them one at a time.
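
To make that concrete, the shape of query I mean looks something like this (the format name is invented and the snippet is only illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class PushdownExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("pushdown-example").getOrCreate();

    Dataset<Row> df = spark.read()
        .format("com.example.datasource")  // invented source name
        .load();

    // The source can push down the filter and the sort together - within the a = 1
    // slice the data above already happens to be ordered by (b, c) - but it cannot
    // push down the sort on its own.
    df.filter("a = 1").sort("b", "c").show();

    spark.stop();
  }
}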

There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.

The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do for buildScan, so I want to cache it. This means that I end up with code that looks like:

// Foo and Bar stand in for my datasource's own interfaces; Filter is Spark's
// org.apache.spark.sql.sources.Filter, and Suppliers.memoize comes from Guava.
import java.util.List;
import static java.util.Collections.emptyList;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

import org.apache.spark.sql.sources.Filter;

public final class CachingFoo implements Foo {
    private final Foo delegate;

    // The filters most recently passed in, plus a memoized computation keyed off them.
    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        // Only invalidate the cached result when the filter set actually changes.
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required by unhandledFilters, on the expectation that Spark will call buildScan afterwards and get to reuse the result.

This kind of cache becomes more prominent, and harder to deal with, in the new APIs. As one example, the state I need in order to compute accurate column stats internally will likely be a subset of the work required to get the read tasks, tell you whether I can handle filters, etc., so I'll want to cache it for reuse. However, the cached information needs to be appropriately invalidated whenever a new filter, sort order, or limit is added, and this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <[hidden email]> wrote:
James,

Thanks for the comment. I think you've just pointed out a trade-off between expressiveness on one hand and API simplicity, compatibility, and evolvability on the other. For maximum expressiveness, we'd want the ability to expose full query plans and let the data source decide which parts of the query plan can be pushed down.

The downsides to that (full query plan pushdown) are:

1. It is extremely difficult to design a stable representation for logical/physical plans. It is doable, but we'd be the first to do it; I'm not aware of any mainstream database that has managed it in the past. The design of that API itself, with a good story for backward and forward compatibility, would probably take months if not years. It might still be worth doing, or we could offer an experimental trait without compatibility guarantees that uses the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.


Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there is more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the traits, it'd be logically identical to v1. I don't see why it would be worse or better, other than that v2 provides much stronger forward-compatibility guarantees than v1.


On Tue, Aug 29, 2017 at 4:54 AM, James Baker <[hidden email]> wrote:
Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar way to DataSource V1, but more acutely so, because of the extra methods and dimensions we get in V2).

In DataSource V1's PrunedFilteredScan, the issue is that you are passed the filters in the buildScan method, and then passed them again in the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed both, then either A and B are unhandled, or just A, or just B, or neither. The work I have to do to figure this out is essentially the same as the work I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps more so. In PrunedFilteredScan there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. Here, however, we have many degrees of freedom: sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one by one can easily end up being more expensive than computing it all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change answers I've already given, because they might change later.

Really, what would be good here is to pass all of the filters, sorts, etc. at once, and have me return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. I explicitly don’t want to be given the whole query plan - that sounds painful - I’d prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type covering our supported pushdown operations, and then the user can transform and return it.
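
The shape I have in mind is roughly the sketch below - every name is invented, and it is only meant to show the "offer everything at once, return the residual" contract:

import java.util.List;
import java.util.OptionalLong;

import org.apache.spark.sql.sources.Filter;

public interface SupportsBulkPushdown {

  /** Everything Spark would like to push down, offered in one shot. */
  final class Pushdowns {
    public final List<Filter> filters;
    public final List<String> sortColumns;  // simplified: column names only
    public final OptionalLong limit;

    public Pushdowns(List<Filter> filters, List<String> sortColumns, OptionalLong limit) {
      this.filters = filters;
      this.sortColumns = sortColumns;
      this.limit = limit;
    }
  }

  /**
   * Inspect the full request and return the residual this source will not handle;
   * Spark evaluates the residual itself.
   */
  Pushdowns pushDown(Pushdowns requested);
}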

I think this ends up being a more elegant API for consumers, and also far more intuitive.

James


