[OSS DIGEST] The major changes of Apache Spark from June 3 to June 16

Jiang Xingbo
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, an [API] tag is added in the title.

CORE

[3.0][SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing (+63, -5)>

A user may name their accumulators with the internal.metrics. prefix, so that Spark treats them as internal accumulators and hides them from the UI. This PR makes JsonProtocol.accumValueToJson more robust, so that it ignores internal accumulators that use unrecognized types instead of crashing.

[API][3.1][SPARK-31486][CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode (+88, -26)>

This PR implements an application wait mechanism that allows spark-submit to wait until the application finishes in Standalone mode. This delays the exit of the spark-submit JVM until the job is completed. The implementation keeps monitoring the application until it is either finished, failed, or killed. This is controlled via the following conf:

  • spark.standalone.submit.waitAppCompletion (Default: false)

    In standalone cluster mode, controls whether the client waits to exit until the application completes. If set to true, the client process will stay alive polling the driver's status. Otherwise, the client process will exit after submission.
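A hypothetical invocation (only the flag name comes from the PR; the rest is standard spark-submit usage with placeholder values):

```
# Submit in standalone cluster mode and keep the client process alive,
# polling the driver's status, until the application completes
spark-submit \
  --master spark://master:7077 \
  --deploy-mode cluster \
  --conf spark.standalone.submit.waitAppCompletion=true \
  my-app.jar
```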

SQL

[3.0][SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled (+27, -12)>

AQE and non-AQE use different configs to set the initial shuffle partition number. This PR fixes repartition/DISTRIBUTE BY so that it also uses the AQE config spark.sql.adaptive.coalescePartitions.initialPartitionNum to set the initial shuffle partition number if AQE is enabled.

[3.0][SPARK-31867][SQL][FOLLOWUP] Check result differences for datetime formatting (+51, -8)>

Spark should throw SparkUpgradeException when getting DateTimeException for datetime formatting in the EXCEPTION legacy Time Parser Policy.

[API][3.0][SPARK-31879][SPARK-31892][SQL] Disable week-based pattern letters in datetime parsing/formatting (+1421, -171)> (+102, -48)>

Week-based pattern letters have very weird behaviors during datetime parsing in Spark 2.4, and it's very hard to simulate the legacy behaviors with the new API. For formatting, the new API makes the start-of-week localized, and it's not possible to keep the legacy behaviors. Since the week-based fields are rarely used, we disable week-based pattern letters in both parsing and formatting.

[3.0][SPARK-31896][SQL] Handle am-pm timestamp parsing when hour is missing (+39, -3)>

This PR sets the hour field to 0 or 12 when the AMPM_OF_DAY field is AM or PM during datetime parsing, to keep the behavior the same as Spark 2.4.
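The resolution rule can be sketched with a hypothetical helper (not Spark's actual code):

```python
from typing import Optional

def resolve_hour(ampm: str, hour: Optional[int] = None) -> int:
    # Hypothetical sketch of the rule: when the hour field is missing,
    # AM resolves the hour to 0 and PM to 12 (matching Spark 2.4).
    if hour is not None:
        return hour
    return 0 if ampm.upper() == "AM" else 12

print(resolve_hour("AM"))     # 0
print(resolve_hour("PM"))     # 12
print(resolve_hour("PM", 7))  # an explicit hour field wins: 7
```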

[API][3.1][SPARK-31830][SQL] Consistent error handling for datetime formatting and parsing functions (+126, -580)>

When parsing/formatting datetime values, it's better to fail fast if the pattern string is invalid, instead of returning null for each input record. The formatting functions such as date_format already do it; this PR applies the fail-fast behavior to the parsing functions: from_unixtime, unix_timestamp, to_unix_timestamp, to_timestamp, and to_date.

[3.1][SPARK-31910][SQL] Enable Java 8 time API in Thrift server (+23, -0)>

This PR enables Java 8 time API in thriftserver, so that we use the session timezone more consistently.

[2.4][SPARK-31935][SQL] Hadoop file system config should be effective in data source options (+52, -7)>

This PR fixes a bug that the hadoop configs in read/write options are not respected in data source V1.

[API][2.4][SPARK-31968][SQL] Duplicate partition columns check when writing data (+12, -1)>

Add a check for duplicate partition columns when writing data with the built-in file sources. After the change, when the DataFrame has duplicate partition columns, users get an AnalysisException when writing it. Previously, the write would succeed, but reading the files with duplicate columns would fail.

[API][3.0][SPARK-26905][SQL] Add TYPE in the ANSI non-reserved list (+2, -0)>

Add TYPE in the ANSI non-reserved list to follow the ANSI/SQL standard. The change impacts the behavior only when ANSI mode is on (spark.sql.ansi.enabled=true)

[API][3.0][SPARK-26905][SQL] Follow the SQL:2016 reserved keywords (+429, -5)>

Move the keywords ANTI, SEMI, and MINUS from reserved to non-reserved to comply with the ANSI/SQL standard. The change impacts the behavior only when ANSI mode is on (spark.sql.ansi.enabled=true)

[API][3.0][SPARK-31939][SQL][TEST-JAVA11] Fix Parsing day of year when year field pattern is missing (+465, -3)>

When a datetime pattern does not contain a year field (e.g. 'yyyy') but contains a day-of-year field (e.g. 'DD'), Spark should still respect the datetime pattern and parse the input correctly.

Before the change:

spark-sql> select to_timestamp('31', 'DD');
1970-01-01 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
1970-01-30 00:00:00

After the change:

spark-sql> select to_timestamp('31', 'DD');
1970-01-31 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
NULL
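For comparison, Python's strptime resolves a bare day-of-year pattern the same way the fixed behavior does; a quick analogy, not Spark code:

```python
from datetime import datetime

# '%j' is Python's day-of-year directive; with no year in the pattern,
# strptime falls back to the default year 1900.
ts = datetime.strptime("31", "%j")
print(ts.month, ts.day)  # day-of-year 31 resolves to January 31
```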

[3.0][SPARK-31956][SQL] Do not fail if there is no ambiguous self join (+7, -2)>

df("col").as("name") is not a column reference anymore, and should not have the special column metadata that is used to identify the root attribute (e.g., Dataset ID and col position). This PR fixes the corresponding regression that could cause a DataFrame query to fail even when there is no ambiguous self-join. Below is an example:

val joined = df.join(spark.range(1)).select($"a")
joined.select(joined("a").alias("x"), sum(joined("a")).over(w))

[3.0][SPARK-31958][SQL] normalize special floating numbers in subquery (+18, -4)>

The PR fixes a bug where special floating-point values (such as NaN and -0.0) in non-correlated subquery expressions were not normalized; now the subquery expressions are handled by OptimizeSubqueries.

[API][3.1][SPARK-21117][SQL] Built-in SQL Function Support - WIDTH_BUCKET (+431, -30)>

Add a built-in SQL function WIDTH_BUCKET that returns the bucket number to which value would be assigned in an equi-width histogram with num_bucket buckets, spanning the range min_value to max_value. Examples:

> SELECT WIDTH_BUCKET(5.3, 0.2, 10.6, 5);
3
> SELECT WIDTH_BUCKET(-2.1, 1.3, 3.4, 3);
0
> SELECT WIDTH_BUCKET(8.1, 0.0, 5.7, 4);
5
> SELECT WIDTH_BUCKET(-0.9, 5.2, 0.5, 2);
3
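The semantics can be sketched in plain Python (a reference sketch of the standard WIDTH_BUCKET behavior, not Spark's implementation): bucket 0 and bucket num_bucket + 1 act as underflow/overflow buckets, and the range may be ascending or descending.

```python
import math

def width_bucket(value, min_value, max_value, num_bucket):
    """Sketch of equi-width bucketing with underflow/overflow buckets."""
    width = (max_value - min_value) / num_bucket
    if min_value < max_value:  # ascending histogram
        if value < min_value:
            return 0
        if value >= max_value:
            return num_bucket + 1
    else:                      # descending histogram
        if value > min_value:
            return 0
        if value <= max_value:
            return num_bucket + 1
    # in-range: offset from min_value in units of the bucket width
    return int(math.floor((value - min_value) / width)) + 1

print(width_bucket(5.3, 0.2, 10.6, 5))   # 3
print(width_bucket(-2.1, 1.3, 3.4, 3))   # 0
print(width_bucket(8.1, 0.0, 5.7, 4))    # 5
print(width_bucket(-0.9, 5.2, 0.5, 2))   # 3
```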

[3.1][SPARK-27217][SQL] Nested column aliasing for more operators which can prune nested column (+190, -10)>

Support nested column pruning from an Aggregate or Expand operator.

[3.1][SPARK-27633][SQL] Remove redundant aliases in NestedColumnAliasing (+43, -1)>

Avoid generating redundant aliases in the NestedColumnAliasing rule when the parent nested field is already aliased. This slightly improves performance.

[3.1][SPARK-31736][SQL] Nested column aliasing for RepartitionByExpression/Join (+197, -16)>

Support nested column pruning from a RepartitionByExpression or Join operator.

ML

[3.1][SPARK-31925][ML] Summary.totalIterations greater than maxIters (+43, -12)>

The PR fixes a correctness issue in LogisticRegression and LinearRegression: the actual number of training iterations was one greater than the specified maxIter.

[3.1][SPARK-31944] Add instance weight support in LinearRegressionSummary (+56, -24)>

The PR adds instance weight support in LinearRegressionSummary; instance weight is already supported by LinearRegression and RegressionMetrics.

SS

[3.0][SPARK-31593][SS] Remove unnecessary streaming query progress update (+58, -7)>

The PR fixes a bug that set incorrect metrics in Structured Streaming. A progress update should be made every 10 seconds when a stream has no new upstream data. Without the fix, the input information was zeroed out in that update but the output information was not.

[3.0][SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates (+3, -1)>

The PR preserves the input order of colNames for groupCols in Dataset.dropDuplicates, because Structured Streaming's state store depends on the groupCols order.
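The underlying pitfall is that a plain set conversion does not guarantee insertion order. An order-preserving dedup, shown here in plain Python purely as an analogy for the deterministic grouping-column order the fix ensures, can be written as:

```python
def dedup_preserving_order(cols):
    # dict.fromkeys keeps first-seen order (guaranteed since Python 3.7),
    # unlike set(), whose iteration order is unspecified
    return list(dict.fromkeys(cols))

print(dedup_preserving_order(["b", "a", "b", "c"]))  # ['b', 'a', 'c']
```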

[3.1][SPARK-24634][SS] Add a new metric regarding number of inputs later than watermark plus allowed delay (+94, -29)>

Add a new metric, numLateInputs, to count the number of inputs which are later than watermark ('inputs' are relative to operators). The new metric is provided both on the Spark UI (SQL tab, query execution details page) and in the Streaming Query Listener.

PYTHON

[API][3.0][SPARK-31895][PYTHON][SQL] Support DataFrame.explain(extended: str) case to be consistent with Scala side (+24, -11)>

Improves DataFrame.explain in PySpark so that it also accepts the explain mode as a string, consistent with the Scala API.

[3.0][SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs (+37, -8)>

The PR resolves the grouping attributes separately first so they can be properly referred to when FlatMapGroupsInPandas and FlatMapCoGroupsInPandas are resolved without ambiguity. Example:

from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()
df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))

df1.groupby("COLUMN").cogroup(
    df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()

Before:

pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
:  +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
   +- LogicalRDD [column#13L, value#14L], false

After:

+------+-----+
|column|Score|
+------+-----+
|     1|  0.5|
+------+-----+
+------+-----+
|column|value|
+------+-----+
|     2|    2|
+------+-----+

[3.1][SPARK-31945][SQL][PYSPARK] Enable cache for the same Python function (+25, -4)>

This PR makes PythonFunction hold Seq[Byte] instead of Array[Byte], so the serialized function bytes compare by value. With this change, the cache manager can detect that two functions are identical and reuse the existing cache entry.
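In Python terms the idea can be sketched like this (a hypothetical cache, not the actual PythonFunction internals):

```python
_cache = {}

def get_or_register(func_bytes: bytes):
    # bytes keys compare by value, so two equal payloads share one entry.
    # This mirrors what Seq[Byte] enables in Scala, where Array[Byte]
    # equality is by reference and would always miss the cache.
    if func_bytes not in _cache:
        _cache[func_bytes] = object()  # stand-in for the cached artifact
    return _cache[func_bytes]

payload_a = bytes([1, 2, 3])  # e.g., a serialized UDF
payload_b = bytes([1, 2, 3])  # same content, different object
assert get_or_register(payload_a) is get_or_register(payload_b)
```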

[3.1][SPARK-31964][PYTHON] Use Pandas is_categorical on Arrow category type conversion (+2, -5)>

When using PyArrow to convert a Pandas categorical column, use is_categorical instead of trying to import CategoricalDtype, because the former is a more stable API.

UI

[3.0][SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show metrics in Query UI (+4, -4)>

In Dataset.collectAsArrowToR and Dataset.collectAsArrowToPython, the code block for serveToStream runs in a separate thread, so withAction finishes as soon as it starts the thread. As a result, it doesn't collect the metrics of the actual action, and the Query UI shows the plan graph without metrics. This PR fixes the issue.

The affected functions are:

  • collect() in SparkR
  • DataFrame.toPandas() in PySpark
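The race itself is generic: a wrapper that returns as soon as it spawns the worker thread observes none of the work. A minimal illustration in plain Python (not the Spark code; the names are made up):

```python
import threading
import time

metrics = []

def run_action_detached(body):
    # The buggy shape: return as soon as the worker thread starts,
    # so any metric collection done by the caller happens too early.
    t = threading.Thread(target=body)
    t.start()
    return t

def action():
    time.sleep(0.05)                  # stand-in for serving Arrow batches
    metrics.append("rows_collected")  # stand-in for recording SQL metrics

t = run_action_detached(action)
early_snapshot = list(metrics)  # typically still empty at this point
t.join()                        # the fix amounts to waiting for the action
print(early_snapshot, metrics)
```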

[3.0][SPARK-31886][WEBUI] Fix the wrong coloring of nodes in DAG-viz (+33, -3)>

In the Job and Stage pages, DAG-viz nodes associated with "barrier mode" are colored pale green. For some types of jobs, however, nodes not associated with the mode were also colored. This PR fixes that.

[3.1][SPARK-29431][WEBUI] Improve Web UI / Sql tab visualization with cached dataframes (+46, -0)>

Display the query plan of cached DataFrames as well in the web UI.

[2.4][SPARK-31967][UI] Downgrade to vis.js 4.21.0 to fix Jobs UI loading time regression (+49, -86)>

Fix a serious performance regression in the web UI by rolling vis-timeline-graph2d back to 4.21.0.

[3.1][SPARK-30119][WEBUI] Support pagination for streaming tab (+259, -178)>

The PR adds pagination support for the streaming tab.

[3.1][SPARK-31642][FOLLOWUP] Fix Sorting for duration column and make Status column sortable (+7, -6)>

The PR improves the pagination support in the streaming tab by fixing the wrong sort order for the duration column and making the Status column sortable.


Re: [OSS DIGEST] The major changes of Apache Spark from June 3 to June 16

Holden Karau
I'd also add [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown and [SPARK-21040][CORE] Speculate tasks which are running on decommission executors, two of the PRs merged after the decommissioning SPIP.


On Tue, Jul 21, 2020 at 10:53 AM Xingbo Jiang <[hidden email]> wrote:
--
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9 
Re: [OSS DIGEST] The major changes of Apache Spark from June 3 to June 16

Jiang Xingbo
Hi Holden,

This is the digest for commits merged between June 3 and June 16. The commits you mentioned will be included in future digests.

Cheers,

Xingbo 

On Tue, Jul 21, 2020 at 11:13 AM Holden Karau <[hidden email]> wrote:
I'd also add [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown & 

[SPARK-21040][CORE] Speculate tasks which are running on decommission executors two of the PRs merged after the decommissioning SPIP.


On Tue, Jul 21, 2020 at 10:53 AM Xingbo Jiang <[hidden email]> wrote:
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, an [API] tag is added in the title.

CORE

[3.0][SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing (+63, -5)>

A user may name his accumulators using the internal.metrics. prefix, so that Spark treats them as internal accumulators and hides them from UI. We should make JsonProtocol.accumValueToJson more robust and let it ignore internal accumulators that use unrecognized types.

[API][3.1][SPARK-31486][CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode (+88, -26)>

This PR implements an application wait mechanism that allows spark-submit to wait until the application finishes in Standalone mode. This will delay the exit of spark-submit JVM until the job is completed. This implementation will keep monitoring the application until it is either finished, failed, or killed. This will be controlled via the following conf:

  • spark.standalone.submit.waitAppCompletion (Default: false)

    In standalone cluster mode, controls whether the client waits to exit until the application completes. If set to true, the client process will stay alive polling the driver's status. Otherwise, the client process will exit after submission.

SQL

[3.0][SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled (+27, -12)>

AQE and non-AQE use different configs to set the initial shuffle partition number. This PR fixes repartition/DISTRIBUTE BY so that it also uses the AQE config spark.sql.adaptive.coalescePartitions.initialPartitionNum to set the initial shuffle partition number if AQE is enabled.

[3.0][SPARK-31867][SQL][FOLLOWUP] Check result differences for datetime formatting (+51, -8)>

Spark should throw SparkUpgradeException when getting DateTimeException for datetime formatting in the EXCEPTION legacy Time Parser Policy.

[API][3.0][SPARK-31879][SPARK-31892][SQL] Disable week-based pattern letters in datetime parsing/formatting (+1421, -171)> (+102, -48)>

Week-based pattern letters have very weird behaviors during datetime parsing in Spark 2.4, and it's very hard to simulate the legacy behaviors with the new API. For formatting, the new API makes the start-of-week localized, and it's not possible to keep the legacy behaviors. Since the week-based fields are rarely used, we disable week-based pattern letters in both parsing and formatting.

[3.0][SPARK-31896][SQL] Handle am-pm timestamp parsing when hour is missing (+39, -3)>

This PR sets the hour field to 0 or 12 when the AMPM_OF_DAY field is AM or PM during datetime parsing, to keep the behavior the same as Spark 2.4.

[API][3.1][SPARK-31830][SQL] Consistent error handling for datetime formatting and parsing functions (+126, -580)>

When parsing/formatting datetime values, it's better to fail fast if the pattern string is invalid, instead of returning null for each input record. The formatting functions such as date_format already do it, this PR applies the fail-fast behavior to parsing functions: from_unixtimeunix_timestamp,to_unix_timestampto_timestamp and to_date.

[3.1][SPARK-31910][SQL] Enable Java 8 time API in Thrift server (+23, -0)>

This PR enables Java 8 time API in thriftserver, so that we use the session timezone more consistently.

[2.4][SPARK-31935][SQL] Hadoop file system config should be effective in data source options (+52, -7)>

This PR fixes a bug that the hadoop configs in read/write options are not respected in data source V1.

[API][2.4][SPARK-31968][SQL] Duplicate partition columns check when writing data (+12, -1)>

Add a check for duplicate partition columns when writing built-in file sources. After the change, when the DataFrame has duplicate partition columns, the users get an AnalysisException when writing it. Previously, the writing would succeed, but reading the files with duplicate columns will fail.

[API][3.0][SPARK-26905][SQL] Add TYPE in the ANSI non-reserved list (+2, -0)>

Add TYPE in the ANSI non-reserved list to follow the ANSI/SQL standard. The change impacts the behavior only when ANSI mode is on (spark.sql.ansi.enabled=true)

[API][3.0][SPARK-26905][SQL] Follow the SQL:2016 reserved keywords (+429, -5)>

Move keywords ANTISEMI, and MINUS from reserved to non-reserved to comply with the ANSI/SQL standard. The change impacts the behavior only when ANSI mode is on (spark.sql.ansi.enabled=true)

[API][3.0][SPARK-31939][SQL][TEST-JAVA11] Fix Parsing day of year when year field pattern is missing (+465, -3)>

When a datetime pattern does not contain a year field (ie. 'yyyy') but contains the day of year field (ie. 'DD'), Spark should still be able to respect the datetime pattern and parse the constants.

Before the change:

spark-sql> select to_timestamp('31', 'DD');
1970-01-01 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
1970-01-30 00:00:00

After the change:

spark-sql> select to_timestamp('31', 'DD');
1970-01-31 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
NULL

[3.0][SPARK-31956][SQL] Do not fail if there is no ambiguous self join (+7, -2)>

df("col").as("name") is not a column reference anymore, and should not have the special column metadata that is used to identify the root attribute (e.g., Dataset ID and col position). This PR fixes the corresponding regression that could cause a DataFrame could fail even when there is no ambiguous self-join. Below is an example,

val joined = df.join(spark.range(1)).select($"a")
joined.select(joined("a").alias("x"), sum(joined("a")).over(w))

[3.0][SPARK-31958][SQL] normalize special floating numbers in subquery (+18, -4)>

The PR fixes a bug where special floating-point values (e.g., NaN and -0.0) in non-correlated subquery expressions were not normalized; the subquery expressions are now handled by OptimizeSubqueries.

[API][3.1][SPARK-21117][SQL] Built-in SQL Function Support - WIDTH_BUCKET (+431, -30)>

Adds a built-in SQL function, WIDTH_BUCKET(value, min_value, max_value, num_bucket), which returns the bucket number to which value would be assigned in an equi-width histogram with num_bucket buckets spanning the range min_value to max_value. Examples:

> SELECT WIDTH_BUCKET(5.3, 0.2, 10.6, 5);
3
> SELECT WIDTH_BUCKET(-2.1, 1.3, 3.4, 3);
0
> SELECT WIDTH_BUCKET(8.1, 0.0, 5.7, 4);
5
> SELECT WIDTH_BUCKET(-0.9, 5.2, 0.5, 2);
3
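
The semantics behind those outputs can be sketched in Python (a minimal sketch of the standard WIDTH_BUCKET behavior, not Spark's implementation): values before the range map to bucket 0, values past it map to num_bucket + 1, and the bounds may be descending.

```python
import math

def width_bucket(value, min_value, max_value, num_bucket):
    # Sketch of equi-width bucketing: bucket 0 for values before the
    # range, num_bucket + 1 for values past it; descending bounds flip
    # the direction of the histogram.
    if num_bucket <= 0 or min_value == max_value:
        return None
    if min_value < max_value:  # ascending histogram
        if value < min_value:
            return 0
        if value >= max_value:
            return num_bucket + 1
        width = (max_value - min_value) / num_bucket
        return int(math.floor((value - min_value) / width)) + 1
    else:  # descending histogram
        if value > min_value:
            return 0
        if value <= max_value:
            return num_bucket + 1
        width = (min_value - max_value) / num_bucket
        return int(math.floor((min_value - value) / width)) + 1
```

This reproduces the examples above, including the last one, where min_value > max_value makes the histogram descending.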

[3.1][SPARK-27217][SQL] Nested column aliasing for more operators which can prune nested column (+190, -10)>

Support nested column pruning from an Aggregate or Expand operator.

[3.1][SPARK-27633][SQL] Remove redundant aliases in NestedColumnAliasing (+43, -1)>

Avoids generating redundant aliases in the NestedColumnAliasing rule when the parent nested field is already aliased. This slightly improves performance.

[3.1][SPARK-31736][SQL] Nested column aliasing for RepartitionByExpression/Join (+197, -16)>

Support nested column pruning from a RepartitionByExpression or Join operator.

ML

[3.1][SPARK-31925][ML] Summary.totalIterations greater than maxIters (+43, -12)>

The PR fixes a correctness issue in LogisticRegression and LinearRegression where the actual number of training iterations was one greater than the specified maxIter.

[3.1][SPARK-31944] Add instance weight support in LinearRegressionSummary (+56, -24)>

The PR adds instance weight support to LinearRegressionSummary; instance weight is already supported by LinearRegression and RegressionMetrics.

SS

[3.0][SPARK-31593][SS] Remove unnecessary streaming query progress update (+58, -7)>

The PR fixes a bug that set incorrect metrics in Structured Streaming. A progress update should be made every 10 seconds when a stream doesn't have any new data upstream. Without the fix, the progress update zeroed out the input information but not the output information.

[3.0][SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates (+3, -1)>

The PR preserves the input order of colNames for groupCols in Dataset.dropDuplicates, because Structured Streaming's state store depends on the groupCols order.
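
The intent — drop duplicate column names while keeping first-seen order — can be sketched as follows (an illustrative Python sketch, not the Scala change itself):

```python
def dedup_preserving_order(col_names):
    # Remove duplicate column names while keeping first-seen order,
    # since downstream consumers (here, the streaming state store)
    # depend on the grouping-column order.
    seen = set()
    out = []
    for name in col_names:
        if name not in seen:
            seen.add(name)
            out.append(name)
    return out
```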

[3.1][SPARK-24634][SS] Add a new metric regarding number of inputs later than watermark plus allowed delay (+94, -29)>

Adds a new metric, numLateInputs, to count the number of inputs that are later than the watermark plus the allowed delay ('inputs' are relative to operators). The new metric is provided both on the Spark UI (SQL tab, query execution details page) and in the Streaming Query Listener.
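
One plausible reading of the metric (a hedged sketch, not Spark's implementation: the watermark trails the newest event time by the allowed delay, and an input counts as late when its event time falls behind that watermark):

```python
def num_late_inputs(event_times, max_event_time, allowed_delay):
    # Hedged sketch: watermark = newest event time minus allowed delay;
    # inputs older than the watermark are counted as late.
    watermark = max_event_time - allowed_delay
    return sum(1 for t in event_times if t < watermark)
```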

PYTHON

[API][3.0][SPARK-31895][PYTHON][SQL] Support DataFrame.explain(extended: str) case to be consistent with Scala side (+24, -11)>

Improves DataFrame.explain in PySpark so that it also accepts an explain mode string, consistent with the Scala API.

[3.0][SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs (+37, -8)>

The PR resolves grouping attributes separately first so that they can be properly referred to when FlatMapGroupsInPandas and FlatMapCoGroupsInPandas are resolved without ambiguity. Example:

from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()
df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))

df1.groupby("COLUMN").cogroup(
    df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()

Before:

pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
:  +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
   +- LogicalRDD [column#13L, value#14L], false

After:

+------+-----+
|column|Score|
+------+-----+
|     1|  0.5|
+------+-----+
+------+-----+
|column|value|
+------+-----+
|     2|    2|
+------+-----+

[3.1][SPARK-31945][SQL][PYSPARK] Enable cache for the same Python function (+25, -4)>

This PR makes PythonFunction hold Seq[Byte] instead of Array[Byte], so that the serialized command compares by value rather than by reference. With this change, the cache manager can detect that two Python functions are the same and reuse the cached result if it exists.
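
As a loose Python analogy for value-based keying (cache_key is a hypothetical illustrative helper, not a Spark API): two structurally identical functions are distinct objects, but keying on their compiled bytecode — bytes that compare by value, in the spirit of Seq[Byte] versus Array[Byte] — maps them to the same cache entry.

```python
def cache_key(func):
    # Hypothetical sketch: key on the function's compiled bytecode,
    # which compares by value, so structurally identical functions
    # share a cache entry.
    return func.__code__.co_code

f = lambda x: x + 1
g = lambda x: x + 1
# f and g are distinct objects but share identical bytecode,
# so cache_key(f) == cache_key(g).
```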

[3.1][SPARK-31964][PYTHON] Use Pandas is_categorical on Arrow category type conversion (+2, -5)>

When using PyArrow to convert a Pandas categorical column, use is_categorical instead of trying to import CategoricalDtype, because the former is a more stable API.

UI

[3.0][SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show metrics in Query UI (+4, -4)>

In Dataset.collectAsArrowToR and Dataset.collectAsArrowToPython, the code block for serveToStream runs in a separate thread, so withAction finishes as soon as it starts the thread. As a result, it doesn't collect the metrics of the actual action, and the Query UI shows the plan graph without metrics. This PR fixes the issue.

The affected functions are:

  • collect() in SparkR
  • DataFrame.toPandas() in PySpark

[3.0][SPARK-31886][WEBUI] Fix the wrong coloring of nodes in DAG-viz (+33, -3)>

In the Job Page and Stage Page, nodes associated with "barrier mode" in the DAG-viz are colored pale green. However, for some types of jobs, nodes not associated with barrier mode were also colored. This PR fixes it.

[3.1][SPARK-29431][WEBUI] Improve Web UI / Sql tab visualization with cached dataframes (+46, -0)>

Display the query plan of cached DataFrames as well in the web UI.

[2.4][SPARK-31967][UI] Downgrade to vis.js 4.21.0 to fix Jobs UI loading time regression (+49, -86)>

Fixes a serious performance issue in the web UI by downgrading vis-timeline-graph2d to 4.21.0.

[3.1][SPARK-30119][WEBUI] Support pagination for streaming tab (+259, -178)>

The PR adds pagination support for the streaming tab.

[3.1][SPARK-31642][FOLLOWUP] Fix Sorting for duration column and make Status column sortable (+7, -6)>

The PR improves pagination support in the streaming tab by fixing an incorrect sort order for the Duration column and making the Status column sortable.




--
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9 
Re: [OSS DIGEST] The major changes of Apache Spark from June 3 to June 16

Holden Karau
Got it, I missed the date in the reading :)

On Tue, Jul 21, 2020 at 11:23 AM Xingbo Jiang <[hidden email]> wrote:
Hi Holden,

This is the digest for commits merged between June 3 and June 16. The commits you mentioned would be included in the future digests.

Cheers,

Xingbo 

On Tue, Jul 21, 2020 at 11:13 AM Holden Karau <[hidden email]> wrote:
I'd also add [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown & 

[SPARK-21040][CORE] Speculate tasks which are running on decommission executors two of the PRs merged after the decommissioning SPIP.


On Tue, Jul 21, 2020 at 10:53 AM Xingbo Jiang <[hidden email]> wrote: