[OSS DIGEST] The major changes of Apache Spark from May 6 to May 19
This is the bi-weekly Apache Spark digest from the Databricks OSS team. For each API/configuration/behavior change, there will be an [API] tag in the title.
[3.0][SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available (+14, -1)>
Re-obtain tokens at the startup of the AM in yarn-cluster mode if a principal and keytab are available. The launch context carries over the credentials from the original user, so this patch overwrites those credentials with the newly obtained tokens.
In yarn-cluster mode, the submitter obtains delegation tokens and adds them to the launch context. The AM is launched with these credentials, so both the AM and the driver can leverage the tokens.
In yarn-cluster mode, the driver is launched inside the AM, which initializes the token manager (while initializing SparkContext) and obtains delegation tokens (and schedules their renewal) if both principal and keytab are available.
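For context, a minimal sketch of how this path is enabled, assuming Spark 3.0's spark.kerberos.* config names; the principal and keytab values below are hypothetical placeholders:

```scala
// In yarn-cluster mode the principal and keytab are typically passed at
// submit time:
//   spark-submit --master yarn --deploy-mode cluster \
//     --principal user@EXAMPLE.COM --keytab /path/to/user.keytab ...
// which corresponds to setting these Spark confs:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kerberos.principal", "user@EXAMPLE.COM")  // hypothetical principal
  .set("spark.kerberos.keytab", "/path/to/user.keytab") // hypothetical keytab path
```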
[2.4][SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner (+434, -47)>
There had been previous efforts to extend Spark's ClosureCleaner to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably, the work was done in SPARK-14540.
However, the previous efforts missed one important scenario: a Scala closure declared in a Scala REPL that captures the enclosing this, a REPL line object.
This PR proposes to enhance Spark's ClosureCleaner to support "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible.
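A minimal sketch of the previously problematic pattern, assuming a Scala 2.12 spark-shell session where sc is predefined:

```scala
// In the REPL, `factor` lives on the generated REPL line object, so the
// lambda below captures the enclosing `this` (the line object). Before this
// fix, ClosureCleaner could not properly clean such indylambda closures.
val factor = 2
val rdd = sc.parallelize(1 to 10)
rdd.map(_ * factor).collect()
```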
[3.0][SPARK-31743][CORE] Add spark_info metric into PrometheusResource (+2, -0)>
[API][3.1][SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned (+409, -13)>
After the changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks to it. We should also decommission the corresponding block managers in the same way, i.e., move the cached RDD blocks from those executors to other active executors. It introduces three new configurations, including (see the sketch after this list):
Whether to decommission the block manager when decommissioning an executor
The interval of time between consecutive cache block replication reattempts happening on each decommissioning executor (due to storage decommissioning).
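A minimal sketch of enabling the feature; the spark.storage.decommission.* configuration names below are an assumption based on this PR's description and should be verified against your Spark build:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // whether to decommission the block manager along with the executor
  .set("spark.storage.decommission.enabled", "true")
  // interval between consecutive cache-block replication reattempts
  .set("spark.storage.decommission.replicationReattemptInterval", "30s")
```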
[API][3.0][SPARK-31365][SQL] Enable nested predicate pushdown per data sources (+186, -100)>
Replaces the config spark.sql.optimizer.nestedPredicatePushdown.enabled with spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources, which configures which v1 data sources have nested predicate pushdown enabled; the previous config was all-or-nothing and applied to all data sources.
In order not to introduce an unexpected breaking change after enabling nested predicate pushdown, we'd like to set nested predicate pushdown per data source.
A comma-separated list of data source short names or fully qualified data source implementation class names for which Spark tries to push down predicates for nested columns and/or names containing dots to data sources. This configuration is only effective with file-based data sources in DSv1. Currently, Parquet implements both optimizations while ORC only supports predicates for names containing dots. The other data sources don't support this feature yet, so the default value is 'parquet,orc'.
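For example, a minimal sketch (assuming a spark-shell session with spark in scope) that restricts nested predicate pushdown to Parquet only:

```scala
// Default is "parquet,orc"; narrowing the list disables the optimization
// for the data sources that are left out (here, ORC).
spark.conf.set(
  "spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources",
  "parquet")
```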
[API][3.0][SPARK-31393][SQL] Show the correct alias in schema for expression (+21, -14)>
Some expression aliases are not displayed correctly in the schema. This PR fixes them.
[API][3.0][SPARK-31595][SQL] Spark SQL CLI should allow unescaped quote mark in quoted string (+16, -2)>
Spark SQL CLI cannot correctly handle unescaped quote marks such as "'" or '"'. When a string contains unmatched quotes, the CLI fails to recognize the terminating semicolon and does not split the statement as expected.
Deprecates the 'spark.sql.optimizer.metadataOnly' configuration, with a plan to remove it in future releases.
This optimization can cause a potential correctness issue; see SPARK-26709. It also seems difficult to extend: essentially, every applicable function would have to be whitelisted, which incurs maintenance overhead; see SPARK-31590.
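While the flag remains available, it can still be toggled explicitly during migration; a minimal sketch, assuming spark is in scope:

```scala
// Deprecated and slated for removal; keeping it disabled avoids the
// potential correctness issue tracked in SPARK-26709.
spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
```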
[2.4][SPARK-31663][SQL] Grouping sets with having clause returns the wrong result (+145, -39)>
GROUPING SETS with a HAVING clause could return a wrong result when the HAVING condition contains a conflicting name. See the example below:
SELECT sum(a) AS b FROM VALUES (1, 10), (2, 20) AS T(a, b) GROUP BY GROUPING SETS ((b), (a, b)) HAVING b > 10
The b in HAVING b > 10 should be resolved as T.b, not sum(a), so the right result should be two rows with value 2 (one from the (b) grouping set and one from the (a, b) grouping set, both for T.b = 20) instead of an empty result set.
The root cause is similar to SPARK-31519: we parse HAVING as Filter(..., Agg(...)) and resolve the two parts in different rules. CUBE and ROLLUP have the same issue in query analysis. This PR fixes this correctness bug.
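The query above can be reproduced in spark-shell (a minimal sketch, assuming spark is in scope); after the fix it returns the two rows instead of an empty result:

```scala
spark.sql("""
  SELECT sum(a) AS b FROM VALUES (1, 10), (2, 20) AS T(a, b)
  GROUP BY GROUPING SETS ((b), (a, b))
  HAVING b > 10
""").show()
// +---+
// |  b|
// +---+
// |  2|
// |  2|
// +---+
```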
[API][2.4][SPARK-31692][SQL] Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory (+58, -3)>
The FileSystem instance obtained via a URL could be different from the one Spark itself uses, because the Hadoop configuration passed via Spark confs was not respected by the registered FsUrlStreamHandlerFactory that handles such URLs.
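A minimal sketch of the affected path (the host and file path are hypothetical); Spark registers FsUrlStreamHandlerFactory when the SparkSession initializes, so java.net.URL can resolve hdfs:// URLs, and Hadoop confs set via spark.hadoop.* should now reach it:

```scala
import java.net.URL

// Submitted with, e.g.: --conf spark.hadoop.fs.defaultFS=hdfs://nn:8020
// Before the fix, the registered factory ignored such confs, so the URL
// could resolve against a different FileSystem than Spark's own.
val stream = new URL("hdfs://nn:8020/path/to/file").openStream() // hypothetical path
stream.close()
```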
[API][3.0][SPARK-30973][SQL] ScriptTransformationExec should wait for the termination (+56, -1)>
Before the fix, when users ran TRANSFORM with a Python script that contained a parser error, Spark finished successfully and returned an empty result set. After the fix, Spark fails and surfaces the exception properly.
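A minimal sketch of the scenario (the script name is hypothetical; in Spark 3.0, TRANSFORM requires a Hive-enabled session):

```scala
// If broken_script.py has a syntax error, Spark previously returned an
// empty result; after the fix the failure is propagated as an exception.
spark.sql("""
  SELECT TRANSFORM (a)
  USING 'python broken_script.py'
  AS (b)
  FROM VALUES (1), (2) AS t(a)
""").show()
```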
For Final and PartialMerge aggregates, Spark binds references for their mergeExpressions using child.output, i.e., the inputAggBufferAttributes of the partial aggregate. This is usually fine. However, if we somehow copy the agg function after agg planning, as in PlanSubqueries, the DeclarativeAggregate is replaced by a new instance with new inputAggBufferAttributes and mergeExpressions. Then we can't bind the mergeExpressions to the output of the partial aggregate operator, as it uses the inputAggBufferAttributes of the original DeclarativeAggregate before the copy.
Instead of using child.output directly, we should use the inputAggBufferAttributes of the current aggregate expression for Final and PartialMerge aggregates to bind references for their mergeExpressions.
[API][3.0][SPARK-31701][R][SQL] Bump up the minimum Arrow version to 0.15.1 in SparkR (+5, -10)>
It reduces the maintenance overhead of matching Arrow versions and minimizes the supported range. SparkR's Arrow optimization is still experimental.
[API][3.0][SPARK-31707][SQL] Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax (+91, -167)>
Reverts SPARK-30098, which brought confusion and frustration around the CREATE TABLE DDL syntax; the community agreed the change had a bad effect. Please read the discussion thread for more details.
[3.1][SPARK-31684][SQL] Overwrite partition failed with 'WRONG FS' when the target partition does not belong to the same filesystem as the table (+209, -4)>
With SPARK-18107, Spark disables the underlying replace (overwrite) and instead performs the delete on the Spark side and only the copy on the Hive side, to bypass the performance issue; see also HIVE-11940.
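A minimal sketch of the failing scenario (the table name and locations are hypothetical): a Hive table on the default filesystem with one partition located on a different filesystem:

```scala
// Table lives on the default FS (e.g. HDFS); the partition points elsewhere.
spark.sql("CREATE TABLE t (a INT) PARTITIONED BY (p INT)")
spark.sql("ALTER TABLE t ADD PARTITION (p = 1) LOCATION 's3a://bucket/t/p=1'")
// Before the fix, this overwrite failed with a 'Wrong FS' error because the
// partition's filesystem differs from the table's.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (p = 1) SELECT 1")
```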