[SQL] Unresolved reference with chained window functions.

zero323
Forwarded from SO (http://stackoverflow.com/q/43007433). This looks like
a regression compared to 2.0.2.

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val win_spec_max = Window.partitionBy("x").orderBy("AmtPaid").rowsBetween(Window.unboundedPreceding, 0)
win_spec_max: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@3433e418

scala> val df = Seq((1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1, -1.0)).toDF("x", "AmtPaid")
df: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double]

scala> val df_with_sum = df.withColumn("AmtPaidCumSum", sum(col("AmtPaid")).over(win_spec_max))
df_with_sum: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double ... 1 more field]

scala> val df_with_max = df_with_sum.withColumn("AmtPaidCumSumMax", max(col("AmtPaidCumSum")).over(win_spec_max))
df_with_max: org.apache.spark.sql.DataFrame = [x: int, AmtPaid: double ... 2 more fields]

scala> df_with_max.explain
== Physical Plan ==
!Window [sum(AmtPaid#361) windowspecdefinition(x#360, AmtPaid#361 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS AmtPaidCumSum#366, max(AmtPaidCumSum#366) windowspecdefinition(x#360, AmtPaid#361 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS AmtPaidCumSumMax#372], [x#360], [AmtPaid#361 ASC NULLS FIRST]
+- *Sort [x#360 ASC NULLS FIRST, AmtPaid#361 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(x#360, 200)
      +- LocalTableScan [x#360, AmtPaid#361]

scala> df_with_max.printSchema
root
 |-- x: integer (nullable = false)
 |-- AmtPaid: double (nullable = false)
 |-- AmtPaidCumSum: double (nullable = true)
 |-- AmtPaidCumSumMax: double (nullable = true)

scala> df_with_max.show
17/03/24 21:22:32 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 234)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: AmtPaidCumSum#366
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
   ...
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#366 in [sum#385,max#386,x#360,AmtPaid#361]
   ...
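
For reference, here is what the query is expected to return, worked out in plain Scala without Spark. This is only a sketch: the object name `ExpectedResult` and the helper `compute` are made up for illustration, and it assumes there are no ties in AmtPaid within a partition (with a ROWS frame, ties would make the row order ambiguous).

```scala
// Plain-Scala reference computation; no Spark required.
// `ExpectedResult` and `compute` are hypothetical names, not part of any Spark API.
object ExpectedResult {
  // Same rows as `df` in the transcript: (x, AmtPaid).
  val rows: Seq[(Int, Double)] =
    Seq((1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1, -1.0))

  // One output row: (x, AmtPaid, AmtPaidCumSum, AmtPaidCumSumMax).
  type Out = (Int, Double, Double, Double)

  def compute(input: Seq[(Int, Double)]): Seq[Out] =
    input.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (x, part) =>
      // ORDER BY AmtPaid ascending within the partition.
      val amounts = part.map(_._2).sortWith(_ < _)
      // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: running sum.
      val cumSum = amounts.scanLeft(0.0)(_ + _).tail
      // Same frame applied to the running sum: running maximum.
      val cumMax = cumSum
        .scanLeft(Double.NegativeInfinity)((m, v) => math.max(m, v))
        .tail
      amounts.indices.map(i => (x, amounts(i), cumSum(i), cumMax(i)))
    }

  def main(args: Array[String]): Unit = compute(rows).foreach(println)
}
```

Sorted by AmtPaid the amounts are -2.0, -1.0, 1.0, 2.0, 3.0, so the running sum is -2.0, -3.0, -2.0, 0.0, 3.0 and the running max of that sum is -2.0, -2.0, -2.0, 0.0, 3.0.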

Is it a known issue or do we need a JIRA?

--
Best,
Maciej Szymkiewicz

Re: [SQL] Unresolved reference with chained window functions.

Herman van Hövell tot Westerflier-2
This is definitely a bug in the CollapseWindow optimizer rule. I think we can use SPARK-20086 to track this.
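
To illustrate the failure mode, here is a toy model in plain Scala. It assumes the problem is that two adjacent Window operators with the same partitioning and ordering get merged even though the upper one reads a column the lower one produces, which is what the explain output suggests (a single Window node computes both AmtPaidCumSum#366 and max(AmtPaidCumSum#366)). The classes `WindowExpr` and `WindowOp` and the function `collapse` are hypothetical stand-ins, not Catalyst's actual classes or the actual fix.

```scala
// Toy model of adjacent Window operators; NOT Spark's Catalyst classes.
object CollapseSketch {
  // A window expression: the column it produces and the columns it reads.
  final case class WindowExpr(output: String, inputs: Set[String])

  // A Window operator: its expressions plus partition and order columns.
  final case class WindowOp(
      exprs: Seq[WindowExpr],
      partitionBy: Seq[String],
      orderBy: Seq[String]) {
    def outputs: Set[String] = exprs.map(_.output).toSet
    def references: Set[String] = exprs.flatMap(_.inputs).toSet
  }

  // Merge `upper` into `lower` only if the specs match AND `upper`
  // does not read any column that `lower` produces.
  def collapse(upper: WindowOp, lower: WindowOp): Option[WindowOp] = {
    val sameSpec =
      upper.partitionBy == lower.partitionBy && upper.orderBy == lower.orderBy
    val independent = upper.references.intersect(lower.outputs).isEmpty
    if (sameSpec && independent) Some(lower.copy(exprs = lower.exprs ++ upper.exprs))
    else None
  }

  // The two windows from the report: the max reads the cumulative sum.
  val sumOp = WindowOp(
    Seq(WindowExpr("AmtPaidCumSum", Set("AmtPaid"))), Seq("x"), Seq("AmtPaid"))
  val maxOp = WindowOp(
    Seq(WindowExpr("AmtPaidCumSumMax", Set("AmtPaidCumSum"))), Seq("x"), Seq("AmtPaid"))

  // A window that only reads AmtPaid, so it is safe to merge with sumOp.
  val independentOp = WindowOp(
    Seq(WindowExpr("AmtPaidMax", Set("AmtPaid"))), Seq("x"), Seq("AmtPaid"))

  def main(args: Array[String]): Unit = {
    println(collapse(maxOp, sumOp))         // dependent: not merged
    println(collapse(independentOp, sumOp)) // independent: merged
  }
}
```

In this model, dropping the `independent` check would merge `maxOp` into `sumOp` and reproduce the shape of the broken plan, where the max is evaluated in the same operator that is supposed to produce its input.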

On Fri, Mar 24, 2017 at 9:28 PM, Maciej Szymkiewicz <[hidden email]> wrote:
Forwarded from SO (http://stackoverflow.com/q/43007433). Looks like
regression compared to 2.0.2.

--

Herman van Hövell
Software Engineer
Databricks Inc.
