[OSS DIGEST] The major changes of Apache Spark from June 17 to June 30

Hyukjin Kwon

Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, an [API] tag is added in the title.


CORE

[3.1][SPARK-31960][YARN][BUILD] Only populate Hadoop classpath for no-hadoop build (+56, -4)

Previously, when a job was submitted to YARN, Spark by default populated the Hadoop classpath from yarn.application.classpath and mapreduce.application.classpath. Now, by default, it no longer does so when the Spark distribution bundles Hadoop. This prevents failures caused by conflicting transitive dependencies, such as Guava and Jackson, picked up from the Hadoop cluster.

[API][3.1][SPARK-31798][SHUFFLE] Shuffle Writer API changes to return custom map output metadata (+113, -11)

For now, this only introduces MapOutputMetadata on the shuffle writer side. Accepting the metadata on the driver and persisting it in the driver's shuffle metadata storage plugin will follow soon, so that custom map output writers can return metadata to the driver. The metadata will contain information such as the block location so that readers are able to read the shuffle data. This is part of the SPIP "Use remote storage for persisting shuffle data".

SQL

[API][3.1][SPARK-31337][SQL] Support MS SQL Kerberos login in JDBC connector (+165, -3)

Previously, when loading DataFrames from the JDBC data source with Kerberos authentication, remote executors (in yarn-client/cluster modes, etc.) failed to establish a connection because they lacked a Kerberos ticket or the ability to generate one. Now, this works with MS SQL Server through the JDBC data source.

[3.1][SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of array type (+106, -17)

Previously, the generated code for concat_ws could fail to compile because relevant local variables were missing when method splitting was triggered in splitExpressionsWithCurrentInputs. Now, the relevant local variables are passed internally (via arrays), which fixes the bug.

[API][3.1][SPARK-31826][SQL] Support composed type of case class for typed Scala UDF (+210, -32)

Adds support for typed Scala UDFs that accept composed types of case classes, for example, Seq[T], Array[T] and Map[Int, T], where T is a case class.

import org.apache.spark.sql.functions.{col, udf}
case class Person(name: String, age: Int)
Seq((1, Seq(Person("Jack", 5)))).toDF("id", "persons").withColumn("ages", udf{ s: Seq[Person] => s.head.age }.apply(col("persons"))).show

Before:

...
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person
  at $anonfun$res3$1(<console>:30)
...

After:

+---+-----------+----+
| id|    persons|ages|
+---+-----------+----+
|  1|[[Jack, 5]]|   5|
+---+-----------+----+

[API][3.1][SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable (+523, -13)

Introduces two configurations, spark.sql.bucketing.coalesceBucketsInJoin.enabled and spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio, which allow the buckets of the larger side to be coalesced, eliminating the full shuffle, when the following conditions are met:

  • The join is a sort merge join (which is only created for equi-joins).
  • The join keys match the output partition expressions on their respective sides.
  • The larger bucket number is divisible by the smaller bucket number.
  • spark.sql.bucketing.coalesceBucketsInJoin.enabled is set to true.
  • The ratio of the numbers of buckets is less than or equal to the value set in spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio.

For example,

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain

Before:

== Physical Plan ==
*(5) SortMergeJoin [i#44], [i#50], Inner
:- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
:     +- *(1) Project [i#44, j#45, k#46]
:        +- *(1) Filter isnotnull(i#44)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
      +- *(3) Project [i#50, j#51, k#52]
         +- *(3) Filter isnotnull(i#50)
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4

After:

== Physical Plan ==
*(3) SortMergeJoin [i#44], [i#50], Inner
:- *(1) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i#44, j#45, k#46]
:     +- *(1) Filter isnotnull(i#44)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8 (Coalesced to 4)
+- *(2) Sort [i#50 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#50, j#51, k#52]
      +- *(2) Filter isnotnull(i#50)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4

Note that the configuration names in this commit were updated by the follow-up PR https://github.com/apache/spark/pull/29079.
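The bucket-count conditions listed above can be modelled with a small Python sketch. It is an illustration, not Spark's implementation: the function names are hypothetical, the default ratio of 4 is an assumption, and the ratio check is assumed to be inclusive. It also shows why coalescing is safe when the smaller bucket count divides the larger one: a row in bucket `b` of the larger table belongs to bucket `b % small` in the smaller layout.

```python
from typing import Optional


def coalesced_num_buckets(num_buckets1: int, num_buckets2: int,
                          max_bucket_ratio: int = 4) -> Optional[int]:
    """Return the bucket count to coalesce to, or None if coalescing does not apply.

    Hypothetical helper: assumes both sides are bucketed on the join keys and
    the join is otherwise eligible (a sort merge join on those keys).
    """
    if num_buckets1 == num_buckets2:
        return None  # Buckets already line up; nothing to coalesce.
    large, small = max(num_buckets1, num_buckets2), min(num_buckets1, num_buckets2)
    # The larger count must be a multiple of the smaller one, and the ratio
    # must not exceed the configured maximum (assumed inclusive here).
    if large % small == 0 and large // small <= max_bucket_ratio:
        return small
    return None


def coalesced_bucket_id(bucket_id: int, target_buckets: int) -> int:
    # pmod(h, large) % small == pmod(h, small) whenever small divides large,
    # so bucket b of the larger table maps onto bucket b % small.
    return bucket_id % target_buckets


print(coalesced_num_buckets(8, 4))  # 4: t1 (8 buckets) is coalesced to match t2
print(coalesced_num_buckets(8, 3))  # None: 8 is not divisible by 3
```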

[API][2.4][SPARK-31980][SQL] Function sequence() fails if start and end of range are equal dates (+20, -2)

Previously, sequence raised an error when its start and end were the same date or timestamp. Now, it returns an array containing that single value.

sql("""SELECT sequence(cast("2011-03-01" AS DATE), cast("2011-03-01" AS DATE), INTERVAL 1 MONTH)""").show()

Before:

java.lang.ArrayIndexOutOfBoundsException: 1
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:92)
  ...

After:

+---------------------------------------------------------------------------------+
|sequence(CAST(2011-03-01 AS DATE), CAST(2011-03-01 AS DATE), INTERVAL '1 months')|
+---------------------------------------------------------------------------------+
|                                                                     [2011-03-01]|
+---------------------------------------------------------------------------------+
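The expected semantics can be checked with a small Python model of a month-stepped date sequence: the range is inclusive on both ends, so equal start and end dates yield a single-element array. This sketches the semantics only. `add_months` and `month_sequence` are hypothetical helpers, not Spark's code, and clamping the day to the last day of the month (as java.time's `plusMonths` does) is an assumption for illustration.

```python
import calendar
from datetime import date
from typing import List


def add_months(d: date, months: int) -> date:
    """Shift a date by whole months, clamping the day to the month's last day."""
    total = d.year * 12 + (d.month - 1) + months
    year, month_index = divmod(total, 12)
    last_day = calendar.monthrange(year, month_index + 1)[1]
    return date(year, month_index + 1, min(d.day, last_day))


def month_sequence(start: date, stop: date, step_months: int = 1) -> List[date]:
    """Inclusive sequence from start to stop in steps of step_months months."""
    if step_months == 0:
        raise ValueError("step must not be zero")
    if (stop - start).days * step_months < 0:
        raise ValueError("step goes in the wrong direction for these boundaries")
    result: List[date] = []
    i = 0
    current = start
    # Always step from the original start so day clamping does not accumulate.
    while (current <= stop) if step_months > 0 else (current >= stop):
        result.append(current)
        i += 1
        current = add_months(start, i * step_months)
    return result


print(month_sequence(date(2011, 3, 1), date(2011, 3, 1)))  # [datetime.date(2011, 3, 1)]
```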

[API][3.0][SPARK-32021][SQL] Increase precision of seconds and fractions of make_interval (+53, -9)

Previously, the seconds argument of make_interval (including its fraction) had a precision of 8, i.e., DECIMAL(8, 6), which cannot cover the maximum number of microseconds an interval allows (stored as a long). Now, the precision is 18, which supports microseconds properly.

sql("SELECT make_interval(1, 2, 3, 4, 0, 0, 123456789012.123456)").show()

Before:

+--------------------------------------------------------------------------+
|make_interval(1, 2, 3, 4, 0, 0, CAST(123456789012.123456 AS DECIMAL(8,6)))|
+--------------------------------------------------------------------------+
|                                                                      null|
+--------------------------------------------------------------------------+

After:

+---------------------------------------------------------------------------+
|make_interval(1, 2, 3, 4, 0, 0, CAST(123456789012.123456 AS DECIMAL(18,6)))|
+---------------------------------------------------------------------------+
|       1 years 2 months 25 days 34293552 hours 30 minutes 12.123456 seconds|
+---------------------------------------------------------------------------+
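The arithmetic behind this example can be checked with Python's standard decimal module. The sketch assumes that a DECIMAL(p, s) value holds at most p - s digits before the decimal point, which is why 123456789012.123456 does not fit DECIMAL(8, 6) (hence the null) but fits DECIMAL(18, 6). It then splits the seconds into the hours, minutes and seconds shown in the After output. The helper names are hypothetical.

```python
from decimal import Decimal
from typing import Tuple

MICROS_PER_SECOND = 1_000_000
MICROS_PER_MINUTE = 60 * MICROS_PER_SECOND
MICROS_PER_HOUR = 60 * MICROS_PER_MINUTE


def fits_decimal(value: Decimal, precision: int, scale: int) -> bool:
    """True if value fits DECIMAL(precision, scale) without losing integer digits."""
    return abs(value) < Decimal(10) ** (precision - scale)


def split_seconds(secs: Decimal) -> Tuple[int, int, Decimal]:
    """Split a seconds value into (hours, minutes, seconds) via exact microseconds."""
    micros = int(secs * MICROS_PER_SECOND)
    hours, rem = divmod(micros, MICROS_PER_HOUR)
    minutes, rem = divmod(rem, MICROS_PER_MINUTE)
    return hours, minutes, Decimal(rem) / MICROS_PER_SECOND


secs = Decimal("123456789012.123456")
print(fits_decimal(secs, 8, 6), fits_decimal(secs, 18, 6))  # False True
print(split_seconds(secs))  # (34293552, 30, Decimal('12.123456'))
```

The 25 days in the output come from the weeks and days arguments: 3 weeks and 4 days.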

[3.1][SPARK-31957][SQL] Cleanup hive scratch dir for the developer api startWithContext (+25, -13)

Previously, the Hive scratch directory was not cleaned up when the Thrift server was started via the HiveThriftServer2.startWithContext developer API, whereas it was cleaned up when the server was started the regular way via HiveThriftServer2.main. Now, the directory is consistently cleaned up when the server is started with HiveThriftServer2.startWithContext as well.

[API][3.1][SPARK-32019][SQL] Add spark.sql.files.minPartitionNum config (+49, -2)

  • spark.sql.files.minPartitionNum (Default: None)

    The suggested (not guaranteed) minimum number of split file partitions. If not set, the default value is spark.default.parallelism. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
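A rough Python model of how a minimum partition number could feed into the maximum file split size is sketched below. It is an approximation under stated assumptions, not Spark's code: the helper name is hypothetical, and the defaults of 128 MB for spark.sql.files.maxPartitionBytes and 4 MB for spark.sql.files.openCostInBytes are assumed. A larger minimum partition number lowers the split size, which yields more partitions.

```python
from typing import Sequence

MB = 1024 * 1024


def max_split_bytes(file_sizes: Sequence[int], min_partition_num: int,
                    max_partition_bytes: int = 128 * MB,
                    open_cost_in_bytes: int = 4 * MB) -> int:
    """Approximate the maximum bytes per file split (hypothetical helper).

    Each file is charged an extra open cost; the total is spread over
    min_partition_num partitions, then bounded below by the open cost and
    above by max_partition_bytes.
    """
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_partition = total_bytes // min_partition_num
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_partition))


ten_files = [100 * MB] * 10
print(max_split_bytes(ten_files, 8) // MB)    # 128: capped by maxPartitionBytes
print(max_split_bytes(ten_files, 100) // MB)  # 10: smaller splits, more partitions
```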

[2.4][SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeoutChecker thread properly upon shutdown (+48, -16)

Ports HIVE-14817 to the Spark Thrift server. Previously, when stopping HiveServer2, a non-daemon thread could prevent the server from terminating, as seen in the thread dump below:

"HiveServer2-Background-Pool: Thread-79" #79 prio=5 os_prio=31 tid=0x00007fde26138800 nid=0x13713 waiting on condition [0x0000700010c32000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hive.service.cli.session.SessionManager$1.sleepInterval(SessionManager.java:178)
	at org.apache.hive.service.cli.session.SessionManager$1.run(SessionManager.java:156)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Here is an example to reproduce. Now this is fixed.

[3.0][SPARK-32038][SQL] NormalizeFloatingNumbers should also work on distinct aggregate (+38, -12)

Previously, different NaN values were treated as different values in distinct aggregates (see SPARK-26448). Now, this is fixed and the behavior matches Spark 2.4.6, where all NaN values are treated as the same value. See the example below.

case class Test(uid: String, score: Float)
val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f800001)
val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fffffff)
Seq(
  Test("mithunr",  Float.NaN),
  Test("mithunr",  POS_NAN_1),
  Test("mithunr",  POS_NAN_2),
  Test("abellina", 1.0f),
  Test("abellina", 2.0f)
).toDF.createOrReplaceTempView("mytable")
spark.sql("select uid, count(distinct score) from mytable group by 1 order by 1 asc").show

Before:

+--------+---------------------+
|     uid|count(DISTINCT score)|
+--------+---------------------+
|abellina|                    2|
| mithunr|                    3|
+--------+---------------------+

After:

+--------+---------------------+
|     uid|count(DISTINCT score)|
+--------+---------------------+
|abellina|                    2|
| mithunr|                    1|
+--------+---------------------+
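Normalization can be sketched in Python on raw 32-bit float bit patterns. Every NaN pattern (all exponent bits set, non-zero mantissa) is mapped to one canonical NaN and -0.0 is mapped to 0.0 before values are compared for distinctness. Working on bit patterns avoids relying on how Python converts NaN payloads. This illustrates the idea and is not Spark's NormalizeFloatingNumbers rule. 0x7FC00000 is the bit pattern of Java's Float.NaN, and the helper names are hypothetical.

```python
import struct
from typing import Iterable

CANONICAL_NAN_BITS = 0x7FC00000  # Bit pattern of Java's Float.NaN
NEGATIVE_ZERO_BITS = 0x80000000  # Bit pattern of -0.0f


def float_bits(x: float) -> int:
    """Return the IEEE 754 single-precision bit pattern of x."""
    return struct.unpack(">I", struct.pack(">f", x))[0]


def is_nan_bits(bits: int) -> bool:
    exponent = (bits >> 23) & 0xFF
    mantissa = bits & 0x7FFFFF
    return exponent == 0xFF and mantissa != 0


def normalize_bits(bits: int) -> int:
    if is_nan_bits(bits):
        return CANONICAL_NAN_BITS  # All NaNs collapse to one value
    if bits == NEGATIVE_ZERO_BITS:
        return 0  # -0.0 and 0.0 are treated as equal
    return bits


def count_distinct(values: Iterable[int], normalize: bool) -> int:
    return len({normalize_bits(v) if normalize else v for v in values})


mithunr = [CANONICAL_NAN_BITS, 0x7F800001, 0x7FFFFFFF]  # Three different NaNs
abellina = [float_bits(1.0), float_bits(2.0)]
print(count_distinct(mithunr, False), count_distinct(mithunr, True))    # 3 1
print(count_distinct(abellina, False), count_distinct(abellina, True))  # 2 2
```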

[API][3.1][SPARK-32025][SQL] CSV schema inference problems with different types in the same column (+20, -9)

Before the fix, automatic type inference in the CSV data source depended on the order of rows, so the inferred type might not be compatible with the actual data, and incompatible values were returned as NULL. This PR improves type inference so that it uses the greatest compatible type as the inferred type. Below is an example.

Previously:

$ cat /example/f1.csv
col1
43200000
true

spark.read.csv(path="file:///example/*.csv", header=True, inferSchema=True).show()
+----+
|col1|
+----+
|null|
|true|
+----+

root
 |-- col1: boolean (nullable = true)

Now:

spark.read.csv(path="file:///example/*.csv", header=True, inferSchema=True).show()
+-------------+
|col1         |
+-------------+
|43200000     |
|true         |
+-------------+

root
 |-- col1: string (nullable = true)
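Order-independent inference can be sketched by inferring each value's type on its own and then merging it with the type so far using a widening rule, falling back to string when two types are incompatible. This Python sketch simplifies under stated assumptions. Only integer, long, double, boolean and string are modelled, the widening order integer < long < double is assumed, empty values are treated as nulls, and all names are hypothetical.

```python
from typing import Iterable, Optional

NUMERIC_ORDER = ["integer", "long", "double"]  # Assumed widening order
INT_MIN, INT_MAX = -(2 ** 31), 2 ** 31 - 1


def infer_value(value: str) -> Optional[str]:
    """Infer the type of a single CSV value independently of other rows."""
    if value == "":
        return None  # Treated as null; does not influence the column type
    try:
        n = int(value)
        return "integer" if INT_MIN <= n <= INT_MAX else "long"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        pass
    if value.lower() in ("true", "false"):
        return "boolean"
    return "string"


def merge_types(a: Optional[str], b: Optional[str]) -> Optional[str]:
    """Return the greatest compatible type of a and b, or string if none exists."""
    if a is None:
        return b
    if b is None or a == b:
        return a
    if a in NUMERIC_ORDER and b in NUMERIC_ORDER:
        return max(a, b, key=NUMERIC_ORDER.index)
    return "string"


def infer_column(values: Iterable[str]) -> str:
    inferred: Optional[str] = None
    for value in values:
        inferred = merge_types(inferred, infer_value(value))
    return inferred or "string"


print(infer_column(["43200000", "true"]))  # string
print(infer_column(["true", "43200000"]))  # string, regardless of row order
```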

[2.4][SPARK-32115][SQL] Fix SUBSTRING to handle integer overflows (+18, -1)

This PR fixes an integer overflow issue in the internal function UTF8String.substringSQL.

For example, SUBSTRING("abc", -100, -100) correctly returns "". However, before the fix, SELECT SUBSTRING("abc", -1207959552, -1207959552) incorrectly returned "abc" instead of the expected "".
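The overflow can be reproduced with a small Python model of the index arithmetic, using a helper that emulates Java's 32-bit int wrap-around. With pos = length = -1207959552 on "abc", the start is 3 - 1207959552 = -1207959549, and start + length = -2415919101 wraps to the positive 1879048195, so the whole string is returned. Computing the sum without wrapping and clamping it avoids this. This is a sketch, not the actual UTF8String code. The function names and the `fixed` flag are hypothetical.

```python
INT_MIN, INT_MAX = -(2 ** 31), 2 ** 31 - 1


def wrap_int32(x: int) -> int:
    """Emulate Java's 32-bit signed integer overflow."""
    return (x + 2 ** 31) % 2 ** 32 - 2 ** 31


def substring_sql(s: str, pos: int, length: int, fixed: bool = True) -> str:
    """Model of SQL SUBSTRING(s, pos, length) index arithmetic (hypothetical)."""
    n = len(s)
    # One-based start; a negative pos counts back from the end; 0 acts like 1.
    start = pos - 1 if pos > 0 else (n + pos if pos < 0 else 0)
    if fixed:
        # Compute the end in wide arithmetic and clamp it to the int range.
        end = max(INT_MIN, min(INT_MAX, start + length))
    else:
        # Pre-fix style: 32-bit addition that can silently wrap around.
        end = wrap_int32(start + length)
    start = max(start, 0)
    if start >= end:
        return ""
    return s[start:end]


print(repr(substring_sql("abc", -1207959552, -1207959552, fixed=False)))  # 'abc'
print(repr(substring_sql("abc", -1207959552, -1207959552)))               # ''
```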

[API][3.1][SPARK-32030][SQL] Support unlimited MATCHED and NOT MATCHED clauses in MERGE INTO (+59, -71)

This PR allows an unlimited number of MATCHED and NOT MATCHED clauses in MERGE INTO statements.

Now, the MERGE INTO syntax is as follows, where the WHEN MATCHED and WHEN NOT MATCHED clauses can each be repeated any number of times:

MERGE INTO [db_name.]target_table [AS target_alias]
 USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
 ON <merge_condition>
 [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]*
 [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]*
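How several clauses interact can be modelled in Python. For each source row, the WHEN MATCHED clauses are tried in order when a target row with the same key exists. Otherwise, the WHEN NOT MATCHED clauses are tried. Only the first clause whose condition holds is applied. The sketch assumes that first-match-wins order and unique source keys (real MERGE implementations reject several source rows matching one target row). All names are hypothetical; this is not Spark's implementation.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]
# (condition or None for unconditional, action). A matched action returns the
# updated row, or None to delete it; a not-matched action returns the row to insert.
MatchedClause = Tuple[Optional[Callable[[Row, Row], bool]], Callable[[Row, Row], Optional[Row]]]
NotMatchedClause = Tuple[Optional[Callable[[Row], bool]], Callable[[Row], Row]]


def merge_into(target: Dict[object, Row], source: List[Row], key: str,
               matched: List[MatchedClause],
               not_matched: List[NotMatchedClause]) -> Dict[object, Row]:
    result = dict(target)  # Leave the input untouched
    for src in source:
        k = src[key]
        if k in target:
            for cond, action in matched:
                if cond is None or cond(target[k], src):
                    new_row = action(target[k], src)
                    if new_row is None:
                        del result[k]  # DELETE
                    else:
                        result[k] = new_row  # UPDATE
                    break  # Only the first satisfied clause applies
        else:
            for cond, action in not_matched:
                if cond is None or cond(src):
                    result[k] = action(src)  # INSERT
                    break
    return result


target = {1: {"id": 1, "v": 10}, 2: {"id": 2, "v": 20}}
source = [
    {"id": 1, "v": 0, "op": "delete"},
    {"id": 2, "v": 25, "op": "update"},
    {"id": 3, "v": 30, "op": "insert"},
    {"id": 4, "v": 40, "op": "ignore"},
]
merged = merge_into(
    target, source, "id",
    matched=[
        (lambda t, s: s["op"] == "delete", lambda t, s: None),
        (None, lambda t, s: {"id": t["id"], "v": s["v"]}),
    ],
    not_matched=[
        (lambda s: s["op"] == "insert", lambda s: {"id": s["id"], "v": s["v"]}),
    ],
)
print(merged)  # {2: {'id': 2, 'v': 25}, 3: {'id': 3, 'v': 30}}
```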

[API][3.1][SPARK-31336][SQL] Support Oracle Kerberos login in JDBC connector (+99, -0)

Previously, when loading DataFrames from the JDBC data source with Kerberos authentication, remote executors (in yarn-client/cluster modes, etc.) failed to establish a connection because they lacked a Kerberos ticket or the ability to generate one. This PR enables users to connect to Oracle using Kerberos.

ML

[3.1][SPARK-31777][ML][PYSPARK] Add user-specified fold column to CrossValidator (+308, -25)

Adds a new foldCol param to CrossValidator to support a user-specified fold column. Users can assign fold numbers to the dataset instead of letting Spark do random splits.
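The effect of a fold column can be sketched in plain Python: for fold i, rows whose fold number equals i form the validation set, and all other rows form the training set. The sketch assumes fold numbers must be integers in the range [0, numFolds). It uses hypothetical names and is not the CrossValidator implementation.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def folds_from_column(rows: List[Row], fold_col: str,
                      num_folds: int) -> List[Tuple[List[Row], List[Row]]]:
    """Return (training, validation) pairs, one per fold, from a fold column."""
    for row in rows:
        fold = row[fold_col]
        if not isinstance(fold, int) or not 0 <= fold < num_folds:
            raise ValueError(f"fold number must be in [0, {num_folds}), got {fold!r}")
    splits = []
    for i in range(num_folds):
        training = [r for r in rows if r[fold_col] != i]
        validation = [r for r in rows if r[fold_col] == i]
        splits.append((training, validation))
    return splits


data = [{"x": 1, "fold": 0}, {"x": 2, "fold": 1}, {"x": 3, "fold": 2}, {"x": 4, "fold": 0}]
for training, validation in folds_from_column(data, "fold", 3):
    print([r["x"] for r in training], [r["x"] for r in validation])
```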

[3.1][SPARK-31893][ML] Add a generic ClassificationSummary trait (+543, -466)

Adds a generic ClassificationSummary trait so that all classification models can use it to implement their summaries.

[API][3.1][SPARK-19939][ML] Add support for association rules in ML (+57, -26)

This PR adds the support metric to association rules in Spark ml.fpm. Support indicates how frequently the itemset of an association rule appears in the database, which suggests whether the rule is generally applicable to the dataset. Refer to the wiki for more details.
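The metric can be illustrated with a small Python computation over toy transactions. The sketch uses the textbook definitions: the support of a rule X => Y is the fraction of transactions containing both X and Y, its confidence is support(X and Y) / support(X), and its lift is confidence / support(Y). The helper names are hypothetical and are not Spark's API.

```python
from typing import FrozenSet, List, Tuple

Itemset = FrozenSet[str]


def support(itemset: Itemset, transactions: List[Itemset]) -> float:
    """Fraction of transactions that contain every item in itemset."""
    return sum(1 for t in transactions if itemset <= t) / len(transactions)


def rule_metrics(antecedent: Itemset, consequent: Itemset,
                 transactions: List[Itemset]) -> Tuple[float, float, float]:
    """Return (support, confidence, lift) of the rule antecedent => consequent."""
    rule_support = support(antecedent | consequent, transactions)
    confidence = rule_support / support(antecedent, transactions)
    lift = confidence / support(consequent, transactions)
    return rule_support, confidence, lift


transactions = [frozenset(t) for t in (["a", "b"], ["a", "c"], ["a", "b", "c"], ["b"])]
print(rule_metrics(frozenset(["a"]), frozenset(["b"]), transactions))
```

Here {a, b} appears in 2 of 4 transactions, so the rule a => b has a support of 0.5.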

[API][3.1][SPARK-20249][ML][PYSPARK] Add training summary for LinearSVCModel (+257, -7)

This PR adds a training summary for LinearSVCModel.

PYTHON

[2.4][SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow (+7, -1)

Previously, when Arrow was enabled and the pandas DataFrame had a float index, createDataFrame produced a Spark DataFrame with wrong results, as shown below:

./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
>>> import pandas as pd
>>> spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
+---+
|  a|
+---+
|  1|
|  1|
|  2|
+---+

This is because direct slicing treats the slice bounds as index labels rather than positions when the index contains floats:

>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])[2:]
     a
2.0  1
3.0  2
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.]).iloc[2:]
     a
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2, 3, 4])[2:]
   a
4  3

This PR explicitly uses iloc to slice positionally when creating a Spark DataFrame from a pandas DataFrame with Arrow enabled.
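Positional batching can be sketched without pandas. Batch boundaries are computed from row positions 0..n-1, so they are the same whatever labels the index carries, which is what iloc guarantees. The ceiling-division batch size, modelled on splitting rows across a given parallelism, is an assumption, and the function names are hypothetical.

```python
from typing import List, Sequence, Tuple


def positional_batches(num_rows: int, parallelism: int) -> List[Tuple[int, int]]:
    """Split row positions [0, num_rows) into at most `parallelism` contiguous batches."""
    step = max(1, -(-num_rows // parallelism))  # Ceiling division, at least 1
    return [(start, min(start + step, num_rows)) for start in range(0, num_rows, step)]


def slice_by_position(values: Sequence[int], parallelism: int) -> List[List[int]]:
    # Equivalent of pdf.iloc[start:stop]: positions are used, never index labels,
    # so a float index such as [2.0, 3.0, 4.0] has no effect on the batches.
    return [list(values[start:stop])
            for start, stop in positional_batches(len(values), parallelism)]


column_a = [1, 2, 3]
print(slice_by_position(column_a, 2))  # [[1, 2], [3]]
```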

[3.0][SPARK-32011][PYTHON][CORE] Remove warnings about pin-thread modes and guide to use collectWithJobGroup (+6, -58)

Previously, PySpark warned users to use the new pinned thread mode when setting thread-local properties. However, a critical leak issue was found (SPARK-32010). Now, it guides users to use RDD.collectWithJobGroup for the time being.

SS

[API][3.1][SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store (+228, -11)

Previously, Structured Streaming put the checkpointed state (represented as UnsafeRow) directly into the StateStore without any schema validation. This is dangerous when users reuse checkpoint files during migration: any change or bug fix related to an aggregate function may cause random exceptions or even wrong answers (e.g., SPARK-28067). Now, it validates the checkpointed state against the schema and throws InvalidUnsafeRowException if the state is invalid when the checkpoint is reused during migration.

  • spark.sql.streaming.stateStore.formatValidation.enabled (Default: true)

    When true, Spark checks whether the checkpointed state in the state store is valid when running streaming queries. An invalid state can occur if the state store format has changed. Note that the feature is currently only effective for the built-in HDFS state store provider.
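A structural check of this kind can be sketched in Python against a simplified model of the UnsafeRow layout. The model has a null bitset padded to 8-byte words, then one 8-byte word per field, where a variable-length field's word packs its offset (upper 32 bits) and size (lower 32 bits) into a data region that follows. The specific checks, the little-endian byte order and the names are assumptions for illustration. This is not the validation Spark actually performs.

```python
import struct
from typing import List


def bitset_width(num_fields: int) -> int:
    """Bytes used by the null bitset: one bit per field, rounded up to 8-byte words."""
    return ((num_fields + 63) // 64) * 8


def looks_valid(row: bytes, variable_length: List[bool]) -> bool:
    """Simplified structural check of a row against its expected schema."""
    num_fields = len(variable_length)
    null_bits = bitset_width(num_fields)
    fixed_end = null_bits + 8 * num_fields
    if len(row) < fixed_end or len(row) % 8 != 0:
        return False  # Too short for the schema, or not word aligned
    nulls = int.from_bytes(row[:null_bits], "little")
    for i, is_variable in enumerate(variable_length):
        if not is_variable or (nulls >> i) & 1:
            continue  # Fixed-length or null fields have nothing to point at
        (word,) = struct.unpack_from("<Q", row, null_bits + 8 * i)
        offset, size = word >> 32, word & 0xFFFFFFFF
        if offset < fixed_end or offset + size > len(row):
            return False  # Pointer escapes the row's variable-length region
    return True


# Schema (int, string): bitset, int word, string word, then "hello" padded to 8 bytes.
good = struct.pack("<QqQ", 0, 42, (24 << 32) | 5) + b"hello".ljust(8, b"\0")
print(looks_valid(good, [False, True]))  # True
```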

[API][3.1][SPARK-32033][SS][DSTREAMS] Use new poll API in Kafka connector executor side to avoid infinite wait (+7, -4)

Previously, the Kafka source used the old, deprecated KafkaConsumer.poll(long) API, which can block forever in a livelock if metadata is not updated (for instance, when the broker disappears at consumer creation); see also the Kafka documentation and a standalone test application. Now, it uses KafkaConsumer.poll(Duration) to prevent unexpected hangs.

DEPS

[3.1][SPARK-32045][BUILD] Upgrade to Apache Commons Lang 3.10 (+4, -4)

Upgrades Apache Commons Lang from 3.9 to 3.10.

UI

[3.0][SPARK-32028][WEBUI] Fix app id link for multi attempts app in history summary page (+6, -5)

Before this patch, the app ID link used the application attempt count as the attempt ID. This PR fixes the app ID link for multi-attempt applications on the history summary page:

  • If the attempt ID is available (e.g., in YARN mode), the app ID link URL contains the correct attempt ID, like /history/application_1561589317410_0002/1/jobs/.
  • If the attempt ID is not available (e.g., in standalone mode), the app ID link URL does not contain a fake attempt ID, like /history/app-20190404053606-0000/jobs/.
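The two URL shapes in this list can be captured in a small Python helper. It only mirrors the examples above. The function name is hypothetical, and the actual fix lives in the Spark UI code, not in a function like this.

```python
from typing import Optional


def app_id_link(app_id: str, attempt_id: Optional[str] = None) -> str:
    """Build the history-server link for an application (hypothetical helper)."""
    if attempt_id:
        # Use the real attempt ID reported for the application.
        return f"/history/{app_id}/{attempt_id}/jobs/"
    # No attempt ID (e.g. standalone mode): omit it instead of faking one.
    return f"/history/{app_id}/jobs/"


print(app_id_link("application_1561589317410_0002", "1"))
print(app_id_link("app-20190404053606-0000"))
```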

OTHER

[2.4][SPARK-31918][R] Ignore S4 generic methods under SparkR namespace in closure cleaning to support R 4.0.0+ (+18, -13)

This PR excludes the S4 generic methods under the SparkR namespace from closure cleaning to support R 4.0.0+ in SparkR. Without this patch, the following exception is raised when running native R code with R 4.0.0:

df <- createDataFrame(lapply(seq(100), function (e) list(value=e)))
count(dapply(df, function(x) as.data.frame(x[x$value < 50,]), schema(df)))
org.apache.spark.SparkException: R unexpectedly exited.
R worker produced errors: Error in lapply(part, FUN) : attempt to bind a variable to R_UnboundValue

[2.4][SPARK-32073][R] Drop R < 3.5 support (+4, -13)

[3.1][SPARK-32058][BUILD] Use Apache Hadoop 3.2.0 dependency by default (+15, -15)

This PR switches the default Apache Hadoop dependency from 2.7.4 to 3.2.0 in the upcoming Apache Spark 3.1.0. End users will still be able to choose Spark with Hadoop 2.7.4 on the Spark download website.

Item                   | Default Hadoop Dependency
-----------------------+--------------------------
Apache Spark Website   | 3.2.0
Apache Download Site   | 3.2.0
Apache Snapshot        | 3.2.0
Maven Central          | 3.2.0
PyPI                   | 2.7.4
CRAN                   | 2.7.4
Homebrew               | 3.2.0 (already)