[OSS DIGEST] The major changes of Apache Spark from July 29 to August 25

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

[OSS DIGEST] The major changes of Apache Spark from July 29 to August 25

Allison Wang

Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team (this time we combined 4 weeks).
For each API/configuration/behavior change, an [API] tag is added in the title.
All the content has been appended to this doc.


[API][SPARK-32283][CORE] Kryo should support multiple user registrators (+13, -12)>

This PR fixes a regression for spark.kryo.registrator in 3.0. In the previous Spark version (2.x), it supports multiple user registrators by

private val userRegistrators = conf.get("spark.kryo.registrator", "")

But it doesn't work in 3.0. It was fixed by adding toSequence in Kryo.scala. The configuration spark.kryo.registrator was changed back to Sequence of String for supporting multiple user registrators. The default value is Nil.

[SPARK-32175][CORE] Fix the order between initialization for ExecutorPlugin and starting heartbeat thread (+89, -10)>

This PR changes the order between initialization for ExecutorPlugin and starting the heartbeat thread in Executor. In the current master, the heartbeat thread in an executor starts after plugin initialization so if the initialization takes a long time, the heartbeat is not sent to the driver and the executor will be removed from the cluster.

[API][SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors (+72, -9)>

This PR is a follow up of [SPARK-32160] Disallow to create SparkContext in executors. It adds a config to switch allow/disallow to create SparkContext in executors since some users or libraries actually create SparkContext in executors.

spark.driver.allowSparkContextInExecutors (Default: false)

  • If set to true, SparkContext can be created in executors.

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning (+539, -7)>

This PR reduces the prospect of a job loss during decommissioning. It fixes two holes in the current decommissioning framework:

  1. Loss of decommissioned executors is not treated as a job failure: We know that the decommissioned executor would be dying soon, so its death is clearly not caused by the application.

  2. Shuffle files on the decommissioned host are cleared when the first fetch failure is detected from a decommissioned host: This is a bit tricky in terms of when to clear the shuffle state. Ideally you want to clear it the millisecond before the shuffle service on the node dies (or the executor dies when there is no external shuffle service) -- too soon and it could lead to some wastage and too late would lead to fetch failures.

The approach here is to do this clearing when the very first fetch failure is observed on the decommissioned block manager, without waiting for other blocks to also signal a failure.

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning (+310, -44)>

This PR shuts down the executor immediately when decommissioning is finished (i.e., all tasks are done, and the configured migration of all RDD blocks and shuffle data is completed). It helps the cluster manager to release the unneeded resources as soon as possible.

[SPARK-32518][CORE] CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds of resources (+133, -59)>

Previously, CoarseGrainedSchedulerBackend.maxNumConcurrentTasks() calculates only the CPU resources for the max concurrent tasks. This can cause the application to hang when a barrier stage requires the other custom resources (e.g., GPU) but the cluster doesn't have enough corresponding resources. This PR fixed the hang issue by calculating all kinds of resources in CoarseGrainedSchedulerBackend.maxNumConcurrentTasks().

[API][SPARK-32517][CORE] Add StorageLevel.DISK_ONLY_3 (+17, -4)>

This PR added StorageLevel.DISK_ONLY_3 as a built-in StorageLevel for better user experience.

[SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling (+380, -134)>

This PR makes Spark dynamic scaling use graceful decommissioning when it is enabled instead of killing executors directly, in order to avoid triggering task recomputation as much as possible.

[SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars (+122, -9)>

This PR makes Executor load jars and files added by --jars and --files on Executor initialization, to avoid downloading those jars/files twice. This PR also fixes the issue of ExecutorPlugin not working with Standalone Cluster and Kubernetes when a plugin is added by --jars and --files option with spark-submit.

[API][SPARK-32613][CORE] Fix regressions in DecommissionWorkerSuite (+153, -37)>

The PR fixes the flakiness of DecommissionWorkerSuite caused by a regression introduced in SPARK-31197 and adds the following config:

spark.executor.decommission.removed.infoCacheTTL (Default: 5m)

Duration for which a decommissioned executor's information will be kept after its removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to decommissioning even after the mapper executor has been decommissioned. This allows eager recovery from fetch failures caused by decommissioning, increasing job robustness.

[API][SPARK-32651][CORE] Decommission switch configuration should have the highest hierarchy (+30, -25)>

API change: configuration option renaming: Rename spark.worker.decommission.enabled to spark.decommission.enabled and move it from org.apache.spark.internal.config.Worker to org.apache.spark.internal.config.package. (Note: this is a new configuration that is not in any released versions yet)

Decommission has been supported in Standalone and k8s yet and may be supported in Yarn(https://github.com/apache/spark/pull/27636) in the future. Therefore, the switch configuration should have the highest hierarchy rather than belongs to Standalone's Worker. In other words, it should be independent of the cluster managers.

[SPARK-32600][CORE] Unify task name in some logs between driver and executor (+44, -42)>

Usability improvement: This PR replaces some arbitrary task names in logs with the widely used task name (e.g. "task 0.0 in stage 1.0 (TID 1)") among driver and executor. This will change the task name in TaskDescription by appending TID.

Users will see the more consistent task names in the log, in the form of task name (e.g. "task 0.0 in stage 1.0 (TID 1)") (where TID is Task ID).

[SPARK-32658][CORE] Fix PartitionWriterStream partition length overflow (+20, -2)>

Regression bug fix: The count in PartitionWriterStream should be a long value, instead of int. The issue is introduced by https://github.com/apache/spark/commit/abef84a868e9e15f346eea315bbab0ec8ac8e389 . When the overflow happens, the shuffle index file would record wrong index of a reduceId, thus lead to FetchFailedException: Stream is corrupted error.

Also added some debug logs for easier debugging of similar issues in the future.

[SPARK-32653][CORE] Decommissioned host/executor should be considered as inactive in TaskSchedulerImpl (+102, -10)>

Improvement: Add decommissioning status checking for a host or executor while checking it's active or not. And a decommissioned host or executor should be considered as inactive.

First of all, this PR is not a correctness bug fix but gives improvement indeed. And the main problem here we want to fix is that a decommissioned host or executor should be considered as inactive.

TaskSetManager.computeValidLocalityLevels depends on TaskSchedulerImpl.isExecutorAlive/hasExecutorsAliveOnHost to calculate the locality levels. Therefore, the TaskSetManager could also get corresponding locality levels of those decommissioned hosts or executors if they're not considered as inactive. However, on the other side, CoarseGrainedSchedulerBackend won't construct the WorkerOffer for those decommissioned executors. That also means TaskSetManager might never have a chance to launch tasks at certain locality levels but only suffers the unnecessary delay because of delay scheduling. So, this PR helps to reduce this kind of unnecessary delay by making decommissioned host/executor inactive in TaskSchedulerImpl.


[API][SPARK-32346][SQL] Support filters pushdown in Avro datasource (+430, -199)>

This PR adds support for filters pushdown in Avro datasource V1 and V2.

spark.sql.avro.filterPushdown.enabled (Default: true)

  • When true, enable filter pushdown to Avro datasource.

The changes improve performance on synthetic benchmarks up to 2 times on JDK 11.

[SPARK-32332][SQL] Add columnar exchanges (+260, -70)>

This PR adds abstract classes for ShuffleExchange and BroadcastExchange so that users can provide their columnar implementations. It also updates AdaptiveSparkPlanExec so that the columnar rules can see exchange nodes.

[API][SPARK-32431][SQL] Check duplicate nested columns in read from built-in datasources (+152, -12)>

When spark.sql.caseSensitive is false (by default), this PR checks that there are no duplicated column names on the same level (top-level or nested levels) in reading from in-built datasources Parquet, ORC, Avro and JSON. If such duplicated columns exist, throw the exception:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema:

This change is to make handling of duplicated nested columns similar to the handling of duplicated top-level columns i. e. output the same error when spark.sql.caseSensitive is false:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`

[API][SPARK-32510][SQL] Check duplicate nested columns in read from JDBC datasource (+68, -10)>

This PR checks that there are no duplicated column names on the same level (top-level or nested levels) in reading from the JDBC datasource. If such duplicated columns exist, throw the exception:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value:

This check takes into account the SQL config spark.sql.caseSensitive (false by default).

This change is to make handling of duplicated nested columns similar to the handling of duplicated top-level columns i.e. output the same error:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value: `camelcase`

[SPARK-32421][SQL] Add code-gen for shuffled hash join (+499, -420)>

This PR adds codegen for shuffled hash join. Shuffled hash join codegen is very similar to broadcast hash join codegen, so most of the code change is to refactor existing codegen in BroadcastHashJoinExec to HashJoin. Codegen shuffled hash join can help save CPU cost. We see a 30% wall clock time improvement compared to existing non-codegen code path in the example test query in JoinBenchmark.

[API][SPARK-32274][SQL] Make SQL cache serialization pluggable (+813, -320)>

This PR adds a config to let users change how SQL/Dataframe data is compressed when cached.

spark.sql.cache.serializer (Default: org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer)

The name of a class that implements org.apache.spark.sql.columnar.CachedBatchSerializer. It will be used to translate SQL data into a format that can more efficiently be cached. The underlying API is subject to change so use with caution. Multiple classes cannot be specified. The class must have a no-arg constructor.

[API][SPARK-24884][SQL] Support regexp function regexp_extract_all (+440, -27)>

This PR adds support for regexp function regexp_extract_all. It is a very useful function that expands the capabilities of regexp_extract. For example:

SELECT regexp_extract('1a 2b 14m', '\d+', 0); -- 1
SELECT regexp_extract_all('1a 2b 14m', '\d+', 0); -- [1, 2, 14]
SELECT regexp_extract('1a 2b 14m', '(\d+)([a-z]+)', 2); -- 'a'
SELECT regexp_extract_all('1a 2b 14m', '(\d+)([a-z]+)', 2); -- ['a', 'b', 'm']

Some mainstream databases that support the syntax:


regexp_extract_all(str, regexp[, idx])

Extract all strings in the str that match the regexp expression and corresponding to the regex group index.

[API][SPARK-32257][SQL] Reports explicit errors for invalid usage of SET/RESET command (+129, -16)>

This PR modifies the parser code to handle invalid usages of a SET/RESET command.

For example

SET spark.sql.ansi.enabled true

The above SQL command does not change the configuration value and it just tries to display the value of the configuration 'spark.sql.ansi.enabled true'. This PR disallows using special characters including spaces in the configuration name and reports a user-friendly error instead. In the error message, it tells users a workaround to use quotes or a string literal if they still need to specify a configuration with them.

Before this PR:

scala> sql("SET spark.sql.ansi.enabled true").show(1, -1)
|key                        |value      |
|spark.sql.ansi.enabled true|<undefined>|

After this PR:

scala> sql("SET spark.sql.ansi.enabled true")
Expected format is 'SET', 'SET key', or 'SET key=value'. If you want to include special characters in key, please use quotes, e.g., SET `ke y`=value.(line 1, pos 0)

== SQL ==
SET spark.sql.ansi.enabled true

[API][SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT (+554, -72)>

This PR adds support for using FILTER clauses on one or more DISTINCT aggregate expressions. For example

select sum(distinct id) filter (where sex = 'man') from student;

This PR is related to #26656 which only supports the FILTER clause on aggregate expression without DISTINCT.

[API][SPARK-32499][SQL] Use {} in conversions maps and structs to strings (+105, -78)>

This PR changes the casting of the map and struct values to strings by using the {} brackets instead of []. The behavior is controlled by the SQL config spark.sql.legacy.castComplexTypesToString.enabled. When it is trueCAST wraps maps and structs by [] in casting to strings. Otherwise, if this is false, which is the default, maps and structs are wrapped by {}.

For example:

  • Before this change: struct<s:int,[dotNET, 2012]:bigint,[Java, 2013]:bigint>
  • After this change: struct<s:int,{dotNET, 2012}:bigint,{Java, 2013}:bigint>

This change is needed

  • To distinguish structs/maps from arrays.
  • To make show's output consistent with Hive and conversions to Hive strings.
  • To display dataframe content in the same form by spark-sql and show
  • To be consistent with the *.sql tests

[API][SPARK-32402][SQL] Implement ALTER TABLE in JDBC Table Catalog (+164, -25)>

This PR implemented the following ALTER TABLE syntaxes in JDBC Table Catalog:

ALTER TABLE table_name ADD COLUMNS ( column_name datatype [ , ... ] );
ALTER TABLE table_name RENAME COLUMN old_column_name TO new_column_name;
ALTER TABLE table_name DROP COLUMN column_name;
ALTER TABLE table_name ALTER COLUMN column_name TYPE new_type;

[API][SPARK-32501][SQL] Convert null to "null" in structs, maps and arrays while casting to strings (+60, -26)>

This PR proposed to convert the null elements of map/struct/array to the "null" string rather than delete them while converting map/struct/array values to strings. The old behavior can be restored by setting spark.sql.legacy.omitNestedNullInCast.enabled to true. This helps to distinguish the empty string element and the null element. For example,

Before this PR:

scala> Seq(Seq(""), Seq(null)).toDF().show
|   []|
|   []|

After this PR:

scala> Seq(Seq(""), Seq(null)).toDF().show
| value|
|    []|

[SPARK-32546][SQL] Get table names directly from Hive tables (+5, -3)>

This PR proposed to get table names directly from a sequence of Hive tables in HiveClientImpl.listTablesByType(). This PR not only avoids the unnecessary conversion from the Hive table to the Catalog table but also fixes the Hive SerDe loading issue, where multiple clients sharing the same Hive metastore but only partial clients have the certain SerDe.

[API][SPARK-32559][SQL] Fix the trim logic in UTF8String.toInt/toLong to handle non-ASCII characters correctly (+172, -12)>

This PR fixed the trim logic in UTF8String.toInt/toLong to not treat non-ASCII characters as whitespaces, e.g.

Before this PR:

scala> sql("SELECT cast('1中文' AS bigint)").show
|                 1|

After this PR:

scala> sql("SELECT cast('1中文' AS bigint)").show
|                 null|

[API][SPARK-25557][SQL] Nested column predicate pushdown for ORC (+460, -310)>

This PR supported the nested column predicate pushdown for ORC(Parquet has been supported before). This is configurable via the conf "spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources".

[API][SPARK-32555][SQL] Add unique ID on query execution (+7, -0)>

This PR added a unique ID on QueryExecution. Listeners can leverage the ID to deduplicate redundant calls.

[API][SPARK-32576][SQL] Support PostgreSQL bpchar type and array of char type (+3, -1)>

This PR supported PostgreSQL bpchar in order to get rid of Unsupported type ARRAY error when users try to use the char array data type under Postgre dialect.

[API][SPARK-32337][SQL] Show initial plan in AQE plan tree string (+210, -43)>

This PR added the initial plan in AdaptiveSparkPlanExec and generates tree string for both the current/final plan and the initial plan. The difference between Current Plan and Final Plan here is that current plan indicates an intermediate state. The plan is subject to further transformations, while the final plan represents an end state, which means the plan will no longer be changed. For example,

Before this PR,


AdaptiveSparkPlan isFinalPlan=true
+- *(3) BroadcastHashJoin
   :- BroadcastQueryStage 2


== Physical Plan ==
AdaptiveSparkPlan (9)
+- BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)

After this PR,


AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) BroadcastHashJoin
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange
+- == Initial Plan ==
   :- Sort
   :  +- Exchange


== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Current Plan ==
   BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)

[SPARK-32573][SQL] Anti Join Improvement with EmptyHashedRelation and EmptyHashedRelationWithAllNullKeys (+124, -20)>

This PR improved Anti Join by:

  • Use EmptyHashedRelation to perform a fast stop for the common Anti Join as well
  • Eliminate BroadcastHashJoin(NAAJ) if buildSide is a EmptyHashedRelationWithAllNullKeys in AQE

[SPARK-32540][SQL] Eliminate the filter clause in aggregate (+133, -14)>

This PR added an optimizer rule EliminateAggregateFilter to eliminate the trivial filter clause in aggregate, e.g.,


can be optimized to:


[SPARK-32594][SQL] Fix serialization of dates inserted to Hive tables (+11, -1)>

This PR fixes a bug that causes the SQL INSERT command to put incorrect Date values into Hive tables. This PR fixes the erroneous implementation of DaysWritable that did not respect date rebases and used not initialized daysSinceEpoch (1970-01-01).

[SPARK-31703][SQL] Parquet RLE float/double are read incorrectly on big endian platforms (+166, -16)>

This PR fixes an issue introduced by SPARK-26985 in which RLE float/double data in parquet files are retrieved correctly on big endian platforms. This PR fixes the incorrect implementation in SPARK-26985 that read the RLE entries from parquet files as BIG_ENDIAN on big endian platforms despite the fact that parquet data is always in little endian format.

[SPARK-32352][SQL] Partially pushdown supports data filter if it mixed in partition filters (+38, -3)>

This PR extracts data-column filters mixed together with partition-column filters in conjunctive conditions and pushes the extracted data-column filters down into the file scan. For example, in the filter (partCol = '1' AND dataCol = 1) OR (partCol = '2' and dataCol = 2), we can push down the data-column filter i = 1 or i = 2.

[API][SPARK-31694][SQL] Add SupportsPartitions APIs on DataSourceV2 (+688, -15)>

This PR adds partition management API support as part of Table API in DataSource V2. It includes the API to get the partition schema of table; to create/drop a partition; to check the existence of a partition; to retrieve the partition metadata; to list partition identifiers; to replace the partition metadata.

[API][SPARK-32399][SQL] Full outer shuffled hash join (+693, -48)>

This PR implements full outer join in shuffled hash join without codegen support.

[SPARK-32466][TEST][SQL] Add PlanStabilitySuite to detect SparkPlan regression (+130898, -65)>

This PR adds a test suite that checks query plans against golden files for TPC-DS queries, in an effort to detect important changes in the Spark optimizer and planner as well as potential bugs introduced by such changes.

[SPARK-32652][SQL] ObjectSerializerPruning fails for RowEncoder (+52, -4)>

Bug fix: Fix type mismatch in ObjectSerializerPruning.alignNullTypeInIf, to consider the isNull check generated in RowEncoder, which is Invoke(inputObject, "isNullAt", BooleanType, Literal(index) :: Nil).

[API][SPARK-32607][SQL] Script Transformation ROW FORMAT DELIMITED TOK_TABLEROWFORMATLINES should only support '\n' (+51, -6)>

New behavior: stricten the liner terminator check in Scrip Transform no-serde (ROW FORMAT DELIMITED) mode to only accept \n, to ensure the accuracy of data.

Scrip Transform no-serde (ROW FORMAT DELIMITED) mode LINE TERMINNATED BY only support \n.

Tested in hive : Hive 1.1 image

Hive 2.3.7 image

[SPARK-32608][SQL] Script Transform ROW FORMAT DELIMIT value should format value (+116, -16)>

Bug fix: For SQL

  USING 'cat' AS (a, b, c)
FROM testData

The correct

TOK_TABLEROWFORMATFIELD should be nut actually ','

TOK_TABLEROWFORMATLINES should be \n but actually '\n'

[SPARK-32624][SQL] Use getCanonicalName to fix byte[] compile issue (+8, -1)>

(Superseded by https://github.com/apache/spark/pull/29602)

Bug fix: Fix a bug in CodegenContext.addReferenceObj() which generates wrong code ([B) for the byte[] Java type.

Unfortunately this PR introduces a new bug for nested Scala types, and was fixed later by https://github.com/apache/spark/pull/29602.

[SPARK-32621][SQL] 'path' option can cause issues while inferring schema in CSV/JSON datasources (+76, -21)>

Bug fix: When CSV/JSON data sources infer schema (e.g, def inferSchema(files: Seq[FileStatus]), they use the files along with the original options. files in inferSchema could have been deduced from the "path" option if the option was present, so this can cause issues (e.g., reading more data, listing the path again) since the "path" option is added to the files.

The existing behavior can cause the following issue:

class TestFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"

val path = "/tmp"
val df = spark.range(2)
df.write.json(path + "/p=1")
df.write.json(path + "/p=2")

val extraOptions = Map(
  "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
  "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName

// This works fine.
assert(spark.read.options(extraOptions).json(path).count == 2)

// The following with "path" option fails with the following:
// assertion failed: Conflicting directory structures detected. Suspicious paths
//	file:/tmp
//	file:/tmp/p=1
assert(spark.read.options(extraOptions).format("json").option("path", path).load.count() === 2)

[SPARK-28863][SQL] Introduce AlreadyPlanned to prevent reanalysis of V1FallbackWriters (+196, -24)>

Bug fix: Fix a bug to avoid having a physical plan that is disconnected from the physical plan that is being executed in V1WriteFallback execution.

This PR introduces a new LogicalNode type AlreadyPlanned, and related physical plan and preparation rule.

With the DataSourceV2 write operations, we have a way to fallback to the V1 writer APIs using InsertableRelation. The gross part is that we're in physical land, but the InsertableRelation takes a logical plan, so we have to pass the logical plans to these physical nodes, and then potentially go through re-planning. This re-planning can cause issues for an already optimized plan.

A useful primitive could be specifying that a plan is ready for execution through a logical node AlreadyPlanned. This would wrap a physical plan, and then we can go straight to execution.

[SPARK-32640][SQL] Downgrade Janino to fix a correctness bug (+17, -14)>

Bug fix: fix a code generation issue in conditional expressions in newer Janino version by downgrading to known good version.

The symptom is about NaN comparison. For code below

if (double_value <= 0.0) {
} else {

If double_value is NaN, NaN <= 0.0 is false and we should go to the else branch. However, current Spark goes to the if branch and causes correctness issues like SPARK-32640.

[SPARK-32646][SQL] ORC predicate pushdown should work with case-insensitive analysis (+243, -60)>

Bug fix: Fix ORC predicate pushdown under case-insensitive analysis cases. The field names in pushed down predicates don't need to match in exact letter case with physical field names in ORC files, if we enable case-insensitive analysis.

Currently, ORC predicate pushdown doesn't work with case-insensitive analysis. A predicate "a < 0" cannot pushdown to ORC file with field name "A" under case-insensitive analysis.

But, Parquet predicate pushdown works with this case. We should make ORC predicate pushdown work with case-insensitive analysis too.

[SPARK-32672][SQL] Fix data corruption in boolean bit set compression (+29, -3)>

Bug fix: Fix a data corruption issue. Essentially the BooleanBitSet CompressionScheme would miss nulls at the end of a CompressedBatch. The values would then default to false.

[SPARK-32649][SQL] Optimize BHJ/SHJ inner/semi join with empty hashed relation (+157, -53)>

Optimization: A very minor optimization for rare use cases, but in case the build side turns out to be empty, we can leverage it to short-cut stream side to save CPU and IO.

Example broadcast hash join query similar to JoinBenchmark with empty hashed relation:

  def broadcastHashJoinLongKey(): Unit = {
    val N = 20 << 20
    val M = 1 << 16

    val dim = broadcast(spark.range(0).selectExpr("id as k", "cast(id as string) as v"))
    codegenBenchmark("Join w long", N) {
      val df = spark.range(N).join(dim, (col("id") % M) === col("k"))

Comparing wall clock time for enabling and disabling this PR (for non-codegen code path). Seeing like 8x improvement.

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
Join w long:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
Join PR disabled                                    637            646          12         32.9          30.4       1.0X
Join PR enabled                                      77             78           2        271.8           3.7       8.3X

For broadcast hash join and shuffled hash join, whenever the build side hashed relation turns out to be empty. We don't need to execute stream side plan at all, and can return an empty iterator (for inner join and left semi join), because we know for sure that none of stream side rows can be outputted as there's no match.

[API][SPARK-32516][SQL] 'path' option cannot coexist with load()'s path parameters (+93, -28)>

New behavior: Make the behavior consistent for the path option when loading dataframes with a single path (e.g, option("path", path).format("parquet").load(path) vs. option("path", path).parquet(path)) by disallowing path option to coexist with load's path parameters.

The existing behavior is inconsistent:

scala> Seq(1).toDF.write.mode("overwrite").parquet("/tmp/test")

scala> spark.read.option("path", "/tmp/test").format("parquet").load("/tmp/test").show
|    1|

scala> spark.read.option("path", "/tmp/test").parquet("/tmp/test").show
|    1|
|    1|

[SPARK-32683][DOCS][SQL] Fix doc error and add migration guide for datetime pattern F (+3, -1)>

Doc/Migration guide: Fixes a doc error and add a migration guide for datetime pattern.

There is a bug of the doc that we inherited from JDK https://bugs.openjdk.java.net/browse/JDK-8169482

The SimpleDateFormatter (F Day of week in month) we used in 2.x and the DatetimeFormatter (F week-of-month) we use now both have the opposite meanings to what they declared in the java docs. And unfortunately, this also leads to silent data change in Spark too.

The week-of-month is actually the pattern W in DatetimeFormatter, which is banned to use in Spark 3.x.

If we want to keep the pattern F, we need to accept the behavior change with the proper migration guide and fix the doc in Spark

[SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV (+37, -14)>

Bug fix: Fix a bug that drops rows that start with a null char when this is not requested or intended

Spark's CSV source can optionally ignore lines starting with a comment char. Some code paths check to see if it's set before applying comment logic (i.e. not set to default of \0), but many do not, including the one that passes the option to Univocity. This means that rows beginning with a null char were being treated as comments even when 'disabled'.


[API][SPARK-32449][ML][PYSPARK] Add summary to MultilayerPerceptronClassificationModel (+222, -14)>

This PR adds training summary to MultilayerPerceptronClassificationModel so that users can get the training process status, such as the loss value of each iteration and total iteration number. It adds two user-facing methods:

  • MultilayerPerceptronClassificationModel.summary: gets summary of model on training set
  • MultilayerPerceptronClassificationModel.evaluate(dataset): evaluates the model on a test dataset

[SPARK-32310][ML][PYSPARK] ML params default value parity in feature and tuning (+274, -135)>

This PR sets params default values in trait Params for feature and tuning in both Scala and Python. It is to make ML have the same default param values between estimator and its corresponding transformer, and also between Scala and Python.

[SPARK-32092][ML][PYSPARK] Fix parameters not being copied in CrossValidatorModel.copy(), read() and write() (+172, -26)>

Bug fix: Changed the definitions of CrossValidatorModel.copy()/_to_java()/_from_java() so that exposed parameters (i.e. parameters with get() methods) are copied in these methods.

Parameters are copied in the respective Scala interface for CrossValidatorModel.copy(). It fits the semantics to persist parameters when calling CrossValidatorModel.save() and CrossValidatorModel.load() so that the user gets the same model by saving and loading it after. Not copying across numFolds also causes bugs like Array index out of bound and losing sub-models because these parameters will always default to 3 (as described in the JIRA ticket).

[SPARK-32676][ML] Fix double caching in KMeans/BiKMeans (+59, -83)>

Bug fix: Fix double caching in KMeans/BiKMeans:

  1. let the callers of runWithWeight to pass whether handlePersistence is needed;
  2. persist and unpersist inside of runWithWeight;
  3. persist the norms if needed according to the comments;


[SPARK-32568][BUILD][SS] Upgrade Kafka to 2.6.0 (+3, -3)>

This PR upgraded Kafka client library from 2.5.0 to 2.6.0 for Apache Spark 3.1.0. This upgrade includes the client-side bug fixes like KAFKA-10134 and KAFKA-10223.

[SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files (+191, -16)>

This PR cached the fetched list of files in FileStreamSource to avoid re-fetching whenever possible since fetching is quite expensive. This improvement only takes effect when maxFilesPerTrigger is set and latestFirst=false(default). Note that the driver process would require more memory with this change, though it doesn't hurt much as the peak memory is still similar.

Manually tests under the test environment:

  • input files
    • 171,839 files distributed evenly into 24 directories
    • each file contains 200 lines
  • query: read from the "file stream source" and repartition to 50, and write to the "file stream sink"
    • maxFilesPerTrigger = 100

shows the performance improvement:

Before this PR:

After this PR:

The area of brown color represents "latestOffset" where listing operation is performed for FileStreamSource. After this PR, the cost for listing is paid "only once", whereas before this PR it was for "every batch".

[API][SPARK-32456][SS] Check the Distinct by assuming it as Aggregate for Structured Streaming (+57, -2)>

This PR proposed to treat the Distinct node as Aggregate in UnsupportOperationChecker for streaming. This change not only gives the better error message for Distinct related operations in Append mode that doesn't have a watermark but also makes Distinct in complete mode runnable.

[SPARK-30462][SS] Streamline the logic on file stream source and sink metadata log to avoid memory issue (+129, -87)>

Improvement: In many operations on CompactibleFileStreamLog reads a metadata log file and materializes all entries into memory. As the nature of the compact operation, CompactibleFileStreamLog may have a huge compact log file with bunch of entries included, and for now they're just monotonically increasing, which means the amount of memory to materialize also grows incrementally. This leads pressure on GC.

This patch proposes to streamline the logic on file stream source and sink whenever possible to avoid memory issue. To make this possible we have to break the existing behavior of excluding entries - now the compactLogs method is called with all entries, which forces us to materialize all entries into memory. This is hopefully no effect on end users, because only file stream sink has a condition to exclude entries, and the condition has been never true. (DELETE_ACTION has been never set.)

[SPARK-32648][SS] Remove unused DELETE_ACTION in FileStreamSinkLog (+0, -23)>

Refactoring: Removing unused DELETE_ACTION in FileStreamSinkLog.


[API][SPARK-32471][SQL][DOCS][TESTS][PYTHON][SS] Describe JSON option allowNonNumericNumbers and support it by PySpark json() (+53, -8)>

This PR:

  • Adds the read-only JSON option allowNonNumericNumbers in the DataFrameReader’s json() API
  • Adds new test cases for allowed JSON field values: NaN, +INF, +Infinity, Infinity, -INF and -Infinity

[SPARK-32010][PYTHON][CORE] Add InheritableThread for local properties and fixing a thread leak issue in pinned thread mode (+110, -15)>

This PR introduces InheritableThread class that works identically with threading.Thread but:

  • It can inherit the inheritable attributes of a JVM thread such as InheritableThreadLocal when the pinned thread mode is enabled, see also #24898.
  • This InheritableThread finishes the corresponding thread in JVM to prevent resource leaks automatically when the InheritableThread instance is garbage collected in Python side. In addition, this PR deprecates collectWithJobGroup since it was a temporary workaround added to avoid this leak issue and we have a fix for the issue here.

[API][SPARK-32507][DOCS][PYTHON] Add main page for PySpark documentation (+36, -0)>

This PR proposed to write the main page of PySpark documentation to give the better usability and readability:

[API][SPARK-32191][PYTHON][DOCS] Port migration guide for PySpark docs (+184, -62)>

This PR ported old PySpark migration guide to new PySpark docs.

[API][SPARK-32686][PYTHON] Un-deprecate inferring DataFrame schema from list of dict (+4, -11)>

New API (restoring deprecated API): Un-deprecates Spark's ability to infer a DataFrame schema from a list of dictionaries. The ability is Pythonic and matches functionality offered by pandas.

This change clarifies to users that this behavior is supported and is not going away in the near future.

[API][SPARK-31000][PYTHON][SQL] Add ability to set table description via Catalog.createTable() (+149, -6)>

New API: This PR enhances Catalog.createTable() to allow users to set the table's description. This corresponds to the following SQL syntax:

COMMENT 'this is a fancy table';

This brings the Scala/Python catalog APIs a bit closer to what's already possible via SQL.


[SPARK-32467][UI] Avoid encoding URL twice on https redirect (+44, -6)>

This PR fixes a UI issue when HTTPS is enabled. When HTTPS is enabled for Spark UI, an HTTP request will be redirected as an encoded HTTPS URL: https://github.com/apache/spark/pull/10238/files#diff-f79a5ead735b3d0b34b6b94486918e1cR312

When we create the redirect URL, we will call getRequestURI and getQueryString. Both two methods may return an encoded string. However, we pass them directly to the following URI constructor

URI(String scheme, String authority, String path, String query, String fragment)

As this URI constructor assumes both path and query parameters are decoded strings, it will encode them again. This makes the redirect URL encoded twice.

[API][SPARK-23431][CORE] Expose stage level peak executor metrics via REST API (+1558, -53)>

This PR proposes to expose the peak executor metrics at the stage level via the REST APIs:

  • /applications/<application_id>/stages/: peak values of executor metrics for each stage
  • /applications/<application_id>/stages/<stage_id>/< stage_attempt_id >: peak values of executor metrics for each executor for the stage, followed by peak values of executor metrics for the stage

Exposing the stage level peak executor metrics can help better understand your application's resource utilization.


[SPARK-32490][BUILD] Upgrade netty-all to 4.1.51.Final (+4, -4)>

This PR brings the bug fixes from the latest netty version from 4.1.47.Final.


[SPARK-32543][R] Remove arrow::as_tibble usage in SparkR (+2, -18)>

This PR removed arrow::as_tibble usage in SparkR since SparkR has increased the minimal version of Arrow R version to 1.0.0(SPARK-32452), and Arrow R 0.14 dropped as_tibble.

[API][SPARK-32554][K8S][DOCS] Remove the words "experimental" in the k8s document (+0, -4)>

This PR removed the words "experimental" in the k8s document from the primary branch in order to prepare a GA announcement for the k8s scheduler in the next feature release (v3.1.0).

[SPARK-32357][INFRA] Publish failed and succeeded test reports in GitHub Actions (+52, -11)>

This PR proposes to report the failed and succeeded tests in GitHub Actions in order to improve the development velocity by leveraging ScaCap/action-surefire-report.

[SPARK-32647][INFRA] Report SparkR test results with JUnit reporter (+8, -5)>

This PR proposes to generate a JUnit XML test report in SparkR tests that can be leveraged in both Jenkins and GitHub Actions.

[SPARK-32645][INFRA] Upload unit-tests.log as an artifact (+6, -0)>

Infra improvement: This PR proposes to upload target/unit-tests.log into the artifact so it will be able to download here: Screen Shot 2020-08-18 at 2 23 18 PM

Jenkins has this feature. It should be best to have the same dev functionalities with it.

[SPARK-32655][K8S] Support appId/execId placeholder in K8s SPARK_EXECUTOR_DIRS (+16, -0)>

Kubernetes improvement: This PR aims to support replacements of SPARK_APPLICATION_ID/SPARK_EXECUTOR_ID in SPARK_EXECUTOR_DIRS executor environment.

[SPARK-32682][INFRA] Use workflow_dispatch to enable manual test triggers (+16, -1)>

Infra improvement: Reduce the pressure of GitHub Actions on the Spark repository.

Add a workflow_dispatch entry in the GitHub Action script (build_and_test.yml). This update can enable developers to run the Spark tests for a specific branch on their own local repository, so I think it might help to check if all the tests can pass before opening a new PR.

Screen Shot 2020-08-21 at 16 28 41