Pushdown in DataSourceV2 question

Pushdown in DataSourceV2 question

Noritaka Sekiyama
Hi,

I'm a support engineer, interested in DataSourceV2.

Recently I had some trouble troubleshooting whether pushdown was actually applied or not.
I noticed that DataFrame's explain() method shows pushed filters even for JSON.
I believe this depends entirely on the DataSource side. However, I would like Spark to have some way to confirm whether a specific pushdown was actually applied in the DataSource or not.

# Example
val df = spark.read.json("s3://sample_bucket/people.json")
df.printSchema()
df.filter($"age" > 20).explain()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

== Physical Plan ==
*Project [age#47L, name#48]
+- *Filter (isnotnull(age#47L) && (age#47L > 20))
   +- *FileScan json [age#47L,name#48] Batched: false, Format: JSON, Location: InMemoryFileIndex[s3://sample_bucket/people.json], PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,20)], ReadSchema: struct<age:bigint,name:string>

# Comments
As you can see, PushedFilters are shown even though the input data is JSON.
In fact, this pushdown is not used.
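
For reference, here is a rough sketch of pulling the same information out of the physical plan programmatically instead of reading explain() output. It relies on Spark 2.x internal classes (FileSourceScanExec and its metadata map), which may change between versions, and it still only shows what Spark pushed, not whether the source actually used it:

import org.apache.spark.sql.execution.FileSourceScanExec

val plan = df.filter($"age" > 20).queryExecution.executedPlan
plan.collect { case scan: FileSourceScanExec =>
  // "PushedFilters" is one of the keys the scan node reports in its metadata map
  scan.metadata.get("PushedFilters")
}.foreach(println)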
   
I'm wondering whether this has already been discussed.
If not, this is a chance to add such a feature in DataSourceV2, because it would require some API-level changes.


Warm regards,

Noritaka Sekiyama


Re: Pushdown in DataSourceV2 question

Jörn Franke
Filter pushdown was already available before DataSourceV2, but I think it might have been an internal/semi-official API (e.g. JSON has been an internal data source for some time now). The filters were provided to the data source, but you will never know whether the data source has actually leveraged them or has decided, for other reasons (e.g. it would be inefficient in specific cases), to ignore them.

Re: Pushdown in DataSourceV2 question

Jörn Franke
In reply to this post by Noritaka Sekiyama
BTW, even for JSON a pushdown can make sense, to avoid data unnecessarily ending up in Spark (which would cause unnecessary overhead).
In the DataSourceV2 API you need to implement SupportsPushDownFilters.
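
As a rough sketch of what that looks like (assuming the Spark 2.4 DataSourceV2 reader API in org.apache.spark.sql.sources.v2.reader; the class, its filter logic and the empty partition list are hypothetical and stripped down):

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsPushDownFilters}
import org.apache.spark.sql.types.StructType

// Hypothetical reader: it accepts only the filter types it knows how to evaluate
// and hands everything else back to Spark.
class ExampleReader(schema: StructType) extends DataSourceReader with SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  // Spark passes in the candidate filters; the return value is the subset the
  // source could NOT handle, which Spark will evaluate itself after the scan.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case _: IsNotNull | _: GreaterThan => true
      case _                             => false
    }
    pushed = supported
    unsupported
  }

  // The filters the source reports back as pushed.
  override def pushedFilters(): Array[Filter] = pushed

  override def readSchema(): StructType = schema

  // A real source would return partitions whose readers actually apply `pushed`.
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] =
    util.Collections.emptyList()
}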


Re: Pushdown in DataSourceV2 question

Alessandro Solimando
Hello,
that's an interesting question, but after Jörn's reply I am a bit puzzled.

If there is no control over the pushdown status, how can Spark guarantee the correctness of the final query?

Consider a filter pushed down to the data source: either Spark has to know whether it has been applied, or it has to re-apply the filter anyway (and pay the price for that).

Is there any other option I am not considering?

Best regards,
Alessandro


Re: Pushdown in DataSourceV2 question

Jörn Franke
Well, even if Spark has to apply the filter again, with pushdown activated it costs Spark very little, regardless of whether the filter has already been applied at the source. Applying the filter itself is negligible; what pushdown really avoids, if the file format implements it, is the I/O cost of reading and the cost of converting from the file format's internal data types to Spark's. Those two things are very expensive, but the filter check is not. In the end, there can also be data-source-internal reasons not to apply a filter (there can be many, depending on your scenario, the format, etc.). Instead of "discussing" between Spark and the data source which filters have been applied, it is much less costly for Spark to simply check that the filters are consistently applied.


Re: Pushdown in DataSourceV2 question

cloud0fan
Expressions/functions can be expensive, and I do think Spark should trust the data source and not re-apply pushed filters. If the data source lies, many things can go wrong...


Re: Pushdown in DataSourceV2 question

Jörn Franke
It is not about lying or trust. Some or all filters may not be supported by a data source, and some might only be applied under certain environmental conditions (e.g. enough memory).

It is much more expensive to communicate between Spark and a data source about which filters have been applied than to simply check them, as Spark does, especially if you have several different data sources at the same time (joins, etc.).


Re: Pushdown in DataSourceV2 question

Alessandro Solimando
I think you are generally right, but there are so many different scenarios that it might not always be the best option. Consider, for instance, a "fast" network between a single data source and Spark, lots of data, and an "expensive" (low-selectivity) expression, as Wenchen suggested.

In such a case it looks to me as if you end up "re-scanning" the whole dataset just to make sure the filter has been applied, whereas having this information as metadata, or via a communication protocol with the data source (if supported), would be cheaper.

If there is no support at all for such a mechanism, I think the idea could be worth exploring a bit more. However, supporting it would require some development effort for each data source (e.g., asking the data source for the physical plan applied at query time, and the ability to parse it, extract the relevant information and act on it), as I am not aware of any general interface for exchanging such information.

Re: Pushdown in DataSourceV2 question

Noritaka Sekiyama
Hi,
Thank you for responding to this thread. I'm really interested in this discussion.

My original idea might be the same as what Alessandro said: introducing a mechanism by which Spark can communicate with the data source and get metadata showing whether a pushdown is supported or not.
I'm wondering whether it would really be that expensive.

Re: Pushdown in DataSourceV2 question

Ryan Blue
In v2, it is up to the data source to tell Spark which pushed filters are satisfied, by returning the filters that Spark should still run. You can indicate that a filter is handled by the source by not returning it to Spark. You can also show that a filter is used by the source by including it in the output for the plan node, which I think is the `description` method in the latest set of changes.

If you want to check with an external source to see what can be pushed down, then you can do that any time in your source implementation.
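
To illustrate that contract (hypothetical filter logic; only the Filter classes are real Spark API), the heart of a SupportsPushDownFilters implementation is what pushFilters() returns:

import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}

var reportedAsPushed: Array[Filter] = Array.empty

// Keep the filters the source will apply itself; return the rest so Spark
// knows it still has to run them.
def pushFilters(candidates: Array[Filter]): Array[Filter] = {
  val (handledBySource, leftForSpark) = candidates.partition {
    case _: IsNotNull | _: GreaterThan => true   // pretend only these are supported
    case _                             => false
  }
  reportedAsPushed = handledBySource   // surfaced via pushedFilters()
  leftForSpark                         // Spark runs only these itself
}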

--
Ryan Blue
Software Engineer
Netflix