[OSS DIGEST] The major changes of Apache Spark from July 1 to July 28


Yuanjian Li

Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team (this time we combined 4 weeks).
For each API/configuration/behavior change, an [API] tag is added in the title.
All the content has been appended to this doc. It will be updated when we send the email.


[SPARK-32091][CORE] Ignore timeout error when removing blocks on the lost executor (+191, -19)>

This PR adds a check to see whether the executor is lost (by asking the CoarseGrainedSchedulerBackend) after a timeout error is raised in BlockManagerMasterEndpoint while removing blocks (e.g. RDD, broadcast, shuffle). If the executor is lost, we ignore the error; otherwise, we throw it.

This makes Spark more robust against executor-lost events, for example events triggered by Spot instance termination.

[SPARK-32103][CORE] Fix the code to split host and port in the string (+142, -19)>

The current code to split host and port in the string is incorrect in the IPv6 scenario.
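To illustrate the issue: splitting on a colon naively breaks for IPv6 literals, which contain colons themselves and are conventionally wrapped in brackets. A minimal sketch of an IPv6-aware splitter (a hypothetical helper for illustration, not Spark's actual implementation):

```scala
// Hypothetical host/port splitter that handles bracketed IPv6 literals,
// e.g. "[::1]:8080". Illustrative only, not Spark's actual code.
object HostPort {
  def split(hostPort: String): (String, Int) = {
    // The LAST ':' separates the port; earlier colons may belong to an IPv6 host.
    val idx = hostPort.lastIndexOf(':')
    require(idx >= 0, s"No port found in '$hostPort'")
    val rawHost = hostPort.substring(0, idx)
    val port = hostPort.substring(idx + 1).toInt
    // Strip the brackets around an IPv6 literal.
    val host =
      if (rawHost.startsWith("[") && rawHost.endsWith("]"))
        rawHost.substring(1, rawHost.length - 1)
      else rawHost
    (host, port)
  }
}
```

The key points are taking the last colon rather than the first and unwrapping the brackets.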

[API][SPARK-29292][SPARK-30010][CORE] Let core compile for Scala 2.13 (+119, -122)>

Fix the core module so that it compiles with Scala 2.13. This is a step towards Scala 2.13 support.

[SPARK-21040][CORE] Speculate tasks which are running on decommission executors (+141, -4)>

Currently, when an executor is decommissioned, we stop scheduling new tasks on it, but already running tasks keep running. Depending on the cloud provider, we might know beforehand that an executor won't stay alive for more than a preconfigured time; different providers give different timeouts before they take the nodes away. For example, an AWS Spot node won't stay alive for more than 120 seconds. We can use this information in cloud environments to make better decisions about speculating the tasks already running on decommissioned executors.

Added a new config, spark.executor.decommission.killInterval, which users can set explicitly based on the cloud environment where they are running.

  • spark.executor.decommission.killInterval (Default: None)
    Duration after which a decommissioned executor will be killed forcefully. This config is useful for cloud environments where we know in advance when an executor is going to go down after the decommissioning signal, e.g. around 2 minutes for AWS Spot nodes, 1/2 hours for Spot Block nodes, etc. This config is currently used to decide which tasks running on decommissioned executors to speculate.
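The decision described above can be sketched as toy logic: with a known kill interval, a task whose expected remaining runtime exceeds the executor's remaining lifetime is worth re-launching elsewhere. Everything below (parameter names and the heuristic itself) is illustrative, not Spark's actual scheduler code:

```scala
// Toy speculation decision for a task running on a decommissioned executor.
// Illustrative only; Spark's real heuristic lives in the task scheduler.
def shouldSpeculate(
    taskRuntimeSoFarMs: Long,
    medianTaskDurationMs: Long,
    timeSinceDecommissionMs: Long,
    killIntervalMs: Long): Boolean = {
  // Estimate remaining work from the median task duration.
  val expectedRemainingMs = math.max(0L, medianTaskDurationMs - taskRuntimeSoFarMs)
  // How long the executor is expected to survive before the forced kill.
  val executorTimeLeftMs = killIntervalMs - timeSinceDecommissionMs
  // Speculate if the task likely cannot finish before the executor dies.
  expectedRemainingMs > executorTimeLeftMs
}
```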

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown (+1150, -255)>

Recomputing shuffle blocks can be expensive, so we should take advantage of the decommissioning time to migrate these blocks.

Adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

Introduces two new config parameters, spark.storage.decommission.shuffleBlocks.enabled and spark.storage.decommission.rddBlocks.enabled, that control which blocks should be migrated during storage decommissioning.

  • spark.storage.decommission.shuffleBlocks.enabled (Default: false)
    Whether to transfer shuffle blocks during block manager decommissioning. Requires a migratable shuffle resolver (like the sort-based shuffle).

  • spark.storage.decommission.rddBlocks.enabled (Default: false)
    Whether to transfer RDD blocks during block manager decommissioning.

[SPARK-32215] Expose a (protected) /workers/kill endpoint on the MasterWebUI (+190, -5)>

Allows an external agent to inform the Master that certain hosts are being decommissioned.

The current decommissioning is triggered by the Worker receiving a SIGPWR (possibly out of band, from some cleanup hook), which then informs the Master about it. This approach may not be feasible in environments that cannot trigger a cleanup hook on the Worker. In addition, when a large number of worker nodes are being decommissioned, the Master would be flooded with messages.

[SPARK-32350][CORE] Add batch-write on LevelDB to improve performance of HybridStore (+66, -16)>

Improve the performance of HybridStore by adding batch-write support to LevelDB.

[SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost

If an executor is lost, the DAGScheduler handles the loss by removing the executor, but it does not unregister the executor's outputs when the external shuffle service is used. However, if the node on which the executor ran is lost, the shuffle service may no longer be able to serve the shuffle files. In that case, when fetches of the executor's outputs fail in the same stage, the DAGScheduler removes the executor again and, this time, should also unregister its outputs.

Without the changes, the loss of a node could require two stage attempts to recover instead of one.

[SPARK-32437][CORE] Improve MapStatus deserialization speed with RoaringBitmap 0.9.0 (+57, -57)>

This PR speeds up MapStatus deserialization by 5~18% by upgrading RoaringBitmap from 0.7.45 to 0.9.0 and using its new APIs.

[API][SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory (+102, -15)>

Support setting off-heap memory in ExecutorResourceRequests.

[API][SPARK-31418][SCHEDULER] Request more executors in case of dynamic allocation is enabled and a task becomes unschedulable due to spark's blacklisting feature (+392, -19)>

When dynamic allocation is enabled and a taskset becomes unschedulable due to blacklisting, Spark can request the additional executors needed to schedule the blacklisted tasks instead of aborting immediately. In this case, the scheduler passes a SparkListenerUnschedulableTaskSetAdded event to ExecutorAllocationManager to request more executors. Once the event is sent, we start the abortTimer (similar to SPARK-22148 and SPARK-15815) to abort in case no new executors are launched, either because the maximum number of executors has been reached or the cluster manager is out of capacity.

This improves Spark's fault tolerance.


[SPARK-28169][SQL] Convert scan predicate condition to CNF (+152, -27)>

This PR optimizes partition pruning by pushing down predicates connected with OR. See the following example:

select * from test where dt = 20190626 or (dt = 20190627 and xxx="a")

The WHERE condition is converted to CNF: (dt = 20190626 or dt = 20190627) and (dt = 20190626 or xxx = "a"), so that the condition dt = 20190626 or dt = 20190627 can be pushed down during partition pruning.
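The core of the conversion is distributing OR over AND. A toy predicate tree makes the rewrite concrete (illustrative only, not Spark's optimizer code):

```scala
// Toy predicate ADT and the OR-over-AND distribution step that drives
// CNF conversion. Illustrative, not Spark's actual rule.
sealed trait Pred
case class Atom(s: String) extends Pred
case class And(l: Pred, r: Pred) extends Pred
case class Or(l: Pred, r: Pred) extends Pred

def toCnf(p: Pred): Pred = p match {
  // Distribute: a OR (b AND c)  ==>  (a OR b) AND (a OR c)
  case Or(l, And(rl, rr)) => And(toCnf(Or(l, rl)), toCnf(Or(l, rr)))
  case Or(And(ll, lr), r) => And(toCnf(Or(ll, r)), toCnf(Or(lr, r)))
  case And(l, r)          => And(toCnf(l), toCnf(r))
  case Or(l, r) =>
    val (cl, cr) = (toCnf(l), toCnf(r))
    if ((cl, cr) == (l, r)) Or(l, r) else toCnf(Or(cl, cr))
  case atom => atom
}
```

Applied to the example above, `dt = 20190626 OR (dt = 20190627 AND xxx = "a")` becomes a conjunction whose first conjunct references only the partition column and is therefore pushable.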

[API][SPARK-31797][SQL] TIMESTAMP_SECONDS supports fractional input (+267, -39)>

This PR adds fractional input support for TIMESTAMP_SECONDS. For example (the epoch value below is an illustrative input, assuming the session time zone America/Los_Angeles):

select TIMESTAMP_SECONDS(1230219000.123);
-- 2008-12-25 07:30:00.123

[API][SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options (+59, -45)>

This PR fixes the following issues related to data source options:

  • should consider data source options when refreshing cache by the path at the end of InsertIntoHadoopFsRelationCommand

  • should consider data source options when inferring schema for the file source

  • should consider data source options when getting the qualified path in file source v2.

[API][SPARK-32130][SQL] Disable the JSON option inferTimestamp by default (+130, -112)>

To prevent a Spark 3.0 performance regression when inferring schemas from JSON with potential timestamp fields, this PR sets the JSON option inferTimestamp to false by default.

[SPARK-32136][SQL] NormalizeFloatingNumbers should work on null struct (+15, -2)>

Before the fix, NormalizeFloatingNumbers reconstructs a struct if the input expression is of StructType. If the input struct is null, it reconstructs a struct with null-valued fields instead of null. This causes a correctness bug when a null-valued struct is used as a grouping key. For example (the dataset below is an illustrative completion of the truncated snippet, chosen to reproduce the reported counts):

case class B(c: Option[Double])
case class A(b: Option[B])
val df = Seq(A(None), A(Some(B(None))), A(Some(B(Some(1.0))))).toDF()
val res = df.groupBy("b").agg(count("*"))

Before the fix, the null struct and the struct with a null field are wrongly grouped together:

> res.show
+-----+--------+
|    b|count(1)|
+-----+--------+
|   []|       2|
|[1.0]|       1|
+-----+--------+

After this fix, the result is as expected, matching Spark 2.4 and earlier:

> res.show
+-----+--------+
|    b|count(1)|
+-----+--------+
|   []|       1|
| null|       1|
|[1.0]|       1|
+-----+--------+

[SPARK-32145][SQL] ThriftCLIService.GetOperationStatus should include exception's stack trace to the error message (+106, -147)>

This PR changes ThriftCLIService.GetOperationStatus so that JDBC clients receive the full stack trace instead of only the message of the root cause. See the following example: for JDBC end-users, the message

Error running query: org.apache.spark.sql.AnalysisException: The second argument of 'date_sub' function needs to be an integer.;

is better than just seeing the root cause:

Caused by: java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2

[SPARK-32167][SQL] Fix GetArrayStructFields to respect inner field's nullability together (+42, -5)>

The result for the following example should be Array([WrappedArray(1, null)]) instead of Array([WrappedArray(1, 0)]):

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val innerStruct = new StructType().add("i", "int", nullable = true)
val schema = new StructType().add("arr", ArrayType(innerStruct, containsNull = false))
val df = spark.createDataFrame(List(Row(Seq(Row(1), Row(null)))).asJava, schema)
df.select($"arr".getField("i")).collect  // the extracting query, assumed from context

This PR fixes this correctness bug in getField by considering both the original array's containsNull and the inner field's nullability in GetArrayStructFields.

[API][SPARK-31317][SQL] Add withField method to Column (+815, -3)>

This PR introduces a new withField method to the Column class. This method should allow users to add or replace a StructField in a StructType column (with very similar semantics to the withColumn method on Dataset).

def withField(fieldName: String, col: Column): Column

See the examples of usage:

   val df = sql("SELECT named_struct('a', 1, 'b', 2) struct_col")
   df.select($"struct_col".withField("c", lit(3)))
   // result: {"a":1,"b":2,"c":3}

   val df = sql("SELECT named_struct('a', 1, 'b', 2) struct_col")
   df.select($"struct_col".withField("b", lit(3)))
   // result: {"a":1,"b":3}

   val df = sql("SELECT CAST(NULL AS struct<a:int,b:int>) struct_col")
   df.select($"struct_col".withField("c", lit(3)))
   // result: null of type struct<a:int,b:int,c:int>

   val df = sql("SELECT named_struct('a', 1, 'b', 2, 'b', 3) struct_col")
   df.select($"struct_col".withField("b", lit(100)))
   // result: {"a":1,"b":100,"b":100}

   val df = sql("SELECT named_struct('a', named_struct('a', 1, 'b', 2)) struct_col")
   df.select($"struct_col".withField("a.c", lit(3)))
   // result: {"a":{"a":1,"b":2,"c":3}}

   val df = sql("SELECT named_struct('a', named_struct('b', 1), 'a', named_struct('c', 2)) struct_col")
   df.select($"struct_col".withField("a.c", lit(3)))
   // result: org.apache.spark.sql.AnalysisException: Ambiguous reference to fields

[SPARK-32163][SQL] Nested pruning should work even with cosmetic variations (+28, -3)>

If the expressions extracting nested fields have cosmetic variations such as qualifier differences, nested column pruning currently does not work well. That is, two attributes in the same query can be semantically the same, yet their nested column extractors are treated differently during nested column pruning.

This patch proposes to deal with cosmetic variations when processing nested column extractors in NestedColumnAliasing.

[API][SPARK-20680][SQL] Spark SQL does not support creating table with void column datatype (+191, -14)>

This is a new feature that:

  1. Supports parsing void type as Spark SQL's NullType
  2. Forbids creating tables with VOID/NULL-typed columns

This improves compatibility between Spark SQL and Hive.

  1. Spark is incompatible with Hive's VOID type. When the Hive table schema contains the VOID type, DESC table will throw an exception in Spark.
>hive> create table bad as select 1 x, null z from dual;
>hive> describe bad;
x	int
z	void

In Spark 2.0.x, users can read this table:

>spark-sql> describe bad;
x       int     NULL
z       void    NULL
Time taken: 4.431 seconds, Fetched 2 row(s)

But in the latest Spark version (3.0.x), it fails with SparkException: Cannot recognize hive type string: void

>spark-sql> describe bad;
17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
org.apache.spark.SparkException: Cannot recognize hive type string: void
Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
DataType void() is not supported.(line 1, pos 0)
== SQL ==
        ... 61 more
org.apache.spark.SparkException: Cannot recognize hive type string: void
  2. Hive CTAS statements have thrown an error when the SELECT clause has a VOID/NULL-typed column since HIVE-11217. In Spark, creating a table with a VOID/NULL column should likewise throw a readable exception message, covering:
  • Create data source table (using parquet, json, ...)
  • Create Hive table (with or without STORED AS)
  • CTAS

[API][SPARK-32168][SQL] Fix hidden partitioning correctness bug in SQL overwrite (+107, -37)>

This PR fixes a correctness bug in the Data Source V2 APIs. If a table has hidden partitions, all of the values for those partitions are dropped instead of dynamically overwriting changed partitions.

This only affects SQL commands (not DataFrameWriter) writing to tables that have hidden partitions. It is also only a problem when the partition overwrite mode is dynamic.

When converting an INSERT OVERWRITE query to a v2 overwrite plan, Spark attempts to detect when a dynamic overwrite and a static overwrite will produce the same result so it can use the static overwrite. Spark incorrectly detects when dynamic and static overwrites are equivalent when there are hidden partitions, such as days(ts).

This updates the analyzer rule ResolveInsertInto to always use a dynamic overwrite when the mode is dynamic, and static when the mode is static. This avoids the problem by not trying to determine whether the two plans are equivalent and always using the one that corresponds to the partition overwrite mode.

[API][SPARK-31875][SQL] Provide an option to disable user supplied hints (+61, -0)>

Introduces a new SQL config spark.sql.optimizer.ignoreHints. When set to true, the application of hints is disabled. This can be helpful for studying the performance difference between queries with and without hints applied.

[API][SPARK-32207][SQL] Support 'F'-suffixed Float Literals (+108, -2)>

Adds a float literal SQL syntax, e.g. select 1.1f.

[SPARK-32256][SQL][TEST-HADOOP2.7] Force to initialize Hadoop VersionInfo in HiveExternalCatalog (+78, -1)>

This is a regression in Spark 3.0.0 because we switched the default Hive execution version from 1.2.1 to 2.3.7. This PR forces to initialize Hadoop VersionInfo in HiveExternalCatalog to make sure Hive can get the Hadoop version when using the isolated classloader.

[SPARK-32220][SQL] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result (+29, -2)>

In the current join hint strategies, the SHUFFLE_REPLICATE_NL hint directly converts the join to a Cartesian product join and loses the join condition, producing incorrect results. This PR fixes the mistake.

[API][SPARK-32133][SQL] Forbid time field steps for date start/end in Sequence (+27, -0)>

The sequence function with time-field steps for date start/end looks strange: it may produce several values that are the same date, then suddenly jump to the next day. Presto also forbids this. For example, the following is no longer allowed by Spark either:

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-03-02' as date), interval 1 hour))").head(3)
res0: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])  // <- strange result

[API][SPARK-29358][SQL] Make unionByName optionally fill missing columns with nulls (+91, -5)>

Currently, unionByName throws an exception if it detects different column names between two Datasets. This is a strict requirement, and users sometimes need the more flexible behavior of unioning two Datasets with different subsets of columns by name resolution. This PR adds an overload of Dataset.unionByName that takes an allowMissingColumns flag. This is a new Scala API:

  /**
   * Returns a new Dataset containing union of rows in this Dataset and another Dataset.
   * The difference between this function and [[union]] is that this function
   * resolves columns by name (not by position).
   * When the parameter `allowMissingColumns` is true, this function allows different set
   * of column names between two Datasets. Missing columns at each side, will be filled with
   * null values. The missing columns at left Dataset will be added at the end in the schema
   * of the union result:
   * {{{
   *   val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
   *   val df2 = Seq((4, 5, 6)).toDF("col1", "col0", "col3")
   *   df1.unionByName(df2, true).show
   *   // output: "col3" is missing at left df1 and added at the end of schema.
   *   // +----+----+----+----+
   *   // |col0|col1|col2|col3|
   *   // +----+----+----+----+
   *   // |   1|   2|   3|null|
   *   // |   5|   4|null|   6|
   *   // +----+----+----+----+
   *   df2.unionByName(df1, true).show
   *   // output: "col2" is missing at left df2 and added at the end of schema.
   *   // +----+----+----+----+
   *   // |col1|col0|col3|col2|
   *   // +----+----+----+----+
   *   // |   4|   5|   6|null|
   *   // |   2|   1|null|   3|
   *   // +----+----+----+----+
   * }}}
   * @group typedrel
   * @since 3.1.0
   */
  def unionByName(other: Dataset[T], allowMissingColumns: Boolean): Dataset[T]

[API][SPARK-32154][SQL] Use ExpressionEncoder for the return type of ScalaUDF to convert to catalyst type (+235, -94)>

This PR proposes to use ExpressionEncoder for the return type of ScalaUDF to convert to the catalyst type, instead of using CatalystTypeConverters. The UDF can then return Java 8 datetime classes regardless of whether spark.sql.datetime.java8API.enabled is true.

[API][SPARK-32270][SQL] Use TextFileFormat in CSV's schema inference with a different encoding (+14, -12)>

This PR proposes to use the text datasource in CSV's schema inference. This makes schema inference and the actual reading consistently use the same encoding to read files.

[SPARK-32241][SQL] Remove empty children of union (+65, -5)>

This PR removes the empty child relations of a Union, to simplify the plan.
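The simplification can be sketched with a toy plan ADT (illustrative node classes, not Catalyst's): empty children are dropped, and a Union with a single remaining child collapses to that child.

```scala
// Toy plan model for the Union simplification described above.
// Illustrative only, not Spark's Catalyst classes.
sealed trait Plan
case class Relation(rows: Seq[Int]) extends Plan
case class Union(children: Seq[Plan]) extends Plan

def simplifyUnion(p: Plan): Plan = p match {
  case Union(children) =>
    // Drop child relations known to be empty.
    val kept = children.filter {
      case Relation(rows) => rows.nonEmpty
      case _              => true
    }
    kept match {
      case Seq(single) => single     // a one-child Union disappears
      case cs          => Union(cs)
    }
  case other => other
}
```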

[API][SPARK-31480][SQL] Improve the EXPLAIN FORMATTED's output for DSV2's Scan Node (+163, -15)>

Improve the EXPLAIN FORMATTED output of DSV2 Scan nodes (file based ones).


Before:

== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- BatchScan (1)

(1) BatchScan
Output [2]: [value#7, id#8]
Arguments: [value#7, id#8], ParquetScan(org.apache.spark.sql.test.TestSparkSession17477bbb,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml,org.apache.spark.sql.execution.datasources.InMemoryFileIndexa6c363ce,StructType(StructField(value,IntegerType,true)),StructType(StructField(value,IntegerType,true)),StructType(StructField(id,IntegerType,true)),[Lorg.apache.spark.sql.sources.Filter;40fee459,org.apache.spark.sql.util.CaseInsensitiveStringMapfeca1ec6,Vector(isnotnull(id#8), (id#8 > 1)),List(isnotnull(value#7), (value#7 > 2)))
(2) ...
(3) ...
(4) ...


After:

== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- BatchScan (1)

(1) BatchScan
Output [2]: [value#7, id#8]
DataFilters: [isnotnull(value#7), (value#7 > 2)]
Format: parquet
Location: InMemoryFileIndex[....]
PartitionFilters: [isnotnull(id#8), (id#8 > 1)]
PushedFilers: [IsNotNull(id), IsNotNull(value), GreaterThan(id,1), GreaterThan(value,2)]
ReadSchema: struct<value:int>
(2) ...
(3) ...
(4) ...

[API][SPARK-32272][SQL] Add SQL standard command SET TIME ZONE (+308, -6)>

Adds the SQL standard command SET TIME ZONE, which sets the default time zone displacement for the current SQL session. It is equivalent to the existing `set spark.sql.session.timeZone=xxx`.

In summary, this PR adds the following syntax:

SET TIME ZONE 'valid time zone';  -- zone offset or region
SET TIME ZONE INTERVAL XXXX;      -- xxx must be in [-18, +18] hours; this range is wider than the ANSI range of [-14, +14]

[SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables (+78, -22)>

Selecting from ORC tables created by Hive, or from ORC tables whose column names are only _colX, fails with java.lang.ArrayIndexOutOfBoundsException.

For ORC files written by the old versions of Hive, no field name is stored in the physical schema. Thus, we need to send the entire data schema instead of the required schema. In this case, column pruning is not enabled.

For example:

val u = """select date_dim.d_year from date_dim limit 5"""

Here the index of d_year in the file's physical schema is 6, whereas the resultSchema passed in contains only one field, struct<d_year:int>.

Using index 6 against a resultSchema of size 1 throws the exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2,, executor driver): java.lang.ArrayIndexOutOfBoundsException: 6
    at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initBatch(OrcColumnarBatchReader.java:156)
    at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$7(OrcFileFormat.scala:258) 

[API][SPARK-30648][SQL] Support filter pushdown in JSON datasource (+1029, -390)>

Support filter pushdown in the JSON datasource. The benefit of pushing a filter down to JacksonParser is that the filter is applied as soon as all of its attributes become available, i.e. converted from JSON field values to desired values according to the schema. This can skip parsing the rest of the JSON record and converting other values when the filter returns false. It improves performance when the pushed filters are highly selective and the conversion of JSON string fields to desired values is comparatively expensive (for example, conversion to TIMESTAMP values). In the JsonBenchmark test case, the changes improve performance on synthetic benchmarks by up to 33.5x on JDK 8:

OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
w/o filters                                       30314          30334          28          0.0      303139.1       1.0X
pushdown disabled                                 30394          30429          54          0.0      303944.7       1.0X
w/ filters                                          906            913           8          0.1        9059.1      33.5X

The JSON filter pushdown can be controlled by the following configuration:

  • spark.sql.json.filterPushdown.enabled (default: true)

    When true, enable filter pushdown to JSON datasource.
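The early-exit idea behind the pushdown can be sketched with a toy record parser that evaluates a single-field predicate as soon as that field has been converted, skipping the rest of the record on failure (hypothetical helper, not JacksonParser):

```scala
// Toy "filter pushdown" into a record parser: stop converting a record as
// soon as a pushed single-field predicate fails. Illustrative only.
def parseWithFilter(
    fields: Seq[(String, String)],   // (fieldName, rawValue) in record order
    filterField: String,
    pred: String => Boolean): Option[Map[String, String]] = {
  val acc = scala.collection.mutable.LinkedHashMap.empty[String, String]
  val it = fields.iterator
  while (it.hasNext) {
    val (k, v) = it.next()
    acc += (k -> v)
    // As soon as the filtered field is available, test it; a failing
    // predicate lets us skip the remaining fields of this record.
    if (k == filterField && !pred(v)) return None
  }
  Some(acc.toMap)
}
```

The savings come from records rejected early: none of their remaining fields are converted.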

[API][SPARK-30616][SQL] Introduce TTL config option for SQL Metadata Cache (+107, -10)>

A new spark.sql.metadataCacheTTLSeconds option adds time-to-live (TTL) behavior to the existing caches in FileStatusCache and SessionCatalog.

Currently Spark caches file listing for tables and requires issuing REFRESH TABLE any time the file listing has changed outside of Spark. Unfortunately, simply submitting REFRESH TABLE commands could be very cumbersome. Assuming frequently added files, hundreds of tables and dozens of users querying the data (and expecting up-to-date results), manually refreshing metadata for each table is not a solution.

This is a pretty common use-case for streaming ingestion of data, which can be done outside of Spark (with tools like Kafka Connect, etc.).

  • spark.sql.metadataCacheTTLSeconds (Default: -1)

    Time-to-live (TTL) value for the metadata caches: partition file metadata cache and session catalog cache. This configuration only has an effect when this value has a positive value (> 0). It also requires setting spark.sql.catalogImplementation to hive, setting spark.sql.hive.filesourcePartitionFileCacheSize > 0 and setting spark.sql.hive.manageFilesourcePartitions to true to be applied to the partition file metadata cache.
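The TTL behavior the option enables can be sketched with a minimal cache that treats entries older than the TTL as absent. This is an illustrative model with an injectable clock, not Spark's FileStatusCache:

```scala
// Minimal TTL-cache sketch: entries older than ttlMs are treated as absent.
// `now` is injectable so expiry is deterministic in tests. Illustrative only.
class TtlCache[K, V](ttlMs: Long, now: () => Long) {
  private val store = scala.collection.mutable.Map.empty[K, (V, Long)]

  def put(k: K, v: V): Unit = store(k) = (v, now())

  def get(k: K): Option[V] = store.get(k).collect {
    // Only return the value while it is still fresh.
    case (v, insertedAt) if now() - insertedAt < ttlMs => v
  }
}
```

A stale entry simply reads as a miss, forcing a re-listing, which is exactly the effect a positive spark.sql.metadataCacheTTLSeconds is meant to have on file-listing metadata.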

[SPARK-32344][SQL] Unevaluable expr is set to FIRST/LAST ignoreNullsExpr in distinct aggregates (+89, -63)>

Fixes a bug in distinct FIRST/LAST aggregates in v2.4.6/v3.0.0/master:

scala> sql("SELECT FIRST(DISTINCT v) FROM VALUES 1, 2, 3 t(v)").show()
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: false#37
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:258)
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:226)
  at org.apache.spark.sql.catalyst.expressions.aggregate.First.ignoreNulls(First.scala:68)
  at org.apache.spark.sql.catalyst.expressions.aggregate.First.updateExpressions$lzycompute(First.scala:82)
  at org.apache.spark.sql.catalyst.expressions.aggregate.First.updateExpressions(First.scala:81)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$15.apply(HashAggregateExec.scala:268)

[SPARK-32276][SQL] Remove redundant sorts before repartition nodes (+206, -6)>

Remove redundant sorts before repartition nodes whenever the data is ordered after the repartitioning.

The EliminateSorts rule can be extended further to remove sorts before repartition nodes that don't affect the final output ordering. It seems safe to perform the following rewrites:

  • Sort -> Repartition -> Sort -> Scan as Sort -> Repartition -> Scan
  • Sort -> Repartition -> Project -> Sort -> Scan as Sort -> Repartition -> Project -> Scan
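The two rewrites above can be modeled with a toy plan-node ADT (illustrative classes, not Catalyst): a sort below a repartition is redundant when an outer sort re-establishes the ordering anyway.

```scala
// Toy plan rewrite in the spirit of the extended EliminateSorts rule.
// Illustrative node classes only, not Spark's Catalyst.
sealed trait Node
case object Scan extends Node
case class Sort(child: Node) extends Node
case class Repartition(child: Node) extends Node
case class Project(child: Node) extends Node

def eliminateSorts(n: Node): Node = n match {
  // Sort -> Repartition -> Sort -> ...           becomes Sort -> Repartition -> ...
  case Sort(Repartition(Sort(c)))          => eliminateSorts(Sort(Repartition(c)))
  // Sort -> Repartition -> Project -> Sort -> ... drops the inner sort too
  case Sort(Repartition(Project(Sort(c)))) => eliminateSorts(Sort(Repartition(Project(c))))
  case Sort(c)        => Sort(eliminateSorts(c))
  case Repartition(c) => Repartition(eliminateSorts(c))
  case Project(c)     => Project(eliminateSorts(c))
  case leaf           => leaf
}
```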

[SPARK-32302][SQL] Partially push down disjunctive predicates through Join/Partitions (+208, -324)>

The CNF conversion is used to push down disjunctive predicates through join and partitions pruning.

Essentially, we just need to traverse predicates and extract the convertible sub-predicates for pushdown. There is no need to maintain the CNF result set.

[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning (+322, -16)>

Introduce output partitioning for the build side for BroadcastHashJoinExec if the streamed side has a HashPartitioning or a collection of HashPartitionings.

Currently, the BroadcastHashJoinExec's outputPartitioning only uses the streamed side's outputPartitioning. However, if the join type of BroadcastHashJoinExec is an inner-like join, the build side's info (the join keys) can be added to BroadcastHashJoinExec's outputPartitioning.

There is a new internal config spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit.

  • spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit (Default 8)

    The maximum number of partitionings that a HashPartitioning can be expanded to. This configuration is applicable only for BroadcastHashJoin inner joins and can be set to '0' to disable this feature.

[SPARK-32330][SQL] Preserve shuffled hash join build side partitioning (+66, -19)>

Override outputPartitioning method in ShuffledHashJoinExec, similar to SortMergeJoinExec, to avoid shuffle (for queries having multiple joins or group-by), for saving CPU and IO.

Currently ShuffledHashJoin.outputPartitioning inherits from HashJoin.outputPartitioning, which only preserves stream side partitioning (HashJoin.scala):

override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning

This loses the build side partitioning information, and causes extra shuffle if there's another join / group-by after this join.

[API][SPARK-32338][SQL] Overload slice to accept Column for start and length (+18, -2)>

Add an overload for the slice function that can accept Columns for the start and length parameters.

  /**
   * Returns an array containing all the elements in `x` from index `start` (or starting from the
   * end if `start` is negative) with the specified `length`.
   * @param x the array column to be sliced
   * @param start the starting index
   * @param length the length of the slice
   * @group collection_funcs
   * @since 3.1.0
   */
  def slice(x: Column, start: Column, length: Column): Column

Allow users to take slices of arrays based on the length of the arrays, or via data in other columns.

df.select(slice(x, lit(4), size(x) - 4))

[API][SPARK-32377][SQL] CaseInsensitiveMap should be deterministic for addition (+10, -2)>

Fix CaseInsensitiveMap to be deterministic for addition.

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
var m = CaseInsensitiveMap(Map.empty[String, String])
Seq(("paTh", "1"), ("PATH", "2"), ("Path", "3"), ("patH", "4"), ("path", "5")).foreach { kv =>
  m = (m + kv).asInstanceOf[CaseInsensitiveMap[String]]
}
// After the fix, each addition deterministically replaces the conflicting
// entry, so m("path") always returns the most recently added value, "5".




[API][SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable (+309, -178)>

Avoid shuffle when joining tables bucketed differently. This is also useful for shuffled hash join. In production, we see users use the shuffled hash join to join bucketed tables (setting spark.sql.join.preferSortMergeJoin=false to avoid the sort), and this change can help avoid the shuffle when the numbers of buckets are not the same.

Performance numbers for a modified version of TPCDS q93:

SELECT ss_ticket_number, sr_ticket_number
FROM store_sales
JOIN store_returns
ON ss_ticket_number = sr_ticket_number

where store_sales and store_returns are changed to be bucketed tables.

Bucket coalescing substantially reduces wall-clock time (a 1.5X speedup in this benchmark):

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
shuffle hash join coalesce bucket off              1541           1664         106          1.9         535.1       1.0X
shuffle hash join coalesce bucket on               1060           1169          81          2.7         368.1       1.5X

[API][SPARK-32364][SQL] Use CaseInsensitiveMap for DataFrameReader/Writer options (+42, -10)>

When a user has multiple options such as path, paTH, and PATH for the same key path, option/options becomes non-deterministic because extraOptions is a HashMap. This PR uses CaseInsensitiveMap instead of HashMap to fix the bug fundamentally.

[API][SPARK-31999][SQL] Add REFRESH FUNCTION command (+265, -32)>


REFRESH FUNCTION function_identifier


The REFRESH FUNCTION statement invalidates the cached function entry, which includes the class name and resource location of the given function, and the invalidated cache is repopulated right away. Note that REFRESH FUNCTION only works for permanent functions; refreshing native or temporary functions causes an exception.

[SPARK-32237][SQL] Resolve hint in CTE (+41, -6)>

This PR moves the Substitution rule before the Hints rule in the Analyzer to make hints work in CTEs.

[SPARK-32280][SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan (+75, -10)>

A bug fix in ResolveReferences.dedupRight to make sure it only rewrites the attributes of ancestor nodes of the conflict node that has the conflicting attributes.

[API][SPARK-32374][SQL] Disallow setting properties when creating temporary views (+33, -8)>

Currently, users can specify properties when creating a temporary view. However, the specified properties are not used anywhere and can be misleading. This PR proposes to disallow specifying properties when creating temporary views.

[API][SPARK-32375][SQL] Basic functionality of table catalog v2 for JDBC (+365, -17)>

This PR implements basic functionality of the TableCatalog interface so that end users can use JDBC as a catalog.

[API][SPARK-32406][SQL] Make RESET syntax support single configuration reset (+45, -8)>

Before this change, the RESET command could only restore all runtime configurations to their defaults. After this change, RESET can also restore a single configuration.

[API][SPARK-32430][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage preparation (+79, -5)>

Provide a generic mechanism for plugins to inject rules into the AQE "query prep" stage that happens before query stage creation.

The issue is that new query stages are created without access to the parent plan of the new query stage, so certain properties cannot be determined because you have to know what the parent did. With this change, plugins can add tags to the plan to track such information.

This PR adds the following extension API:

  type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]

  /**
   * Inject a rule that can override the query stage preparation phase of adaptive query
   * execution.
   */
  def injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit

[SPARK-32420][SQL] Add handling for unique key in non-codegen hash join (+177, -50)>

HashedRelation has two separate code paths for unique key lookup and non-unique key lookup. For example, in its subclass UnsafeHashedRelation, unique key lookup is more efficient because it avoids the extra Iterator[UnsafeRow].hasNext()/next() overhead per row.

BroadcastHashJoinExec already handles unique and non-unique keys separately in the codegen path, but the non-codegen paths for broadcast hash join and shuffled hash join do not yet, so this PR adds that support.

Shuffled hash join and non-codegen broadcast hash join still rely on this code path for execution, so this PR helps save CPU when executing these two types of joins.
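The difference between the two paths can be sketched in plain Python (illustrative only, not Spark's internal data structures): a relation whose keys are known to be unique can map key -> row directly, so a probe is a single lookup, while a possibly-duplicate relation must map key -> list of rows and pay per-probe iteration overhead.

```python
# rows: iterable of (key, value) pairs
def build_unique(rows):
    # keys known unique: key -> single row
    return {k: v for k, v in rows}

def build_non_unique(rows):
    # keys may repeat: key -> list of rows
    rel = {}
    for k, v in rows:
        rel.setdefault(k, []).append(v)
    return rel

def probe_unique(rel, key):
    # one dict lookup, no iterator involved
    v = rel.get(key)
    return [] if v is None else [v]

def probe_non_unique(rel, key):
    # must materialize/iterate a list even when there is one match
    return list(rel.get(key, []))

build = [(1, "a"), (2, "b")]
print(probe_unique(build_unique(build), 1))          # ['a']
print(probe_non_unique(build_non_unique(build), 1))  # ['a']
```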

[SPARK-32424][SQL] Fix silent data change for timestamp parsing if overflow happens (+323, -30)>

Raise ArithmeticException instead of giving the wrong answer if overflow happens in timestamp parsing.

[SPARK-32059][SQL] Allow nested schema pruning thru window/sort plans (+217, -2)>

This PR enables nested schema pruning to work through window functions, as described in SPARK-32059, and also through Sort. More generally, it supports Project->Filter->[any prunable node] plans.

[SPARK-32290][SQL] SingleColumn Null Aware Anti Join Optimize (+409, -31)>

Normally, a null-aware anti join (NAAJ) is planned into BroadcastNestedLoopJoin, which is very time-consuming, for instance in TPC-H Query 16:

select
    p_brand,
    p_type,
    p_size,
    count(distinct ps_suppkey) as supplier_cnt
from
    partsupp,
    part
where
    p_partkey = ps_partkey
    and p_brand <> 'Brand#45'
    and p_type not like 'MEDIUM POLISHED%'
    and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
    and ps_suppkey not in (
        select
            s_suppkey
        from
            supplier
        where
            s_comment like '%Customer%Complaints%'
    )
group by
    p_brand,
    p_type,
    p_size
order by
    supplier_cnt desc,
    p_brand,
    p_type,
    p_size
The NOT IN subquery in the query above is planned into:

LeftAnti condition Or((ps_suppkey=s_suppkey), IsNull(ps_suppkey=s_suppkey))

BroadcastNestedLoopJoinExec performs O(M*N) work. But if the NAAJ involves only a single column, the build side can always be turned into a HashSet, and the streamed side only needs to look up keys in that HashSet, reducing the cost to O(M).

For now, this optimization only targets the single-column null-aware anti join case. With this patch, TPC-H Query 16 improves from 41 minutes to 30 seconds.
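The single-column NAAJ semantics and the HashSet trick can be sketched in plain Python (illustrative, not Spark's implementation; None models SQL NULL): build the set once, then each streamed row costs O(1), for O(M) total instead of the O(M*N) nested-loop comparison.

```python
def null_aware_anti_join(stream_keys, build_keys):
    """Single-column null-aware anti join (NOT IN semantics):
    - empty build side: every streamed row is kept (NOT IN over empty set is true)
    - NULL on the build side: the predicate is never true, nothing is kept
    - NULL streamed key against a non-empty build side: the row is dropped
    - otherwise: keep the row iff its key is absent from the build set"""
    if not build_keys:
        return list(stream_keys)
    if any(k is None for k in build_keys):
        return []
    build_set = set(build_keys)            # built once: O(N)
    return [k for k in stream_keys         # each probe: O(1), so O(M) total
            if k is not None and k not in build_set]

print(null_aware_anti_join([1, 2, None, 4], [2, 3]))  # [1, 4]
print(null_aware_anti_join([1, 2], [2, None]))        # []
```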


[SPARK-23631][ML][PYSPARK] Add summary to RandomForestClassificationModel (+496, -52)>

This PR adds a summary to RandomForestClassificationModel. The user can get a summary of this classification model, and retrieve common metrics such as accuracy, weightedTruePositiveRate, roc (for binary), pr curves (for binary), etc.


[SPARK-32140][ML][PYSPARK] Add training summary to FMClassificationModel (+257, -32)>

Add a training summary to FMClassificationModel so that users can inspect the training process, such as the loss value at each iteration and the total number of iterations.

[SPARK-29292][SQL][ML] Update rest of default modules (Hive, ML, etc) for Scala 2.13 compilation (+106, -102)>

Make the rest of the default modules (i.e., those built without specifying -Pyarn etc.) compile under Scala 2.13. Note that this does not yet demonstrate that tests pass under 2.13.

[SPARK-32315][ML] Provide an explanation error message when calling require (+4, -2)>

A small improvement to the error message shown to users: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala#L537-L538

[SPARK-32310][ML][PYSPARK] ML params default value parity in classification, regression, clustering and fpm (+141, -157)>

Set default parameter values in the ...Params traits in both Scala and Python, so that each ML estimator and its corresponding transformer share the same default param values, and Scala and Python agree as well.

[SPARK-32298][ML] tree models prediction optimization (+28, -18)>

Use a while loop instead of recursion, making tree model prediction 3% ~ 10% faster.
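The optimization can be sketched in plain Python (a hypothetical node layout, not Spark's internal tree representation): replace recursive descent through the decision tree with a while loop, avoiding function-call overhead at every level.

```python
class Node:
    """Decision tree node: internal nodes split on feature <= threshold,
    leaves carry a prediction."""
    def __init__(self, feature=None, threshold=None, left=None, right=None,
                 prediction=None):
        self.feature, self.threshold = feature, threshold
        self.left, self.right = left, right
        self.prediction = prediction  # set only on leaves

def predict_recursive(node, x):
    # one stack frame per tree level
    if node.prediction is not None:
        return node.prediction
    child = node.left if x[node.feature] <= node.threshold else node.right
    return predict_recursive(child, x)

def predict_iterative(node, x):
    # same traversal, no call overhead per level
    while node.prediction is None:
        node = node.left if x[node.feature] <= node.threshold else node.right
    return node.prediction

leaf_a, leaf_b = Node(prediction=0.0), Node(prediction=1.0)
root = Node(feature=0, threshold=0.5, left=leaf_a, right=leaf_b)
print(predict_iterative(root, [0.3]))  # 0.0
print(predict_recursive(root, [0.9]))  # 1.0
```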



[SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row (+64, -1)>

This patch fixes incorrect join results from stream-stream join with state store format V2, caused by missing to copy a reused unsafe row.

[SPARK-31985][SS] Remove incomplete/undocumented stateful aggregation in continuous mode (+2, -1117)>

Removes the undocumented and incomplete "stateful aggregation" feature in continuous mode, deleting 1,100+ lines of code.


[API][SPARK-32138] Drop Python 2.7, 3.4 and 3.5 (+735, -2033)>

This PR drops support for Python 2.7, 3.4, and 3.5, which are EOL, and removes all the Python 2 compatibility hacks in the Spark codebase. It also simplifies type hint and C pickle support.

[API][SPARK-32094][PYTHON] Update cloudpickle to v1.5.0 (+1602, -1366)>

Upgrade PySpark's embedded cloudpickle to the latest cloudpickle v1.5.0 (See https://github.com/cloudpipe/cloudpickle/blob/v1.5.0/cloudpickle/cloudpickle.py)

[API][SPARK-29157][SQL][PYSPARK] Add DataFrameWriterV2 to Python API (+305, -2)>

  • Adds DataFrameWriterV2 class.

  • Adds writeTo method to pyspark.sql.DataFrame.

  • Adds related SQL partitioning functions (years, months, ..., bucket).


[SPARK-32179][SPARK-32188][PYTHON][DOCS] Replace and redesign the documentation base (+2129, -584)>

Initial draft for the new PySpark docs shape: https://hyukjin-spark.readthedocs.io/en/latest/index.html

[SPARK-32447][CORE] Use python3 by default in pyspark and find-spark-home scripts (+6, -6)>

This PR aims to use python3 instead of python inside the bin/pyspark, bin/find-spark-home, and bin/find-spark-home.cmd scripts.


[SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing (+64, -3)>

This PR fixes a history server bug where it randomly throws java.lang.IllegalStateException: Disk usage tracker went negative during restart.

[SPARK-32200][WEBUI] Redirect to the history page when accessed to /history on the HistoryServer without application id (+14, -3)>

This PR proposes to change the HistoryServer to redirect to the history page when /history is accessed without an application id, instead of returning a 400.

[API][SPARK-31608][CORE][SHS] Add a new type of KVStore to make loading UI faster (+404, -0)>

Add a new class HybridStore to make the history server faster when loading event files. When rebuilding the application state from event logs, HybridStore will write data to InMemoryStore first and use a background thread to dump the data to LevelDB once the writing to InMemoryStore is completed. HybridStore makes content serving faster by using more memory. It is only safe to enable when the cluster is not under heavy load.

"spark.history.store.hybridStore.enabled" (default: false)
Whether to use HybridStore as the store when parsing event logs. HybridStore will first write data to an in-memory store and have a background thread dump the data to a disk store after the write to the in-memory store is completed.

"spark.history.store.hybridStore.maxMemoryUsage" (default: 2g)
Maximum memory space that can be used to create HybridStore. The HybridStore co-uses the heap memory, so the heap memory should be increased through the memory option for SHS if the HybridStore is enabled.
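The write-to-memory-then-dump pattern can be sketched in plain Python (illustrative only; the names HybridStore, write, and finish_writing here are hypothetical, not Spark's KVStore API): writes land in a fast in-memory dict, and once writing finishes, a background thread copies everything to the slower "disk" store.

```python
import threading

class HybridStore:
    """Sketch: serve writes from memory first, dump to disk in the background
    after the writing phase (e.g. event log parsing) is complete."""

    def __init__(self, disk_store):
        self.memory = {}              # fast in-memory store
        self.disk = disk_store        # slower persistent store (a dict here)
        self._dump_done = threading.Event()

    def write(self, key, value):
        self.memory[key] = value      # fast path while parsing event logs

    def finish_writing(self):
        # all writes are done: dump to disk without blocking the caller
        t = threading.Thread(target=self._dump)
        t.start()
        return t

    def _dump(self):
        for k, v in self.memory.items():
            self.disk[k] = v
        self._dump_done.set()

disk = {}
store = HybridStore(disk)
store.write("app-1", {"status": "FINISHED"})
store.finish_writing().join()
print(disk["app-1"])  # {'status': 'FINISHED'}
```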


[API][SPARK-32125][UI] Support get taskList by status in Web UI and SHS Rest API (+1666, -7)>

When there are a large number of tasks in one stage, the current API is very inefficient for fetching the task list by status. This PR adds support for fetching taskList filtered by status in both the Web UI and the SHS REST API.

[SPARK-32426][SQL] ui shows sql after variable substitution (+6, -2)>

Before this change, when submitting SQL containing variables, the SQL displayed in the UI did not have the variables substituted. After this change, the actual executed SQL can be seen in the UI.




[SPARK-31753][SQL][DOCS] Add missing keywords in the SQL docs (+520, -25)>

This PR adds documentation for the following keywords:

  • LAST


[SPARK-32150][BUILD] Upgrade to ZStd 1.4.5-4 (+4, -4)>

[SPARK-32245][INFRA] Run Spark tests in Github Actions (+400, -173)>

This PR enables us to run the Spark tests in Github Actions. It's useful when Jenkins is flaky/down.

[API][SPARK-32004][ALL] Drop references to slave (+863, -657)>

This change replaces the word slave with alternatives matching the context. It affects the JSON output of some Spark events, as some new fields are appended with "slave" replaced by a better word.

[SPARK-32305][BUILD] Make mvn clean remove metastore_db and spark-warehouse (+6, -0)>

Add additional configuration to maven-clean-plugin to ensure the metastore_db and spark-warehouse directories are cleaned up when executing the mvn clean command.


[SPARK-32316][TESTS][INFRA] Test PySpark with Python 3.8 in Github Actions (+18, -15)>

This PR aims to test PySpark with Python 3.8 in Github Actions. On the script side, this is already supported.


[SPARK-32036] Replace references to blacklist/whitelist language with more appropriate terminology, excluding the blacklisting feature (+231, -219)>

Remove references to these "blacklist" and "whitelist" terms besides the blacklisting feature as a whole, which can be handled in a separate JIRA/PR.

[SPARK-29802][BUILD] Use python3 in build scripts (+7, -7)>

Use /usr/bin/env python3 consistently instead of /usr/bin/env python in build scripts, to reliably select Python 3.

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s (+61, -27)>

Restart the watcher when it fails with an HTTP_GONE code from the Kubernetes API, which means the resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

[SPARK-32389][TESTS] Add all hive.execution suite in the parallel test group (+5, -1)>

Add a new parallel test group for all hive.execution suites to reduce Jenkins testing time.

[SPARK-32441][BUILD][CORE] Update json4s to 3.7.0-M5 for Scala 2.13 (+13, -13)>

[SPARK-32452][R][SQL] Bump up the minimum Arrow version as 1.0.0 in SparkR (+2, -2)>