Re: Strange behavior with 'not' and filter pushdown


Re: Strange behavior with 'not' and filter pushdown

Takeshi Yamamuro
cc: xiao

IIUC, Xiao's commit below fixed this issue in master.

Is this fix worth backporting to the v2.0 branch?
I checked and could reproduce the issue there:

---

scala> Seq((1, "a"), (2, "b"), (3, null)).toDF("c0", "c1").write.mode("overwrite").parquet("/Users/maropu/Desktop/data")
scala> spark.read.parquet("/Users/maropu/Desktop/data").createOrReplaceTempView("t")
scala> val df = sql("SELECT c0 FROM t WHERE NOT(c1 IS NOT NULL)")
scala> df.explain(true)
== Parsed Logical Plan ==
'Project ['c0]
+- 'Filter NOT isnotnull('c1)
   +- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
c0: int
Project [c0#16]
+- Filter NOT isnotnull(c1#17)
   +- SubqueryAlias t
      +- Relation[c0#16,c1#17] parquet

== Optimized Logical Plan ==
Project [c0#16]
+- Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
           ^^^^^^^^^^^^^^^^
   +- Relation[c0#16,c1#17] parquet

== Physical Plan ==
*Project [c0#16]
+- *Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
   +- *BatchedScan parquet [c0#16,c1#17] Format: ParquetFormat, InputPaths: file:/Users/maropu/Desktop/data, PartitionFilters: [], PushedFilters: [IsNotNull(c1), Not(IsNotNull(c1))], ReadSchema: struct<c0:int,c1:string>

scala> df.show
+---+
| c0|
+---+
+---+
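
As a sanity check, the semantically equivalent IS NULL predicate (assuming IS NULL itself is pushed down correctly) should return the one row where c1 is null, i.e. c0 = 3:

scala> sql("SELECT c0 FROM t WHERE c1 IS NULL").show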




// maropu


On Sun, Feb 12, 2017 at 10:01 AM, Everett Anderson <[hidden email]> wrote:
On the plus side, looks like this may be fixed in 2.1.0:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string>



On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <[hidden email]> wrote:
Bumping this thread.

Translating "where not(username is not null)" into the pushed filters [IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe bug. The two filters are mutually exclusive, so the scan can never return a row.

Spark 1.6.2:

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
      +- Project
         +- Filter NOT isnotnull(username#1590)
            +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]

Spark 2.0.2

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
            +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<username:string>

Example to generate the above:

// Create some fake data

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
    Row(1, "fred"),
    Row(2, "amy"),
    Row(3, null)))
    
val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("username", StringType, nullable = true)))
    
val data = sqlContext.createDataFrame(rowsRDD, schema)

val path = "SOME PATH HERE"

data.write.mode("overwrite").parquet(path)

val testData = sqlContext.read.parquet(path)

testData.registerTempTable("filter_test_table")


%sql
explain select count(*) from filter_test_table where not( username is not null)
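
For the full plan chain (parsed, analyzed, optimized, physical), the extended form of explain should also work here:

%sql
explain extended select count(*) from filter_test_table where not( username is not null)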


On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <[hidden email]> wrote:
Hi,

I have an application where I'm filtering data with Spark SQL using simple WHERE clauses. I also want the ability to show the unmatched rows for any filter, so I'm wrapping the previous clause in `NOT()` to get the inverse. Example:

Filter:  username is not null
Inverse filter:  NOT(username is not null)

This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the inverse filter always returns zero results. It looks like this is a problem with how the filter is getting pushed down to Parquet. Specifically, the pushdown includes both the “is not null” filter, AND “not(is not null)”, which would obviously result in zero matches. An example below:

pyspark:
> x = spark.sql('select my_id from my_table where username is not null')
> y = spark.sql('select my_id from my_table where not(username is not null)')                                                                                                                            
> x.explain()
== Physical Plan ==
*Project [my_id#6L]
+- *Filter isnotnull(username#91)
   +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
       Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
       PartitionFilters: [], PushedFilters: [IsNotNull(username)],
       ReadSchema: struct<my_id:bigint,username:string>
> y.explain()
== Physical Plan ==
*Project [my_id#6L]
+- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
   +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
       Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
       PartitionFilters: [],
       PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
       ReadSchema: struct<my_id:bigint,username:string>

Presently I'm working around this by using the new NOT EXISTS support in Spark 2, but that seems like overkill.
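
For this specific filter, a lighter rewrite is a plain null test, which avoids the NOT() wrapping entirely (though it only covers the null-test case, not arbitrary inverted clauses):

select my_id from my_table where username is null

Assuming IS NULL itself is unaffected by this, it should push down a single IsNull(username) filter instead of the contradictory pair.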

Any help appreciated.

Alexi Kostibas
Engineering
Nuna
650 Townsend Street, Suite 425
San Francisco, CA 94103






--
---
Takeshi Yamamuro

Re: Strange behavior with 'not' and filter pushdown

Xiao Li
https://github.com/apache/spark/pull/16894

Already backported to Spark 2.0

Thanks!

Xiao



Re: Strange behavior with 'not' and filter pushdown

Takeshi Yamamuro
Oh, thanks for checking!

On Tue, Feb 14, 2017 at 12:32 PM, Xiao Li <[hidden email]> wrote:
https://github.com/apache/spark/pull/16894

Already backported to Spark 2.0

Thanks!

Xiao

--
---
Takeshi Yamamuro