
[SQL]Analysis failed when combining Window function and GROUP BY in Spark2.x

[SQL]Analysis failed when combining Window function and GROUP BY in Spark2.x

StanZhai
We can reproduce this using the following code:
val spark = SparkSession.builder().appName("test").master("local").getOrCreate()

val sql1 =
  """
    |create temporary view tb as select * from values
    |(1, 0),
    |(1, 0),
    |(2, 0)
    |as grouping(a, b)
  """.stripMargin

val sql =
  """
    |select count(distinct(b)) over (partition by a) from tb group by a
  """.stripMargin

spark.sql(sql1)
spark.sql(sql).show()

It throws an exception like this:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'tb.`b`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Project [count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L]
+- Project [b#1, a#0, count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L, count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L]
   +- Window [count(distinct b#1) windowspecdefinition(a#0, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L], [a#0]
      +- Aggregate [a#0], [b#1, a#0]
         +- SubqueryAlias tb
            +- Project [a#0, b#1]
               +- SubqueryAlias grouping
                  +- LocalRelation [a#0, b#1]

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:220)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$7.apply(CheckAnalysis.scala:247)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$7.apply(CheckAnalysis.scala:247)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:247)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)

However, there is no exception in Spark 1.6.x.

I think the SQL select count(distinct(b)) over (partition by a) from tb group by a should execute. I don't understand why this exception is thrown. Is this the expected behavior?

Any help is appreciated!

Best, 

Stan


Re: [SQL]Analysis failed when combining Window function and GROUP BY in Spark2.x

Herman van Hövell tot Westerflier-2
You are seeing a bug in the Hive parser. Hive drops the window clause when it encounters a count(distinct ...). See https://issues.apache.org/jira/browse/HIVE-10141 for more information.

Spark 1.6 plans this as a regular distinct aggregate (dropping the window clause), which is wrong. Spark 2.x uses its own parser, and it does not allow you to use 'distinct' aggregates in window functions. You are getting this error because aggregates are planned before windows, and the aggregate cannot find b among its grouping expressions.
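If you need a per-partition distinct count on Spark 2.x, a commonly used workaround is to replace count(distinct ...) with size(collect_set(...)), since collect_set already de-duplicates. The snippet below is only a sketch, not something from this thread: it assumes a running SparkSession with the tb view defined as in the original post, and it omits the GROUP BY, so it yields one row per input row rather than one per group.

```scala
// Hypothetical workaround sketch: collect_set de-duplicates its inputs, so
// size(collect_set(b)) over a window behaves like count(distinct b) there,
// and Spark 2.x accepts it where a distinct window aggregate is rejected.
val workaroundSql =
  """
    |select a, size(collect_set(b) over (partition by a)) as distinct_b_cnt
    |from tb
  """.stripMargin

spark.sql(workaroundSql).show()

// To collapse the result to one row per value of a, de-duplicate afterwards:
// spark.sql("select distinct a, size(collect_set(b) over (partition by a)) from tb").show()
```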

On Wed, Mar 8, 2017 at 5:21 AM, StanZhai <[hidden email]> wrote:
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.



--

Herman van Hövell

Software Engineer

Databricks Inc.

[hidden email]

+31 6 420 590 27

databricks.com



Re: Re: [SQL]Analysis failed when combining Window function and GROUP BY in Spark2.x

StanZhai
Thanks for your reply!
I know what's going on now.


"Herman van Hövell tot Westerflier-2 [via Apache Spark Developers List]"<[hidden email]> wrote at 2017-03-08 21:35:




