[OSS DIGEST] The major changes of Apache Spark from Apr 22 to May 5

Kris Mok
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, there will be an [API] tag in the title.


[2.4][SPARK-31485][CORE] Avoid application hang if only partial barrier tasks launched (+53, -19)

The application could hang when only some of the barrier tasks are launched. For example:

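val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
// MyRDD is a test helper from Spark's DAGSchedulerSuite. Both partitions prefer
// executor_h_0; if it has only one free core, only one barrier task can launch.
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
  iter // task body omitted
}.collect() // trigger the barrier stage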

The original approach assumed that an exception thrown in TaskSchedulerImpl.resourceOffers would fail the application, so that the barrier stage would never actually run. However, resourceOffers is called within the Spark RPC framework, which swallows any non-fatal exceptions, so the assumption does not hold. This PR fixes the bug by calling dagScheduler.taskSetFailed to abort the barrier stage.
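To see why throwing was not enough, here is a minimal Python sketch. It assumes a simplified RPC loop that, like Spark's RPC framework as described above, logs and swallows non-fatal exceptions raised by a handler. All names (rpc_dispatch, offer_resources_old, offer_resources_new, task_set_failed) are hypothetical stand-ins, not Spark APIs; task_set_failed only plays the role of dagScheduler.taskSetFailed.

```python
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("rpc-sketch")

# Barrier stages aborted by the simulated scheduler: (stage_id, reason).
aborted_stages = []


def rpc_dispatch(handler, *args):
    """Hypothetical RPC loop: non-fatal exceptions are logged and swallowed."""
    try:
        return handler(*args)
    except Exception as exc:
        log.warning("swallowed exception in RPC handler: %s", exc)
        return None


def task_set_failed(stage_id, reason):
    """Stand-in for dagScheduler.taskSetFailed: records the aborted stage."""
    aborted_stages.append((stage_id, reason))


def offer_resources_old(stage_id, launched, total):
    # Old approach: throw and rely on the exception to fail the application.
    if launched < total:
        raise RuntimeError(f"only {launched}/{total} barrier tasks launched")


def offer_resources_new(stage_id, launched, total):
    # New approach: abort the barrier stage explicitly instead of throwing.
    if launched < total:
        task_set_failed(stage_id, f"only {launched}/{total} barrier tasks launched")


rpc_dispatch(offer_resources_old, 1, 1, 2)  # exception swallowed; stage 1 not aborted
rpc_dispatch(offer_resources_new, 2, 1, 2)  # stage 2 is aborted
print(aborted_stages)  # [(2, 'only 1/2 barrier tasks launched')]
```

The explicit callback reaches the scheduler no matter how the surrounding RPC layer treats exceptions.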

[API][3.1][SPARK-31518][CORE] Expose filterByRange in JavaPairRDD (+57, -0)

To improve the consistency between the Scala and Java APIs, this PR exposes the filterByRange method from OrderedRDDFunctions in the Java API (as a method of JavaPairRDD).

  /**
   * Return an RDD containing only the elements in the inclusive range `lower` to `upper`.
   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
   * performed efficiently by only scanning the partitions that might contain matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
   * @since 3.1.0
   */
  def filterByRange(lower: K, upper: K): JavaPairRDD[K, V]

  /**
   * Return an RDD containing only the elements in the inclusive range `lower` to `upper`.
   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
   * performed efficiently by only scanning the partitions that might contain matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
   * @since 3.1.0
   */
  def filterByRange(comp: Comparator[K], lower: K, upper: K): JavaPairRDD[K, V]
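A pure-Python sketch of the behavior described in the doc comment, assuming a range-partitioned dataset is modeled as a list of partitions of (key, value) pairs plus the sorted upper bounds of every partition except the last (similar in spirit to RangePartitioner's range bounds). The helper names are hypothetical. When bounds are available, only the partitions from the one that would hold `lower` to the one that would hold `upper` are scanned.

```python
from bisect import bisect_left


def get_partition(bounds, key):
    """Partition index for key: partition i holds keys k with bounds[i-1] < k <= bounds[i]."""
    return bisect_left(bounds, key)


def filter_by_range(partitions, lower, upper, bounds=None):
    """Return (pairs with lower <= key <= upper, indices of scanned partitions)."""
    if bounds is not None:
        # Range-partitioned: skip partitions that cannot contain matching keys.
        first, last = get_partition(bounds, lower), get_partition(bounds, upper)
        scanned = list(range(first, last + 1))
    else:
        # No range partitioner: every partition has to be filtered.
        scanned = list(range(len(partitions)))
    result = [(k, v) for i in scanned for (k, v) in partitions[i] if lower <= k <= upper]
    return result, scanned


# Three partitions split at keys 10 and 20.
bounds = [10, 20]
partitions = [
    [(1, "a"), (10, "b")],
    [(11, "c"), (15, "d"), (20, "e")],
    [(21, "f"), (30, "g")],
]
print(filter_by_range(partitions, 12, 18, bounds))  # ([(15, 'd')], [1])
print(filter_by_range(partitions, 12, 18))          # ([(15, 'd')], [0, 1, 2])
```

Both calls return the same elements; the range-partitioned call simply touches fewer partitions.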

[API][3.0][SPARK-31619][CORE] Rename config spark.dynamicAllocation.shuffleTimeout to spark.dynamicAllocation.shuffleTracking.timeout (+13, -13)

Rename the config spark.dynamicAllocation.shuffleTimeout, which has not been released yet, to spark.dynamicAllocation.shuffleTracking.timeout.

spark.dynamicAllocation.shuffleTracking.timeout (Default: Long.MaxValue)

  • When shuffle tracking is enabled, controls the timeout for executors that are holding shuffle data. The default value means that Spark will rely on the shuffles being garbage collected to be able to release executors. If for some reason garbage collection is not cleaning up shuffles quickly enough, this option can be used to control when to time out executors even when they are storing shuffle data.
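A simplified Python model of the release decision. It assumes that an idle executor is released after spark.dynamicAllocation.executorIdleTimeout (60s by default) when it holds no shuffle data, and after spark.dynamicAllocation.shuffleTracking.timeout when it does. The function can_release is hypothetical and ignores other factors Spark also considers (such as cached blocks); Long.MaxValue is modeled as infinity.

```python
import math

# Simplified model, in seconds.
EXECUTOR_IDLE_TIMEOUT = 60.0          # spark.dynamicAllocation.executorIdleTimeout
SHUFFLE_TRACKING_TIMEOUT = math.inf   # default Long.MaxValue: rely on shuffle GC


def can_release(idle_seconds, holds_shuffle_data,
                idle_timeout=EXECUTOR_IDLE_TIMEOUT,
                shuffle_timeout=SHUFFLE_TRACKING_TIMEOUT):
    """Return True if an idle executor may be released (hypothetical sketch)."""
    timeout = shuffle_timeout if holds_shuffle_data else idle_timeout
    return idle_seconds >= timeout


print(can_release(120, holds_shuffle_data=False))                     # True
print(can_release(120, holds_shuffle_data=True))                      # False
print(can_release(120, holds_shuffle_data=True, shuffle_timeout=90))  # True
```

With the default, an executor holding shuffle data stays until its shuffles are garbage collected; setting a finite timeout lets it be released anyway.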

[API][2.4][SPARK-31582][YARN] Being able to not populate Hadoop classpath (+43, -1)

Add a new Spark YARN config spark.yarn.populateHadoopClasspath, to avoid jar conflicts when running a Spark distribution with its own embedded Hadoop on an external YARN cluster.

spark.yarn.populateHadoopClasspath (Default: true)

  • Whether to populate the Hadoop classpath from yarn.application.classpath and mapreduce.application.classpath. Note that if this is set to false, it requires either a with-Hadoop Spark distribution that bundles the Hadoop runtime, or a Hadoop installation provided separately by the user.

[API][3.1][SPARK-31235][YARN] Separates different categories of applications (+97, -4)

Add a new config spark.yarn.applicationType to identify the type of an application when it is submitted to a YARN cluster.

spark.yarn.applicationType (Default: SPARK)

  • Defines more specific application types, e.g. SPARK, SPARK-SQL, SPARK-STREAMING, SPARK-MLLIB and SPARK-GRAPH. Please be careful not to exceed 20 characters.


[API][2.4][SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession (+67, -10)

This PR fixes a long-standing bug for static SQL configurations. Previously, SparkSession.Builder propagated static SQL configurations to the existing active/default SparkSession. This breaks the semantics of static configs, whose values cannot be changed at runtime. For example, before the fix, users could still change a static conf through the builder, even though `set` rejects it:

scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir;
  at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
  at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)


scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6403d574

scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.sql.warehou...|  xyz|
+--------------------+-----+
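A minimal Python sketch of the fixed behavior, modeling a session's conf as a plain dict. The function apply_builder_options and the STATIC_SQL_CONFS subset are hypothetical (the two keys listed are static SQL configs in Spark, but this is not Spark's builder code). Non-static options are still applied to the existing session, while static ones are skipped with a warning.

```python
import warnings

# Hypothetical subset of static SQL config keys, for illustration only.
STATIC_SQL_CONFS = {"spark.sql.warehouse.dir", "spark.sql.catalogImplementation"}


def apply_builder_options(existing_conf, options, fixed=True):
    """Apply builder options to an existing session's conf.

    fixed=False mimics the old behavior (every option is copied, static ones
    included); fixed=True skips static configs."""
    ignored = []
    for key, value in options.items():
        if fixed and key in STATIC_SQL_CONFS:
            ignored.append(key)
            continue
        existing_conf[key] = value
    if ignored:
        warnings.warn(f"Using an existing session; static configs not applied: {ignored}")
    return existing_conf


existing = {"spark.sql.warehouse.dir": "/old"}
options = {"spark.sql.warehouse.dir": "xyz", "spark.sql.shuffle.partitions": "10"}
print(apply_builder_options(dict(existing), options, fixed=False))
# {'spark.sql.warehouse.dir': 'xyz', 'spark.sql.shuffle.partitions': '10'}
print(apply_builder_options(dict(existing), options))
# {'spark.sql.warehouse.dir': '/old', 'spark.sql.shuffle.partitions': '10'}
```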

[3.0][SPARK-31495][SQL] Support formatted explain for AQE (+981, -37)

This PR adds support for formatted explain (EXPLAIN FORMATTED) to adaptive query execution (AQE). For example, before the change:


== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)

(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession@104ab57b), [PlanAdaptiveSubqueries(Map())], false

After the change:

== Physical Plan ==
AdaptiveSparkPlan (14)
+- * HashAggregate (13)
   +- CustomShuffleReader (12)
      +- ShuffleQueryStage (11)
         +- Exchange (10)
            +- * HashAggregate (9)
               +- * Project (8)
                  +- * BroadcastHashJoin Inner BuildRight (7)
                     :- * Project (2)
                     :  +- * LocalTableScan (1)
                     +- BroadcastQueryStage (6)
                        +- BroadcastExchange (5)
                           +- * Project (4)
                              +- * LocalTableScan (3)

(1) LocalTableScan [codegen id : 2]
Output [2]: [_1#x, _2#x]
Arguments: [_1#x, _2#x]

... [skipped in this digest]

(13) HashAggregate [codegen id : 3]
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Keys [1]: [k#x]
Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

(14) AdaptiveSparkPlan
Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
Arguments: isFinalPlan=true

[API][3.0][SPARK-31507][SQL] Remove uncommon fields support and update some fields with meaningful names for extract function (+576, -1806)

This PR removes support for extracting millennium, century, decade, millisecond, microsecond, and epoch from datetimes, since these fields are neither part of the ANSI standard nor common in other SQL platforms. It also renames some of the fields accepted by the EXTRACT function for dates and timestamps:

  • Use DOW_ISO and DAYOFWEEK_ISO to replace ISODOW

This change only affects features that are new in the unreleased Spark 3.0.

[API][3.0][SPARK-31528][SQL] Remove millennium, century, decade from trunc/date_trunc functions (+79, -101)

Similar to SPARK-31507, this PR removes support for these formats (millennium, century, decade) from the trunc and date_trunc functions. This change only affects features that are new in the unreleased Spark 3.0.

[2.4][SPARK-31519][SQL] Cast in having aggregate expressions returns the wrong result (+135, -80)

Before the fix, the following query returns an empty result:

SELECT SUM(a) AS b, CAST('2020-01-01' AS DATE) AS fake FROM VALUES (1, 10), (2, 20) AS T(a, b) GROUP BY b HAVING b > 10

Before the fix, the SQL parser creates Filter(..., Aggregate(...)) when parsing the HAVING clause, and the analyzer rule ResolveAggregateFunctions resolves the aggregate functions and grouping columns in the Filter operator.

It works for simple cases in a very tricky way, as it relies on the execution order of analyzer rules:

  1. Rule ResolveReferences handles the unresolved Aggregate operator and resolves the attributes inside aggregate functions, but the functions themselves remain unresolved as UnresolvedFunction. Because the child Aggregate operator is still unresolved, the Filter operator is not resolved yet.
  2. Rule ResolveFunctions resolves the UnresolvedFunction, which makes the Aggregate operator resolved.
  3. Rule ResolveAggregateFunctions resolves the Filter operator once its child is a resolved Aggregate. This rule resolves the grouping columns correctly.

In the example query, the CAST operation needs to be resolved by the rule ResolveTimeZone, which runs after ResolveAggregateFunctions. This breaks Step 3, because the Aggregate operator is still unresolved at that time. The analyzer then starts the next round, and the Filter operator is resolved by ResolveReferences, which wrongly resolves the grouping columns.

In this PR, we fix this bug by adding a new logical node AggregateWithHaving. The analyzer resolves it to Filter(..., Aggregate(...)).

[API][3.0][SPARK-31522][SQL] Hive metastore client initialization related configurations should be static (+15, -6)

This PR makes the following SQL configs static:

  1. spark.sql.hive.metastore.version - the version of the Hive metastore that Spark uses
  2. spark.sql.hive.metastore.jars - the location of the Hive metastore jars that Spark uses to create the Hive client
  3. spark.sql.hive.metastore.sharedPrefixes and spark.sql.hive.metastore.barrierPrefixes - the package names of classes that are shared or separated between the Spark class loader and the Hive client class loader

[API][3.0][SPARK-31527][SQL] date add/subtract interval only allow those day precision in ansi mode (+888, -21)

To follow the ANSI standard, this PR makes the expressions date + interval, interval + date, and date - interval accept only intervals whose microseconds part is 0 when ANSI mode is enabled.
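A minimal Python sketch of the ANSI-mode rule. It assumes intervals are modeled as a hypothetical (months, days, microseconds) triple similar to Spark's CalendarInterval, and that months are added before days, with the day clamped to the last day of the target month. interval + date behaves the same as date + interval. The names are illustrative, not Spark APIs.

```python
import calendar
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass(frozen=True)
class Interval:
    """Hypothetical model of an interval: months, days and a time part in microseconds."""
    months: int = 0
    days: int = 0
    microseconds: int = 0


def _add_months(d, months):
    # Clamp the day to the end of the target month (e.g. Jan 31 + 1 month -> Feb 29).
    year, month0 = divmod(d.year * 12 + (d.month - 1) + months, 12)
    month = month0 + 1
    return date(year, month, min(d.day, calendar.monthrange(year, month)[1]))


def ansi_date_add(d, interval):
    """date + interval under ANSI mode: only day precision is accepted."""
    if interval.microseconds != 0:
        raise ValueError("interval must not have a time part in ANSI mode")
    return _add_months(d, interval.months) + timedelta(days=interval.days)


def ansi_date_sub(d, interval):
    """date - interval is date + (-interval)."""
    return ansi_date_add(d, Interval(-interval.months, -interval.days, -interval.microseconds))


print(ansi_date_add(date(2020, 1, 31), Interval(months=1, days=1)))  # 2020-03-01
print(ansi_date_sub(date(2020, 3, 1), Interval(days=1)))             # 2020-02-29
```

An interval such as 1 day 2 hours would be rejected here, because its microseconds part is not 0.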

[API][3.1][SPARK-30724][SQL] Support 'LIKE ANY' and 'LIKE ALL' operators (+405, -10)

This PR adds support for the LIKE ANY/SOME and LIKE ALL operators, which are mostly used to match a text field against one or more patterns.
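The semantics can be illustrated with a small Python sketch that translates a LIKE pattern (% for any sequence, _ for a single character, backslash as the escape character) into a regular expression. LIKE SOME is a synonym for LIKE ANY. The helper names are hypothetical, and NULL handling is not modeled.

```python
import re


def like_to_regex(pattern, escape="\\"):
    """Translate a SQL LIKE pattern into a compiled Python regex (matched in full)."""
    out = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == escape and i + 1 < len(pattern):
            out.append(re.escape(pattern[i + 1]))  # escaped char is literal
            i += 2
            continue
        if ch == "%":
            out.append(".*")
        elif ch == "_":
            out.append(".")
        else:
            out.append(re.escape(ch))
        i += 1
    return re.compile("".join(out), re.DOTALL)


def like(value, pattern):
    return like_to_regex(pattern).fullmatch(value) is not None


def like_any(value, patterns):
    """value LIKE ANY (p1, p2, ...): true if at least one pattern matches."""
    return any(like(value, p) for p in patterns)


def like_all(value, patterns):
    """value LIKE ALL (p1, p2, ...): true only if every pattern matches."""
    return all(like(value, p) for p in patterns)


print(like_any("foo", ["%oo", "%bar"]), like_all("foo", ["%oo", "%bar"]))  # True False
```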

[3.1][SPARK-31272][SQL] Support DB2 Kerberos login in JDBC connector (+281, -33)

When loading DataFrames from a JDBC data source with Kerberos authentication, remote executors (e.g. in yarn-client or yarn-cluster mode) fail to establish a connection because they lack a Kerberos ticket or the ability to generate one.

This PR adds Kerberos login support for DB2.

[3.1][SPARK-31524][SQL] Add metric to the split task number for skew optimization (+20, -7)

This PR adds a metric that reports the number of split tasks created by the skew optimization, so the number of splits for the skewed partitions can be seen in the SQL metrics of the query plan.


[3.1][SPARK-31586][SQL] Replace expression TimeSub(l, r) with TimeAdd(l, -r) (+45, -91)

The implementation of TimeSub (timestamp minus interval) almost duplicates that of TimeAdd. This PR replaces it with TimeAdd(l, -r), since the two are equivalent.
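The equivalence can be sketched in Python under simplifying assumptions: timestamps are UTC microseconds since the epoch, intervals are a hypothetical (months, days, microseconds) triple like Spark's CalendarInterval, and month arithmetic is left out. Negating an interval negates each component, so subtracting r is the same as adding -r. time_add and time_sub are illustrative names, not Spark's code.

```python
from dataclasses import dataclass

MICROS_PER_DAY = 86_400 * 1_000_000


@dataclass(frozen=True)
class Interval:
    months: int = 0
    days: int = 0
    microseconds: int = 0

    def __neg__(self):
        # Negating an interval negates every component.
        return Interval(-self.months, -self.days, -self.microseconds)


def time_add(ts_micros, interval):
    """Simplified TimeAdd on UTC microseconds (month arithmetic omitted)."""
    if interval.months:
        raise NotImplementedError("month arithmetic is omitted in this sketch")
    return ts_micros + interval.days * MICROS_PER_DAY + interval.microseconds


def time_sub(ts_micros, interval):
    # TimeSub(l, r) expressed as TimeAdd(l, -r).
    return time_add(ts_micros, -interval)


print(time_sub(0, Interval(days=1, microseconds=5)))  # -86400000005
```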

[2.4][SPARK-31500][SQL] collect_set() of BinaryType returns duplicate elements (+55, -6)

Fix a bug where collect_set() on a BinaryType column could return duplicate elements.

[API][3.0][SPARK-30282][SQL][FOLLOWUP] SHOW TBLPROPERTIES should support views (+162, -29)

Fix a Spark 3.0 regression where SHOW TBLPROPERTIES did not work for views. This PR adds back view support for SHOW TBLPROPERTIES.


scala> sql("CREATE VIEW view TBLPROPERTIES('p1'='v1', 'p2'='v2') AS SELECT 1 AS c1")
scala> sql("SHOW TBLPROPERTIES view").show(truncate=false)
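+---------------------------------+-------------+
|key                              |value        |
+---------------------------------+-------------+
|view.catalogAndNamespace.numParts|2            |
|view.query.out.col.0             |c1           |
|view.query.out.numCols           |1            |
|p2                               |v2           |
|view.catalogAndNamespace.part.0  |spark_catalog|
|p1                               |v1           |
|view.catalogAndNamespace.part.1  |default      |
+---------------------------------+-------------+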

scala> sql("CREATE TEMPORARY VIEW tview TBLPROPERTIES('p1'='v1', 'p2'='v2') AS SELECT 1 AS c1")
scala> sql("SHOW TBLPROPERTIES tview").show(truncate=false)

[API][3.0][SPARK-31557][SQL] Fix timestamps rebasing in legacy parsers (+40, -8)

This PR fixes two legacy timestamp formatters, LegacySimpleTimestampFormatter and LegacyFastTimestampFormatter, to rebase microseconds when parsing from and formatting to strings. The legacy formatters operate on the hybrid calendar (Julian + Gregorian), so the input microseconds must be rebased to have the same date-time fields as in the Proleptic Gregorian calendar used by Spark SQL.

[API][3.0][SPARK-31597][SQL] extracting day from intervals should be interval.days + days in interval.microsecond (+22, -19)

When extracting the day field from an interval, the whole days contained in the interval's time (microseconds) part should be added to the interval's days field. The new behavior satisfies both the ANSI standard and common use cases in modern SQL platforms.


spark-sql> SELECT EXTRACT(DAY FROM (cast('2020-01-15 00:00:00' as timestamp) - cast('2020-01-01 00:00:01' as timestamp)));
spark-sql> SELECT EXTRACT(DAY FROM (cast('2020-01-15 00:00:00' as timestamp) - cast('2020-01-01 00:00:00' as timestamp)));
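A minimal Python sketch of the new rule, assuming the interval's days and microseconds fields are given separately and that whole days are counted with integer division truncating toward zero (as Java's long division does). For the two example queries, it assumes, for illustration only, that each timestamp difference is represented with days = 0 and the whole duration in the microseconds part. The helper names are hypothetical.

```python
from datetime import datetime

MICROS_PER_DAY = 24 * 60 * 60 * 1_000_000


def _div_trunc(a, b):
    """Integer division truncating toward zero."""
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b > 0) else -q


def extract_day(days, microseconds):
    """EXTRACT(DAY FROM interval): the days field plus the whole days in the time part."""
    return days + _div_trunc(microseconds, MICROS_PER_DAY)


def to_micros(delta):
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


# Assumption: days = 0 and the whole duration lives in the microseconds part.
d1 = to_micros(datetime(2020, 1, 15) - datetime(2020, 1, 1, 0, 0, 1))
d2 = to_micros(datetime(2020, 1, 15) - datetime(2020, 1, 1))
print(extract_day(0, d1), extract_day(0, d2))  # 13 14
```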

[3.0][SPARK-31606][SQL] Reduce the perf regression of vectorized parquet reader caused by Datetime rebase (+295, -160)

The recently added datetime rebasing degrades the performance of the Parquet vectorized reader because it breaks vectorization, even when the datetime values don't need to be rebased. This PR pushes the rebase logic down to a lower level of the Parquet vectorized reader to make the code more vectorization-friendly. According to the benchmark:

  • The Date type is 30% faster if the values don't need to be rebased, and 20% faster if they do.
  • The Timestamp type is 60% faster if the values don't need to be rebased, with no difference if they do.

[API][3.0][SPARK-31626][SQL] Port HIVE-10415: hive.start.cleanup.scratchdir configuration is not taking effect (+36, -0)

This PR makes hive.start.cleanup.scratchdir take effect, so that the Hive scratch directory is cleaned up when the Hive ThriftServer starts.

[3.0][SPARK-31630][SQL] Fix perf regression by skipping timestamps rebasing after some threshold (+431, -410)

This PR skips timestamp rebasing beyond a global threshold, after which there is no difference between the Julian and Gregorian calendars. This avoids looking up switch points in hash maps and fixes the perf regressions in toJavaTimestamp() and fromJavaTimestamp(). According to the benchmark:

  • Conversions from the external type java.sql.Timestamp are ~34% faster.
  • Conversions to the external type java.sql.Timestamp are ~16% faster.
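The idea behind the fast path can be sketched in Python with a hypothetical rebase table for a single time zone. The switch points and diffs are made up and are not Spark's real values, and a binary search over a sorted list stands in for the switch-point lookup. Values at or after the last switch point are returned unchanged; only older values pay for the lookup.

```python
from bisect import bisect_right

# Hypothetical rebase table (made-up values): from SWITCHES[i] (micros since
# the epoch) onward, DIFFS[i] micros are added. Earlier values use DIFFS[0].
SWITCHES = [-10_000, -5_000, -1_000]
DIFFS = [300, 200, 0]

# Global threshold: from here on the two calendars agree, so nothing changes.
LAST_SWITCH = SWITCHES[-1]


def rebase_with_lookup(micros):
    """Always search the switch points (the slower path)."""
    i = max(bisect_right(SWITCHES, micros) - 1, 0)
    return micros + DIFFS[i]


def rebase(micros):
    # Fast path: past the threshold, skip the switch-point lookup entirely.
    if micros >= LAST_SWITCH:
        return micros
    return rebase_with_lookup(micros)


print(rebase(-3_000), rebase(0))  # -2800 0
```

Because the diff after the last switch point is 0, the fast path returns exactly what the lookup would.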

[3.0][SPARK-31641][SQL] Fix days conversions by JSON legacy parser (+9, -5)

This PR fixes a bug in how the legacy JSON parser converts days, which could produce wrong results. In Spark 2.4 and earlier versions, days are interpreted as days since the epoch in the hybrid calendar (Julian + Gregorian since 1582-10-15). Since Spark 3.0, the base calendar has been switched to the Proleptic Gregorian calendar, so the days must be rebased to represent the same local date.

[API][3.1][SPARK-31594][SQL] Do not display the seed of rand/randn with no argument in output schema (+52, -6)

This PR updates the column names generated for the Spark SQL rand()/randn() functions. Previously, the column name included the random seed; after the change, the seed is hidden unless it is explicitly given.


scala> sql("select rand()").show()
|            rand()|

// If a seed is given, it is still shown in the column name
// (same as the current behavior)
scala> sql("select rand(1)").show()
|           rand(1)|

// We can still check a seed in explain output:
scala> sql("select rand()").explain()
== Physical Plan ==
*(1) Project [rand(-2282124938778456838) AS rand()#0]
+- *(1) Scan OneRowRelation[]

[3.0][SPARK-31607][SQL] Improve the perf of CTESubstitution (+29, -21)

Previously, the rule CTESubstitution traversed the main query many times when there were many CTE relations. This PR resolves the CTE relations first, so the main plan only needs to be traversed once to substitute them.
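The reordering can be sketched in Python with plans modeled as nested tuples: ("ref", name) is a reference to a relation, ("table", name) a base table, and any other tuple an operator followed by its child plans. All names are hypothetical. cte_substitution resolves each CTE definition against the earlier ones first and then walks the main plan once; cte_substitution_naive, included only for comparison (it is not Spark's old code), walks the main plan once per CTE.

```python
def substitute(plan, resolved):
    """Replace ("ref", name) leaves that name a resolved CTE; copy everything else."""
    kind = plan[0]
    if kind == "ref":
        return resolved.get(plan[1], plan)
    if kind == "table":
        return plan
    return (kind, *(substitute(child, resolved) for child in plan[1:]))


def resolve_ctes(ctes):
    """Resolve CTE definitions in order; each may reference the earlier ones."""
    resolved = {}
    for name, plan in ctes:
        resolved[name] = substitute(plan, resolved)
    return resolved


def cte_substitution(main, ctes):
    # Resolve all CTE relations first, then traverse the main plan only once.
    return substitute(main, resolve_ctes(ctes))


def cte_substitution_naive(main, ctes):
    # For comparison only: one traversal of the main plan per CTE relation.
    for name, plan in reversed(ctes):
        main = substitute(main, {name: plan})
    return main


# WITH t1 AS (SELECT * FROM src), t2 AS (SELECT * FROM t1) SELECT * FROM t1 JOIN t2
ctes = [("t1", ("project", ("table", "src"))), ("t2", ("project", ("ref", "t1")))]
main = ("join", ("ref", "t1"), ("ref", "t2"))
print(cte_substitution(main, ctes))
# ('join', ('project', ('table', 'src')), ('project', ('project', ('table', 'src'))))
```

Both functions produce the same plan; the difference is how often the (possibly large) main plan is traversed.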

[API][3.0][SPARK-31639] Revert SPARK-27528 Use Parquet logical type TIMESTAMP_MICROS by default (+4, -10)

This PR reverts the commit that made TIMESTAMP_MICROS the default type for timestamps written to Parquet files. The revert keeps Spark compatible with Hive and Presto, whose current stable releases don't support the TIMESTAMP_MICROS type.


[3.0][SPARK-31497][ML][PYSPARK] Fix PySpark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model (+189, -3)

Fix a bug where PySpark CrossValidator/TrainValidationSplit models with a pipeline estimator could not be saved and loaded.

Most PySpark estimators/transformers inherit from JavaParams, but some estimators are special (in order to support nested estimators/transformers implemented in pure Python):

  • Pipeline
  • OneVsRest
  • CrossValidator
  • TrainValidationSplit

Note, however, that the model readers/writers of the estimators listed above currently do NOT support nested estimators/transformers implemented in pure Python, because they use the Java reader/writer wrappers as the Python-side readers/writers.

The PySpark CrossValidator/TrainValidationSplit model readers/writers require every estimator to define _transfer_param_map_to_java and _transfer_param_map_from_java (used in model read/write).

The OneVsRest class already defines these two methods, but Pipeline does not, which led to this bug.

This PR adds _transfer_param_map_to_java and _transfer_param_map_from_java to the Pipeline class.

[3.1][SPARK-31007][ML] KMeans optimization based on triangle-inequality (+390, -45)

This PR applies Lemma 1 from the paper "Using the Triangle Inequality to Accelerate K-Means" (Elkan, ICML 2003):

Let x be a point, and let b and c be centers. If d(b, c) >= 2 d(x, b), then d(x, c) >= d(x, b).

It can be applied directly to EuclideanDistance, but not to CosineDistance, which does not satisfy the triangle inequality. However, for CosineDistance, a variant of the lemma holds in the space of angles (radians).
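A minimal Python sketch of how Lemma 1 prunes the nearest-center search for Euclidean distance, assuming the pairwise center distances are precomputed once per iteration. The function names are hypothetical, this is not Spark's implementation, and the cosine (angle-space) variant is not covered.

```python
import math


def center_distance_matrix(centers):
    """Pairwise distances between centers, computed once per iteration."""
    return [[math.dist(a, b) for b in centers] for a in centers]


def nearest_center(x, centers, center_dists):
    """Return (index of the closest center, number of point-center distances computed).

    Lemma 1: if d(b, c) >= 2 * d(x, b) for the current best center b, then
    d(x, c) >= d(x, b), so d(x, c) does not need to be computed."""
    best, best_dist = 0, math.dist(x, centers[0])
    computed = 1
    for j in range(1, len(centers)):
        if center_dists[best][j] >= 2 * best_dist:
            continue  # centers[j] cannot be closer than centers[best]
        d = math.dist(x, centers[j])
        computed += 1
        if d < best_dist:
            best, best_dist = j, d
    return best, computed


def brute_force_nearest(x, centers):
    return min(range(len(centers)), key=lambda j: math.dist(x, centers[j]))


centers = [(0.0, 0.0), (10.0, 0.0), (0.0, 10.0)]
print(nearest_center((1.0, 0.0), centers, center_distance_matrix(centers)))  # (0, 1)
```

For the point (1, 0), both other centers are at least 10 away from center 0, which is more than twice the distance 1, so only one point-center distance is computed.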

[3.1][SPARK-31603][ML] AFT uses common functions in RDDLossFunction (+173, -226)

Make AFT (AFTSurvivalRegression) reuse the common functions in ml.optim, since the logic for optimizing AFT is very similar to that of other algorithms based on RDDLossFunction.


[3.0][SPARK-27340][SS] Alias on TimeWindow expression cause watermark metadata lost (+38, -13)

This PR fixes the bug by no longer setting explicitMetadata in Column.name. The root cause of this bug is that the original implementation of Column.name (which Column.as delegates to) did not propagate the metadata correctly:

def name(alias: String): Column = withExpr {
  normalizedExpr() match {
    // A named expression's alias copies whatever metadata it has at this point.
    case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
    case other => Alias(other, alias)()
  }
}

In Structured Streaming, an Alias is added to TimeWindow by default. When the user adds another Alias, this function is entered twice. The first call, for the internal Alias, goes through the branch that does not set explicitMetadata. In the second call, explicitMetadata is taken from metadata that does not yet contain the watermark information, since the metadata for the TimeWindow column is only set after the analysis rule TimeWindowing runs. As a result, the watermark metadata is lost.


[3.0][SPARK-29664][PYTHON][SQL][FOLLOW-UP] Add deprecation warnings for getItem instead (+18, -21)

Instead of removing the behavior outright, which would be a breaking change, this PR follows the rubric added at https://spark.apache.org/versioning-policy.html and deprecates it for now: passing a Column as the key to getItem (or as the name to getField) now emits a deprecation warning. The behavior will be gradually removed in future releases.

After this change:

import warnings
warnings.simplefilter("always", DeprecationWarning)  # make sure the warning is shown
from pyspark.sql.functions import *
# `spark` is the SparkSession predefined in the PySpark shell
df = spark.range(2)
map_col = create_map(lit(0), lit(100), lit(1), lit(200))
df.withColumn("mapped", map_col.getItem(col('id'))).show()
# /.../python/pyspark/sql/column.py:311: DeprecationWarning: A column as 'key' in getItem is
# deprecated as of Spark 3.0, and will not be supported in the future release. Use `column[key]`
# or `column.key` syntax instead.

struct_col = struct(lit(0), lit(100), lit(1), lit(200))
df.withColumn("struct", struct_col.getField(lit("col1"))).show()
# /.../spark/python/pyspark/sql/column.py:336: DeprecationWarning: A column as 'name'
# in getField is deprecated as of Spark 3.0, and will not be supported in the future release. Use
# `column[name]` or `column.name` syntax instead.

[API][3.1][SPARK-29641][PYTHON][CORE] Stage Level Sched: Add python api's and tests (+791, -27)

As part of the stage-level scheduling feature, this PR adds the Python APIs for setting resource profiles. It also applies the PySpark memory configuration properly when it is specified in the ResourceProfile. The PySpark memory configuration is passed through the task local properties, which was an easy way to get it to the PythonRunner that needs it.

[API][3.0][SPARK-31549][PYSPARK] Add a developer API invoking collect on Python RDD with user-specified job group (+90, -0)

This PR adds a new API to the PySpark RDD class: def collectWithJobGroup(self, groupId, description, interruptOnCancel=False). It does the same thing as rdd.collect(), but lets you specify the job group for the collect job. Previously, if the job group was set before calling rdd.collect(), it might not be applied as expected because of issues with Java thread-local variables.

def collectWithJobGroup(self, groupId, description, interruptOnCancel=False):
    """
    .. note:: Experimental

    When collect rdd, use this method to specify job group.

    .. versionadded:: 3.0.0
    """


[3.0][SPARK-31534][WEBUI] Text for tooltip should be escaped (+51, -30)

This PR fixes the bug by escaping the tooltip text in the DAG visualization (DAG Viz) and the Timeline View.


[Screenshots: DAG Viz and Timeline View tooltips before the fix]

[Screenshots: DAG Viz and Timeline View tooltips after the fix]


[3.0][SPARK-31580][BUILD] Upgrade Apache ORC to 1.5.10 (+23, -10)

This PR aims to upgrade Apache ORC to 1.5.10.

Apache ORC 1.5.10 is a maintenance release with the following patches.

  • ORC-621 Need reader fix for ORC-569
  • ORC-616 In Patched Base encoding, the value of headerThirdByte goes beyond the range of byte
  • ORC-613 OrcMapredRecordReader mis-reuse struct object when actual children schema differs
  • ORC-610 Updated Copyright year in the NOTICE file

[3.0][SPARK-31633][BUILD] Upgrade SLF4J from 1.7.16 to 1.7.30 (+13, -13)

SLF4J 1.7.23+ is required for slf4j-log4j12 with the MDC feature to work under Java 9. The upgrade also brings in the latest bug fixes.

[3.1][MINOR][INFRA] Add a guide to clarify release/unreleased Spark versions of user-facing change in the Github PR template (+3, -1)

Add a guide to clarify the Spark version when describing "Does this PR introduce any user-facing change?".


Kris Mok

Software Engineer, Databricks Inc.
