[OSS DIGEST] The major changes of Apache Spark from Feb 26 to Mar 10

Gengliang Wang
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, there will be an [API] tag in the title.


CORE

[3.0][SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks (+59, -12)>

When a job finishes, its running (re-submitted) map stages should be marked as finished if they are not used by other jobs, and the running tasks of these stages should be cancelled. The ListenerBus should also be notified; otherwise, these map stage items stay on the "Active Stages" page of the web UI and never disappear. This PR fixes these issues.

(screenshot: the "Active Stages" page after the fix)

[3.0][SPARK-30969][CORE] Remove resource coordination support from Standalone (+17, -463)>

Resource coordination is mainly designed for the scenario where multiple workers are launched on the same host. However, this is effectively a non-existent scenario in today's Spark: Spark can now start multiple executors in a single Worker, whereas it only allowed one executor per Worker in the very beginning. There is no benefit for users in launching multiple workers on the same host, so it is not worth carrying a complicated implementation and potentially high maintenance cost for such an unlikely scenario.

SQL

[2.4][SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq) (+2, -1)>

This PR fixes a thread-safety bug in SparkSession.createDataset(Seq): if the caller-supplied Encoder is used in multiple threads, createDataset's usage of the encoder may lead to incorrect or corrupt results because the Encoder's internal mutable state is updated from multiple threads.
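
A minimal illustrative sketch of the failure mode (not the PR's test case; before the fix, concurrent use of one shared encoder could corrupt the produced rows):

// Illustrative sketch only: several threads share one Encoder instance through
// createDataset. Before this fix, the encoder's internal mutable state could be
// updated concurrently and yield incorrect rows.
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object EncoderRaceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("encoder-race").getOrCreate()
    val sharedEncoder: Encoder[(Int, String)] = Encoders.product[(Int, String)]

    val threads = (1 to 4).map { t =>
      new Thread(() => {
        val data = Seq.tabulate(1000)(i => (i, s"thread-$t-row-$i"))
        // Every thread funnels through the same encoder instance.
        spark.createDataset(data)(sharedEncoder).collect()
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    spark.stop()
  }
}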

[2.4][SPARK-30993][SQL] Use its sql type for UDT when checking the type of length (fixed/var) or mutable (+85, -1)>

This patch fixes a bug in UnsafeRow's isFixedLength and isMutable, which do not handle UDTs specifically. These methods don't check the SQL type underlying a UDT, and always treat a UDT as variable-length and non-mutable.

This doesn't cause any issue when a UDT represents a complex type, but when a UDT represents a type that maps to a fixed-length SQL type, it opens the door to correctness issues, as this information sometimes decides how the value should be handled.

Misclassifying the length type of a UDT can corrupt the value when the row is fed into GenerateUnsafeRowJoiner, which leads to a correctness issue.

[2.4][SPARK-30998][SQL] ClassCastException when a generator having nested inner generators (+40, -3)>

The query below failed:

scala> sql("select array(array(1, 2), array(3)) ar").select(explode(explode($"ar"))).show()
20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
	at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:313)
	at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    ...

This PR modifies the hasNestedGenerator check in ExtractGenerator to correctly catch nested inner generators.

[3.0]Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server" (+43, -67)>

In the Hive module, the server side produces date-time values simply via value.toString, and the client side reconstructs the results in HiveBaseResultSet with java.sql.Date(Timestamp).valueOf. Using Java 8 time APIs would introduce an inconsistency between client and server, so the PR was reverted.

[API][3.0][SPARK-27619][SQL] MapType should be prohibited in hash expressions (+39, -17)>

hash() and xxhash64() can no longer be used on elements of MapType. A new configuration, spark.sql.legacy.allowHashOnMapType, is introduced to allow users to restore the previous behaviour.

When spark.sql.legacy.allowHashOnMapType is set to false:

scala> spark.sql("select hash(map())");
org.apache.spark.sql.AnalysisException: cannot resolve 'hash(map())' due to data type mismatch: input to function hash cannot contain elements of MapType; line 1 pos 7;
'Project [unresolvedalias(hash(map(), 42), None)]
+- OneRowRelation

When spark.sql.legacy.allowHashOnMapType is set to true:

scala> spark.sql("set spark.sql.legacy.allowHashOnMapType=true");
res3: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("select hash(map())").first()
res4: org.apache.spark.sql.Row = [42]

[API][3.0][SPARK-30902][SQL] Default table provider should be decided by catalog implementations (+84, -55)>

When a CREATE TABLE SQL statement does not specify the provider, the decision is left to the catalog implementation.
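
A hedged sketch of the user-facing effect (for the built-in session catalog the default provider typically comes from spark.sql.sources.default, but the exact behavior is up to the catalog):

scala> spark.sql("CREATE TABLE t1 (id INT)")                // provider decided by the catalog implementation
scala> spark.sql("CREATE TABLE t2 (id INT) USING parquet")  // provider specified explicitly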

[3.0][SPARK-30972][SQL] PruneHiveTablePartitions should be executed as earlyScanPushDownRules (+21, -16)>

Similar to the rule PruneFileSourcePartitions, PruneHiveTablePartitions should also be executed as part of earlyScanPushDownRules to eliminate its impact on later statistics computation.

[3.0][SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions (+34, -0)>

Generators in SQL aggregate expressions have been supported since SPARK-28782, but a generator (explode) query with aggregate functions in the DataFrame API failed.

The root cause is that ExtractGenerator wrongly replaces a Project containing aggregate functions before GlobalAggregates replaces it with an Aggregate.

To avoid this case, this PR adds a condition in ExtractGenerator to ignore generators that contain aggregate functions.
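
An illustrative DataFrame query of the affected shape (a sketch, not necessarily the exact reproduction from the PR), run in spark-shell:

scala> import org.apache.spark.sql.functions._
scala> val df = Seq(1, 2, 3).toDF("v")
// Before the fix, analysis could fail here because ExtractGenerator fired before
// GlobalAggregates turned the Project into an Aggregate; now it resolves correctly.
scala> df.select(explode(array(min($"v"), max($"v")))).show()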

[API][3.0][SPARK-30189][SQL] Interval from year-month/date-time string should handle whitespaces (+115, -7)>

Currently, Spark parses intervals from multi-unit strings or from date-time/year-month pattern strings. The former handles whitespace, while the latter doesn't. After this change, Spark handles whitespace in both date-time and year-month pattern strings, for example:

select interval '\n-\t10\t 12:34:46.789\t' day to second
-- !query 126 schema
struct<INTERVAL '-10 days -12 hours -34 minutes -46.789 seconds':interval>
-- !query 126 output
-10 days -12 hours -34 minutes -46.789 seconds

[3.0][SPARK-30563][SQL] Disable using commit coordinator with NoopDataSource (+1, -0)>

This PR disables using the commit coordinator with NoopDataSource. There is no need for a coordinator in benchmarks.

[API][3.0][SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter (+269, -96)>

Add a new SQL conf, spark.sql.legacy.timeParserPolicy (a usage sketch follows the list):

  • When set to LEGACY, java.text.SimpleDateFormat is used for formatting and parsing dates/timestamps in a locale-sensitive manner, which was the approach before Spark 3.0.
  • When set to CORRECTED, classes from java.time.* packages are used for the same purpose.
  • The default value is EXCEPTION: a RuntimeException is thrown when the two approaches would produce different results.
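
A hedged usage sketch of the new conf:

scala> spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")     // pre-3.0 SimpleDateFormat behavior
scala> spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")  // java.time-based formatting/parsing
// The default, EXCEPTION, raises a RuntimeException when the two approaches would disagree.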

[3.0][SPARK-30885][SQL][FOLLOW-UP] Fix issues where some V1 commands allow tables that are not fully qualified (+65, -42)>

A few V1 commands, such as REFRESH TABLE, still allow spark_catalog.t because they run with parsed table names without trying to load them in the catalog. This PR addresses the issue.

[3.0][SPARK-30886][SQL] Deprecate two-parameter TRIM/LTRIM/RTRIM functions (+66, -6)>

This PR aims to show a deprecation warning on two-parameter TRIM/LTRIM/RTRIM function usages based on the community decision: https://lists.apache.org/thread.html/r48b6c2596ab06206b7b7fd4bbafd4099dccd4e2cf9801aaa9034c418%40%3Cdev.spark.apache.org%3E

[3.0][SPARK-30899][SQL] CreateArray/CreateMap's data type should not depend on SQLConf.get (+45, -18)>

This avoids the issue where the configuration changes between different phases of planning, which can silently break a query plan and lead to crashes or data corruption.

[3.0][SPARK-30960][SQL] add back the legacy date/timestamp format support in CSV/JSON parser (+222, -5)>

Before Spark 3.0, the JSON/CSV parser had a special behavior: when it failed to parse a timestamp/date, it fell back to another parsing path in order to support some legacy formats. The fallback was removed by https://issues.apache.org/jira/browse/SPARK-26178 and https://issues.apache.org/jira/browse/SPARK-26243.

This PR adds back this legacy fallback. Since the underlying datetime API has been switched, the behavior cannot be exactly the same as before. The PR adds back support for the common legacy formats (examples from Spark 2.4):

  1. the fields can have one or two digits
scala> sql("""select from_json('{"time":"1123-2-22 2:22:22"}', 'time Timestamp')""").show(false)
+-------------------------------------------+
|jsontostructs({"time":"1123-2-22 2:22:22"})|
+-------------------------------------------+
|[1123-02-22 02:22:22]                      |
+-------------------------------------------+
  2. the separator between date and time can also be "T"
scala> sql("""select from_json('{"time":"2000-12-12T12:12:12"}', 'time Timestamp')""").show(false)
+---------------------------------------------+
|jsontostructs({"time":"2000-12-12T12:12:12"})|
+---------------------------------------------+
|[2000-12-12 12:12:12]                        |
+---------------------------------------------+
  3. the second fraction can be of arbitrary length
scala> sql("""select from_json('{"time":"1123-02-22T02:22:22.123456789123"}', 'time Timestamp')""").show(false)
+----------------------------------------------------------+
|jsontostructs({"time":"1123-02-22T02:22:22.123456789123"})|
+----------------------------------------------------------+
|[1123-02-15 02:22:22.123]                                 |
+----------------------------------------------------------+
  4. a date string can end with any characters after "T" or a space
scala> sql("""select from_json('{"time":"1123-02-22Tabc"}', 'time date')""").show(false)
+----------------------------------------+
|jsontostructs({"time":"1123-02-22Tabc"})|
+----------------------------------------+
|[1123-02-22]                            |
+----------------------------------------+
  1. remove "GMT" from the string before parsing
scala> sql("""select from_json('{"time":"GMT1123-2-22 2:22:22.123"}', 'time Timestamp')""").show(false)
+--------------------------------------------------+
|jsontostructs({"time":"GMT1123-2-22 2:22:22.123"})|
+--------------------------------------------------+
|[1123-02-22 02:22:22.123]                         |
+--------------------------------------------------+

[API][3.0][SPARK-31005][SQL] Support time zone ids in casting strings to timestamps (+48, -32)>

This PR changes DateTimeUtils.stringToTimestamp to support any valid time zone id at the end of the input string. After the change, the function accepts zone ids in the following formats (a hedged example follows the list):

  • no zone id. In that case, the function uses the local session time zone from the SQL config spark.sql.session.timeZone
  • -[h]h:[m]m
  • +[h]h:[m]m
  • Z
  • Short zone id, see https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html#SHORT_IDS
  • Zone IDs starting with 'UTC+', 'UTC-', 'GMT+', 'GMT-', 'UT+' or 'UT-'. The ID is split in two, with a two- or three-letter prefix and a suffix starting with the sign. The suffix must be in one of the formats:
    • +|-h[h]
    • +|-hh[:]mm
    • +|-hh:mm:ss
    • +|-hhmmss
  • Region-based zone IDs in the form {area}/{city}, such as Europe/Paris or America/New_York. The default set of region ids is supplied by the IANA Time Zone Database (TZDB).
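
A hedged example of a cast that now honors a trailing zone id (the displayed value depends on spark.sql.session.timeZone):

scala> spark.sql("SELECT CAST('2020-03-10 10:10:10 Europe/Paris' AS TIMESTAMP)").show(false)
// The trailing region-based zone id is parsed; the result is rendered in the session time zone.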

[3.0][SPARK-31010][SQL][FOLLOW-UP] Deprecate untyped scala UDF (+2, -0)>

Use the Scala @deprecated annotation to deprecate the untyped Scala UDF API.

[API][3.0][SPARK-31019][SQL] make it clear that people can deduplicate map keys (+112, -54)>

Rename the conf spark.sql.legacy.allowDuplicatedMapKeys to spark.sql.mapKeyDedupPolicy and make it public.
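
A hedged sketch of the now-public conf (LAST_WIN keeps the last value for a duplicated key; the default, EXCEPTION, fails the query instead):

scala> spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
scala> spark.sql("SELECT map(1, 'a', 1, 'b')[1]").show()   // 'b' under LAST_WIN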

[API][3.0][SPARK-31024][SQL] Allow specifying session catalog name spark_catalog in qualified column names for v1 tables (+111, -59)>

Currently, the user cannot specify the session catalog name (spark_catalog) in qualified column names for v1 tables:

SELECT spark_catalog.default.t.i FROM spark_catalog.default.t

fails with cannot resolve 'spark_catalog.default.t.i'.

This is inconsistent with v2 table behavior where catalog name can be used:

SELECT testcat.ns1.tbl.id FROM testcat.ns1.tbl

This PR proposes to fix the inconsistency and allow the user to specify session catalog name in column names for v1 tables.

[3.0][SPARK-31036][SQL] Use stringArgs in Expression.toString to respect hidden parameters (+3, -1)>

This PR proposes to respect hidden parameters by using stringArgs in Expression.toString. By doing so, the strings are shown properly in cases such as NonSQLExpression.

[API][3.0][SPARK-31037][SQL] refine AQE config names (+121, -112)>

This PR refines the AQE-related config names (those starting with spark.sql.adaptive); a hedged usage sketch follows the list:

  1. Remove the "shuffle" prefix. AQE is all about shuffle, and we don't need to add the "shuffle" prefix everywhere.
  2. targetPostShuffleInputSize is obscure; rename it to advisoryShufflePartitionSizeInBytes.
  3. reducePostShufflePartitions doesn't match the actual optimization; rename it to coalesceShufflePartitions.
  4. minNumPostShufflePartitions is obscure; rename it to minPartitionNum under the coalesceShufflePartitions namespace.
  5. maxNumPostShufflePartitions is confusing with the word "max"; rename it to initialPartitionNum.
  6. skewedJoinOptimization is too verbose. Skew join is well-known terminology in the database area, so we can just say skewJoin.
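
A hedged usage sketch; the full keys below are an assumption assembled from the spark.sql.adaptive prefix plus the renamed entries above, and may differ from the finally released names:

scala> spark.conf.set("spark.sql.adaptive.enabled", "true")
// Hypothetical full keys based on the renames above:
scala> spark.conf.set("spark.sql.adaptive.coalesceShufflePartitions.enabled", "true")
scala> spark.conf.set("spark.sql.adaptive.advisoryShufflePartitionSizeInBytes", "64MB")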

[3.0][SPARK-31038][SQL] Add checkValue for spark.sql.session.timeZone (+27, -0)>

The spark.sql.session.timeZone config accepts any string value, including invalid time zone ids, which later fails other queries that rely on the time zone. The value should be checked when the config is set, so that an invalid zone value fails fast.
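
A hedged sketch of the new fail-fast behavior:

scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")  // accepted
scala> spark.conf.set("spark.sql.session.timeZone", "Not/A_Zone")           // now rejected at set time instead of breaking later queries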

[3.0][SPARK-31053][SQL] mark connector APIs as Evolving (+48, -40)>

This PR makes the annotations consistent and marks all connector APIs as Evolving.

[API][3.0][SPARK-31061][SQL] Provide ability to alter the provider of a table (+49, -1)>

This PR adds functionality to HiveExternalCatalog to be able to change the provider of a table.

This is useful for catalogs in Spark 3.0 that use alterTable to change the provider of a table as part of an atomic REPLACE TABLE operation.

[API][3.0][SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source (+54, -8)>

This PR proposes two things:

  1. Convert null to string type during schema inference of schema_of_json, as the JSON datasource does. This is also a bug fix, because "null" is not a proper DDL-formatted type string and the SQL parser cannot recognize it as a type. Matching the JSON datasource and returning string type lets schema_of_json return a proper DDL-formatted string.

  2. Let schema_of_json respect dropFieldIfAllNull option during schema inference.
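
A hedged example of the first change (a null field is now inferred as string, yielding a DDL-parsable schema such as struct<a:string>):

scala> spark.sql("""SELECT schema_of_json('{"a": null}')""").show(false)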

[API][3.1][SPARK-30279][SQL] Support 32 or more grouping attributes for GROUPING_ID (+117, -53)>

Since Spark 3.1, grouping_id() returns long values. In Spark version 3.0 and earlier, this function returns int values. To restore the behavior before Spark 3.0, you can set spark.sql.legacy.integerGroupingId to true.

This PR intends to support 32 or more grouping attributes for GROUPING_ID. In the current master, an integer overflow can occur when computing grouping IDs; see https://github.com/apache/spark/blob/e75d9afb2f282ce79c9fd8bce031287739326a4f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L613

For example, the query below generates wrong grouping IDs in the master:


scala> val numCols = 32 // or, 31
scala> val cols = (0 until numCols).map { i => s"c$i" }
scala> sql(s"create table test_$numCols (${cols.map(c => s"$c int").mkString(",")}, v int) using parquet")
scala> val insertVals = (0 until numCols).map { _ => 1 }.mkString(",")
scala> sql(s"insert into test_$numCols values ($insertVals,3)")
scala> sql(s"select grouping_id(), sum(v) from test_$numCols group by grouping sets ((${cols.mkString(",")}), (${cols.init.mkString(",")}))").show(10, false)
scala> sql(s"drop table test_$numCols")

// numCols = 32
+-------------+------+
|grouping_id()|sum(v)|
+-------------+------+
|0            |3     |
|0            |3     | // Wrong Grouping ID
+-------------+------+

// numCols = 31
+-------------+------+
|grouping_id()|sum(v)|
+-------------+------+
|0            |3     |
|1            |3     |
+-------------+------+

To fix this issue, this PR changes the code to use long values for GROUPING_ID instead of int values.

[3.1][SPARK-31078][SQL] Respect aliases in output ordering (+87, -23)>

Currently, in the following scenario, an unnecessary Sort node is introduced:

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  val df = (0 until 20).toDF("i").as("df")
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  val t2 = t1.selectExpr("i as ii")
  t1.join(t2, t1("i") === t2("ii")).explain
}

After the changes, the unnecessary sort won't show up in the explain result.

PYTHON

[API][3.1][SPARK-30681][PYSPARK][SQL] Add higher order functions API to PySpark (+480, -0)>

Currently, the higher order functions are available only through the SQL and Scala APIs, and PySpark can use only SQL expressions:

df.selectExpr("transform(values, x -> x + 1)")

This works reasonably well for simple functions, but can get really ugly with complex functions and casts; the resulting expressions are verbose and get no IDE support. Additionally, the DSL used, though very simple, is not documented.

With the changes proposed here, the above query can be rewritten as:

df.select(transform("values", lambda x: x + 1))

This PR adds a Python API for invoking the following higher order functions:

  • transform
  • exists
  • forall
  • filter
  • aggregate
  • zip_with
  • transform_keys
  • transform_values
  • map_filter
  • map_zip_with

R

[API][3.1][SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions (+421, -0)>

Currently, the higher order functions are available only through the SQL and Scala APIs, and SparkR can use only SQL expressions:

select(df, expr("transform(xs, x -> x + 1)"))

This is error-prone and hard to do right when complex logic is involved (when/otherwise, complex objects).

If this PR is accepted, the above expression could simply be rewritten as:

select(df, array_transform("xs", function(x) x + 1))

This PR adds an R API for invoking the following higher order functions:

  • transform -> array_transform (to avoid conflict with base::transform).
  • exists -> array_exists (to avoid conflict with base::exists).
  • forall -> array_forall (no conflicts, renamed for consistency)
  • filter -> array_filter (to avoid conflict with stats::filter).
  • aggregate -> array_aggregate (to avoid conflict with stats::aggregate).
  • zip_with -> arrays_zip_with (no conflicts, renamed for consistency)
  • transform_keys
  • transform_values
  • map_filter
  • map_zip_with

ML

[API][3.1][SPARK-30776][ML] Support FValueSelector for continuous features and continuous labels (+758, -13)>

Add FValueRegressionSelector for continuous features and continuous labels.

UI

[3.0][SPARK-30964][CORE][WEBUI] Accelerate InMemoryStore with a new index (+105, -25)>

Spark uses the class InMemoryStore as the KV storage for the live UI and the history server (by default, when no LevelDB file path is provided). In InMemoryStore, all the task data of an application is stored in a hashmap whose key is the task ID and whose value is the task data. This is fine for getting or deleting with a given task ID. However, the Spark stage UI always shows all the task data of one stage, and the current implementation looks up all the values in the hashmap, so the time complexity is O(numOfTasks). Also, when there are too many stages (> spark.ui.retainedStages), Spark linearly looks up all the task data of the stages to be deleted as well.

This can be very bad for a large application with many stages and tasks. The change improves it by allowing the natural key of an entity to have a real parent index, so that for each lookup with a parent node provided, Spark can first look up all the natural keys (in our case, the task IDs) and then find the data for those keys in the hashmap.
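
A minimal illustrative sketch of the idea (not Spark's actual InMemoryStore code): keep a secondary "parent" index from stage to task IDs so a per-stage lookup or deletion touches only that stage's tasks instead of scanning the whole task hashmap.

import scala.collection.mutable

class TaskStore[T] {
  private val byTaskId = mutable.HashMap.empty[Long, T]
  private val taskIdsByStage = mutable.HashMap.empty[Int, mutable.Set[Long]]

  def put(stageId: Int, taskId: Long, data: T): Unit = {
    byTaskId(taskId) = data
    taskIdsByStage.getOrElseUpdate(stageId, mutable.Set.empty[Long]) += taskId
  }

  // O(tasks in this stage) rather than O(all tasks in the application).
  def tasksOfStage(stageId: Int): Seq[T] =
    taskIdsByStage.getOrElse(stageId, mutable.Set.empty[Long]).toSeq.flatMap(byTaskId.get)

  def removeStage(stageId: Int): Unit =
    taskIdsByStage.remove(stageId).foreach(_.foreach(byTaskId.remove))
}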

[3.1][SPARK-31073][WEBUI] Add "shuffle write time" to task metrics summary in StagePage (+29, -16)>

  1. Added Shuffle Write Time to the task metrics summary.
  2. Added a checkbox for Shuffle Write Time as an additional metric.
  3. Renamed the Write Time column in the task table to Shuffle Write Time and made it an additional column.

(screenshot: task metrics summary with the new Shuffle Write Time metric)

BUILD

[2.4][SPARK-30968][BUILD] Upgrade aws-java-sdk-sts to 1.11.655 (+1, -1)>

[3.0][SPARK-30994][CORE] Update xerces to 2.12.0 (+8, -2)>