[OSS DIGEST] The major changes of Apache Spark from Feb 26 to Mar 10
This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, there will be an [API] tag in the title.
[3.0][SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks (+59, -12)>
When a job finishes, its running (re-submitted) map stages should be marked as finished if they are not used by other jobs, and the running tasks of those stages should be cancelled. The ListenerBus should also be notified; otherwise these map stage entries stay on the "Active Stages" page of the web UI and never go away. This PR fixes both issues.
[3.0][SPARK-30969][CORE] Remove resource coordination support from Standalone (+17, -463)>
Resource coordination was designed mainly for the scenario where multiple workers are launched on the same host. However, that scenario no longer exists in practice: Spark can now start multiple executors in a single Worker, whereas it allowed only one executor per Worker at the very beginning. Launching multiple workers on the same host gains users nothing, so it is not worth keeping a complicated implementation with a potentially high maintenance cost for such a scenario.
[2.4][SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq) (+2, -1)>
This PR fixes a thread-safety bug in SparkSession.createDataset(Seq): if the caller-supplied Encoder is used in multiple threads then createDataset's usage of the encoder may lead to incorrect / corrupt results because the Encoder's internal mutable state will be updated from multiple threads.
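The failure mode is ordinary shared mutable state and can be illustrated without Spark. The sketch below is hypothetical (StatefulEncoder is not Spark's Encoder API); the safe variant mirrors the PR's approach of giving each use its own encoder copy:

```python
import threading

class StatefulEncoder:
    """Toy encoder with internal mutable scratch state (hypothetical;
    not Spark's Encoder API)."""
    def __init__(self):
        self._current = None  # shared scratch slot, overwritten on every call

    def encode(self, value):
        self._current = value           # thread A writes here...
        return f"row({self._current})"  # ...after thread B may have overwritten it

def encode_all_shared(values, encoder):
    # Unsafe: one encoder instance is shared by all threads, so concurrent
    # encode() calls can interleave updates to _current and corrupt results.
    results = [None] * len(values)
    def work(i):
        results[i] = encoder.encode(values[i])
    threads = [threading.Thread(target=work, args=(i,)) for i in range(len(values))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

def encode_all_safe(values, make_encoder):
    # The fix in SPARK-29419 has this shape: use a fresh copy of the
    # encoder per use instead of sharing one instance's mutable state.
    return [make_encoder().encode(v) for v in values]
```

The sketch only shows why sharing one stateful instance across threads is unsafe; Spark's actual fix copies the encoder internally.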
[2.4][SPARK-30993][SQL] Use its sql type for UDT when checking the type of length (fixed/var) or mutable (+85, -1)>
This patch fixes a bug in UnsafeRow: isFixedLength and isMutable do not handle UDTs specifically. These methods do not check a UDT's underlying SQL type, always treating a UDT as variable-length and non-mutable.
This causes no issue when a UDT represents a complex type, but when a UDT represents a type that maps to a fixed-length SQL type, it opens the door to correctness issues, since this information sometimes decides how the value is handled.
Misclassifying the length of a UDT can corrupt the value when the row is fed into GenerateUnsafeRowJoiner, leading to a correctness issue.
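The shape of the fix: classify a UDT by its underlying SQL (storage) type instead of unconditionally treating it as variable-length. A sketch with hypothetical type names (not Spark's DataType hierarchy):

```python
# Fixed-length primitive SQL types (illustrative subset, not Spark's full list).
FIXED_LENGTH = {"int", "long", "double", "boolean"}

class UserDefinedType:
    """Toy UDT wrapper carrying the SQL type it is stored as (hypothetical)."""
    def __init__(self, sql_type):
        self.sql_type = sql_type

def is_fixed_length(data_type):
    # The SPARK-30993 idea: unwrap a UDT and classify by its storage (SQL)
    # type, instead of always treating every UDT as variable-length.
    if isinstance(data_type, UserDefinedType):
        return is_fixed_length(data_type.sql_type)
    return data_type in FIXED_LENGTH

print(is_fixed_length(UserDefinedType("int")))     # True after the fix (was False)
print(is_fixed_length(UserDefinedType("string")))  # False either way
```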
[2.4][SPARK-30998][SQL] ClassCastException when a generator having nested inner generators (+40, -3)>
The query below failed:
scala> sql("select array(array(1, 2), array(3)) ar").select(explode(explode($"ar"))).show()
20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
This PR modifies the hasNestedGenerator check in ExtractGenerator to correctly catch nested inner generators.
[3.0]Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server" (+43, -67)>
In the Hive module, the server side produces date-time values simply via value.toString, and the client side re-parses the results in HiveBaseResultSet with java.sql.Date(Timestamp).valueOf. Using the Java 8 APIs would introduce inconsistency between client and server, so the PR was reverted.
[API][3.0][SPARK-27619][SQL] MapType should be prohibited in hash expressions (+39, -17)>
hash() and xxhash64() can no longer be used on elements of MapType. A new configuration, spark.sql.legacy.allowHashOnMapType, is introduced to let users restore the previous behavior.
When spark.sql.legacy.allowHashOnMapType is set to false:
scala> spark.sql("select hash(map())");
org.apache.spark.sql.AnalysisException: cannot resolve 'hash(map())' due to data type mismatch: input to function hash cannot contain elements of MapType; line 1 pos 7;
'Project [unresolvedalias(hash(map(), 42), None)]
When spark.sql.legacy.allowHashOnMapType is set to true, the query succeeds and returns a hash value, as in previous releases.
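The underlying problem is that map element order is undefined, so any order-sensitive hash of a map is nondeterministic. A minimal sketch (illustrative only, not Spark's Murmur3 implementation):

```python
def naive_hash(entries):
    # Fold a seed through (key, value) pairs in the order they appear;
    # any such order-sensitive scheme is ill-defined for an unordered map.
    h = 42
    for k, v in entries:
        h = (h * 31 + hash(k)) & 0xFFFFFFFF
        h = (h * 31 + hash(v)) & 0xFFFFFFFF
    return h

# Two logically equal maps whose entries are stored in different orders
# hash to different values:
m1 = [(1, 10), (2, 20)]
m2 = [(2, 20), (1, 10)]
assert naive_hash(m1) != naive_hash(m2)
```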
[API][3.0][SPARK-30902][SQL] Default table provider should be decided by catalog implementations (+84, -55)>
When a CREATE TABLE SQL statement does not specify the provider, it is left to the catalog implementation to decide.
[3.0][SPARK-30972][SQL] PruneHiveTablePartitions should be executed as earlyScanPushDownRules (+21, -16)>
Similar to the rule PruneFileSourcePartitions, PruneHiveTablePartitions should also be executed as one of earlyScanPushDownRules to eliminate its impact on later statistics computation.
[3.0][SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions (+34, -0)>
We have supported generators in SQL aggregate expressions since SPARK-28782, but a generator (e.g., explode) query with aggregate functions failed in the DataFrame API.
The root cause is that ExtractGenerator wrongly replaces a projection with aggregate functions before GlobalAggregates replaces it with an aggregate.
To avoid this case, this PR adds a condition in ExtractGenerator to ignore generators that contain aggregate functions.
[API][3.0][SPARK-30189][SQL] Interval from year-month/date-time string should handle whitespaces (+115, -7)>
Currently, Spark parses intervals from multi-unit strings or from date-time/year-month pattern strings. The former handles all whitespace, while the latter does not. After this change, Spark handles whitespace for both date-time and year-month pattern strings.
select interval '\n-\t10\t 12:34:46.789\t' day to second
-- schema
struct<INTERVAL '-10 days -12 hours -34 minutes -46.789 seconds':interval>
-- output
-10 days -12 hours -34 minutes -46.789 seconds
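The gist of the fix can be sketched as a parser that tolerates whitespace (including tabs and newlines) around the sign and the value. The function below is a hypothetical illustration, not Spark's actual interval parser:

```python
import re

def parse_day_time_interval(s):
    """Parse a '[+|-]d hh:mm:ss[.fff]' day-to-second interval string into
    total seconds, ignoring surrounding whitespace (illustrative sketch,
    not Spark's actual parser)."""
    m = re.match(
        r"\s*([+-])?\s*(\d+)\s+(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,9}))?\s*$", s)
    if m is None:
        raise ValueError(f"cannot parse interval: {s!r}")
    sign = -1 if m.group(1) == "-" else 1
    days, hours, mins = int(m.group(2)), int(m.group(3)), int(m.group(4))
    secs = float(f"{m.group(5)}.{m.group(6) or 0}")
    return sign * (((days * 24 + hours) * 60 + mins) * 60 + secs)

# Tabs and newlines around the value no longer break parsing:
print(parse_day_time_interval("\n-\t10\t 12:34:46.789\t"))
```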
[3.0][SPARK-30563][SQL] Disable using commit coordinator with NoopDataSource (+1, -0)>
This PR disables using the commit coordinator with NoopDataSource. There is no need for a coordinator in benchmarks.
[API][3.0][SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter (+269, -96)>
Add a new SQL conf spark.sql.legacy.timeParserPolicy:
When set to LEGACY, java.text.SimpleDateFormat is used for formatting and parsing dates/timestamps in a locale-sensitive manner, which was the approach before Spark 3.0.
When set to CORRECTED, classes from the java.time.* packages are used for the same purpose.
The default value is EXCEPTION: a RuntimeException is thrown when the two approaches would produce different results.
[3.0][SPARK-30885][SQL][FOLLOW-UP] Fix issues where some V1 commands allow tables that are not fully qualified (+65, -42)>
There are a few V1 commands, such as REFRESH TABLE, that still allow spark_catalog.t because they run with parsed table names without trying to load them from the catalog. This PR addresses the issue.
This PR adds back this legacy fallback. Since we switched the API used for datetime operations, we cannot behave exactly as before; here we add back support for the legacy formats that were commonly used in Spark 2.4.
This PR refines the AQE-related config names (all starting with spark.sql.adaptive):
Remove the "shuffle" prefix: AQE is all about shuffle, so there is no need to repeat "shuffle" everywhere.
targetPostShuffleInputSize is obscure; rename it to advisoryShufflePartitionSizeInBytes.
reducePostShufflePartitions does not match the actual optimization; rename it to coalesceShufflePartitions.
minNumPostShufflePartitions is obscure; rename it to minPartitionNum under the coalesceShufflePartitions namespace.
maxNumPostShufflePartitions is confusing because of the word "max"; rename it to initialPartitionNum.
skewedJoinOptimization is too verbose; skew join is well-known terminology in the database area, so just say skewJoin.
[3.0][SPARK-31038][SQL] Add checkValue for spark.sql.session.timeZone (+27, -0)>
The spark.sql.session.timeZone config accepts any string value, including invalid time zone ids, which then fails other queries that rely on the time zone. We should validate the value in the set phase and fail fast if the zone id is invalid.
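The fail-fast idea can be sketched with Python's zoneinfo standing in for Spark's zone resolution (illustrative; unlike Spark, this toy check would reject raw zone offsets such as "+08:00"):

```python
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

def set_session_time_zone(conf, zone_id):
    """Validate the zone id when the config is SET instead of letting later
    queries fail (a sketch of the fail-fast idea in SPARK-31038; not Spark's
    implementation)."""
    try:
        ZoneInfo(zone_id)  # raises if zone_id is not a known time zone
    except (ZoneInfoNotFoundError, ValueError) as e:
        raise ValueError(f"Cannot resolve the given timezone: {zone_id!r}") from e
    conf["spark.sql.session.timeZone"] = zone_id
    return conf

conf = set_session_time_zone({}, "America/Los_Angeles")  # accepted at set time
```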
[3.0][SPARK-31053][SQL] mark connector APIs as Evolving (+48, -40)>
This PR marks all connector APIs as Evolving, for consistency.
[API][3.0][SPARK-31061][SQL] Provide ability to alter the provider of a table (+49, -1)>
This PR adds functionality to HiveExternalCatalog so that it can change the provider of a table.
This is useful for catalogs in Spark 3.0, which can use alterTable to change a table's provider as part of an atomic REPLACE TABLE operation.
[API][3.0][SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source (+54, -8)>
This PR proposes two things:
Convert null to string type during the schema inference of schema_of_json, as the JSON data source does. This is also a bug fix, because null is not a proper DDL-formatted type string and the SQL parser cannot recognize it as a type. Matching the JSON data source and returning string makes schema_of_json produce a proper DDL-formatted string.
Let schema_of_json respect dropFieldIfAllNull option during schema inference.
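The inference rule for null can be sketched as follows (an illustrative toy, not Spark's inference code; the type names are simplified):

```python
import json

def infer_type(value):
    """Infer a DDL-ish type string for a JSON value, mapping JSON null to
    'string' the way the JSON data source does (sketch of the SPARK-31065
    behavior, not Spark's actual inference)."""
    if value is None:
        return "string"      # null alone has no type; fall back to string
    if isinstance(value, bool):  # check bool before int (bool is an int subtype)
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        fields = ", ".join(f"{k} {infer_type(v)}" for k, v in value.items())
        return f"struct<{fields}>"
    raise TypeError(f"unsupported JSON value: {value!r}")

print(infer_type(json.loads('{"id": 1, "note": null}')))
# struct<id bigint, note string>  -- 'note' becomes string, not an unparseable null
```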
[API][3.1][SPARK-30279][SQL] Support 32 or more grouping attributes for GROUPING_ID (+117, -53)>
Since Spark 3.1, grouping_id() returns long values. In Spark version 3.0 and earlier, this function returns int values. To restore the previous behavior, you can set spark.sql.legacy.integerGroupingId to true.
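The reason an int is insufficient: grouping_id() is a bitmask with one bit per grouping attribute, so 32 or more attributes need more bits than a signed 32-bit int can hold. A sketch of the bitmask computation (illustrative, not Spark's implementation):

```python
def grouping_id(grouped_flags):
    """Compute grouping_id as a bitmask: one bit per grouping attribute,
    set when that attribute is aggregated away in the current grouping set.
    With >= 32 attributes the mask exceeds a signed 32-bit int, which is
    why SPARK-30279 widens the result to long. (Illustrative sketch.)"""
    gid = 0
    for flag in grouped_flags:
        gid = (gid << 1) | (1 if flag else 0)
    return gid

# 33 grouping attributes, all aggregated away: needs more than 32 bits.
gid = grouping_id([True] * 33)
print(gid > 2**31 - 1)  # True: cannot fit in a signed 32-bit int
```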
After the changes, the unnecessary sort won't show up in the explain result.
[API][3.1][SPARK-30681][PYSPARK][SQL] Add higher order functions API to PySpark (+480, -0)>
Currently, the higher-order functions are available only through SQL and the Scala API, and they can use only SQL expressions:
df.selectExpr("transform(values, x -> x + 1)")
This works reasonably well for simple functions, but can get really ugly with complex functions (nesting, casts): the resulting objects are somewhat verbose, and there is no IDE support. Additionally, the DSL used, though very simple, is not documented.
With the changes proposed here, the above query can be rewritten as:
df.select(transform("values", lambda x: x + 1))
This PR adds a Python API for invoking the following higher-order functions:
[API][3.1][SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions (+421, -0)>
Currently, the higher-order functions are available only through SQL and the Scala API, and they can use only SQL expressions:
select(df, expr("transform(xs, x -> x + 1)"))
This is error-prone and hard to get right when complex logic is used (when / otherwise, complex objects).
If this PR is accepted, the above function could be simply rewritten as:
select(df, array_transform("xs", function(x) x + 1))
This PR adds an R API for invoking the following higher-order functions:
transform -> array_transform (to avoid conflict with base::transform).
exists -> array_exists (to avoid conflict with base::exists).
forall -> array_forall (no conflict; renamed for consistency).
filter -> array_filter (to avoid conflict with stats::filter).
aggregate -> array_aggregate (to avoid conflict with stats::aggregate).
zip_with -> arrays_zip_with (no conflict; renamed for consistency).
[API][3.1][SPARK-30776][ML] Support FValueSelector for continuous features and continuous labels (+758, -13)>
Add FValueRegressionSelector for continuous features and continuous labels.
[3.0][SPARK-30964][CORE][WEBUI] Accelerate InMemoryStore with a new index (+105, -25)>
Spark uses the class InMemoryStore as the KV storage for the live UI and the history server (by default, when no LevelDB file path is provided). In InMemoryStore, all the task data of an application is stored in a hashmap whose key is the task ID and whose value is the task data. This is fine for getting or deleting a task with a given task ID. However, the Spark stage UI always shows all the task data of one stage, and the current implementation looks up all the values in the hashmap, with time complexity O(numOfTasks). Also, when there are too many stages (> spark.ui.retainedStages), Spark linearly scans all the task data of the stages to be deleted as well.
This can be very slow for a large application with many stages and tasks. We can improve it by allowing the natural key of an entity to have a real parent index, so that for each lookup with a parent node provided, Spark can first look up all the natural keys (in our case, the task IDs) and then find the data for those keys in the hashmap.
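The parent-index idea can be sketched as a hashmap plus a secondary index from stage ID to task IDs (hypothetical structure, not the actual InMemoryStore code):

```python
from collections import defaultdict

class TaskStore:
    """KV store with a secondary (parent) index: task_id -> data, plus
    stage_id -> set of task IDs, so per-stage reads and deletes touch only
    that stage's tasks instead of scanning every task in the application
    (sketch of the SPARK-30964 idea, not the actual InMemoryStore code)."""
    def __init__(self):
        self._tasks = {}                   # task_id -> (stage_id, task data)
        self._by_stage = defaultdict(set)  # stage_id -> {task_id, ...}

    def put(self, task_id, stage_id, data):
        self._tasks[task_id] = (stage_id, data)
        self._by_stage[stage_id].add(task_id)

    def tasks_of_stage(self, stage_id):
        # O(tasks in this stage), not O(all tasks in the application)
        return [self._tasks[tid][1] for tid in self._by_stage[stage_id]]

    def delete_stage(self, stage_id):
        # Delete only this stage's tasks via the parent index
        for tid in self._by_stage.pop(stage_id, set()):
            del self._tasks[tid]

store = TaskStore()
store.put(1, stage_id=0, data="t1")
store.put(2, stage_id=0, data="t2")
store.put(3, stage_id=1, data="t3")
print(sorted(store.tasks_of_stage(0)))  # ['t1', 't2']
```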
[3.1][SPARK-31073][WEBUI] Add "shuffle write time" to task metrics summary in StagePage (+29, -16)>
Added "Shuffle Write Time" to the task metrics summary.
Added a checkbox for "Shuffle Write Time" as an additional metric.
Renamed the "Write Time" column in the task table to "Shuffle Write Time" and made it an additional (optional) column.
[2.4][SPARK-30968][BUILD] Upgrade aws-java-sdk-sts to 1.11.655 (+1, -1)>
[3.0][SPARK-30994][CORE] Update xerces to 2.12.0 (+8, -2)>