[OSS DIGEST] The major changes of Apache Spark from Mar 11 to Mar 24

Maryann Xue
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, there will be an [API] tag in the title.

CORE

[API][3.0][SPARK-30667][CORE] Change BarrierTaskContext allGather method return type (+4, -4)>

This PR changes the return type of the BarrierTaskContext.allGather method from ArrayBuffer[String] to Array[String], since the returned collection should be immutable.
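
A minimal sketch of how the method is used inside a barrier stage (the RDD and message contents are illustrative only; it assumes an active SparkContext `sc`):

import org.apache.spark.BarrierTaskContext

// Run a barrier stage in which every task broadcasts a message and
// receives the messages from all tasks in the stage.
val rdd = sc.parallelize(1 to 8, numSlices = 4)
val gathered = rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  // allGather now returns an immutable Array[String]
  val messages: Array[String] = ctx.allGather(s"partition-${ctx.partitionId()}")
  Iterator(messages.mkString(","))
}.collect()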

SQL

[2.4][SPARK-31163][SQL] TruncateTableCommand with acl/permission should handle non-existent path (+22, -1)>

SPARK-30312 added the ability to preserve path permissions and ACLs when truncating a table. Before SPARK-30312, the TRUNCATE TABLE command could be executed successfully even if the (partition/table) path did not exist; after SPARK-30312, the command fails in that case. This PR fixes the behavior change by correctly handling non-existent paths.
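
For reference, a minimal sketch of the affected command (the table and partition names are hypothetical, and an active SparkSession `spark` is assumed); with this fix it succeeds even when the partition directory has already been removed from the file system:

// Truncate a partition whose directory may no longer exist on disk.
spark.sql("TRUNCATE TABLE sales PARTITION (dt = '2020-03-01')")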

[3.0][SPARK-31030][SQL] Backward Compatibility for Parsing and Formatting Datetime (+341, -66)>

In Spark 2.4 and earlier, datetime parsing, formatting and conversion are performed using the hybrid calendar (Julian + Gregorian). Spark 3.0 has switched to the Proleptic Gregorian calendar, the de-facto standard calendar, and the implementation depends on Java 8 time APIs. However, the calendar switch causes breaking changes: some patterns are not compatible between the Java 8 and Java 7 time APIs. Therefore, this PR introduces Spark's own pattern definition rather than depending on the pattern of the Java time API. To keep backward compatibility, this PR shadows the incompatible letters.

[API][3.0][SPARK-30958][SQL] Do not set default era for DateTimeFormatter (+53, -6)>

By default, Spark uses "uuuu" as the year pattern, which already indicates the era. If we also set a default era, the two can conflict and fail the parsing. Besides, we replace "y" with "u" if there is no "G", so the era is either explicitly specified (e.g. "yyyy G") or can be inferred from the year (e.g. "uuuu").

After the change, Spark can parse dates/timestamps with negative years via the "yyyy" pattern, which is converted to "uuuu" under the hood.

[3.0][SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time (+102, -15)>

This PR changes the conversion of java.sql.Timestamp/Date values to/from internal values of Catalyst's TimestampType/DateType for dates before the Gregorian cutover day 1582-10-15. It constructs the local date-time from microseconds/days since the epoch, takes each date-time component (year, month, day, hour, minute, second and second fraction), and constructs java.sql.Timestamp/Date from the extracted components.

Before this change:

scala> sql("select date '1100-10-10'").collect()
res0: Array[org.apache.spark.sql.Row] = Array([1100-10-03])

After this change:

scala> sql("select date '1100-10-10'").collect()
res1: Array[org.apache.spark.sql.Row] = Array([1100-10-10])

[3.0][SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter (+50, -2)>

This PR added a case-sensitivity parameter to ParquetRowConverter so that it handles the materialized Parquet data properly with respect to case sensitivity. Without the fix, the following example throws IllegalArgumentException in case-insensitive mode:

import org.apache.spark.sql.types.{LongType, StructType}

val path = "/some/temp/path"

// Write a struct column whose nested fields use mixed-case names.
spark
  .range(1L)
  .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
  .write.parquet(path)

// Read it back with a user-specified schema whose field names differ only in case.
val caseInsensitiveSchema = new StructType()
  .add(
    "StructColumn",
    new StructType()
      .add("LowerCase", LongType)
      .add("camelcase", LongType))

spark.read.schema(caseInsensitiveSchema).parquet(path).show()

[3.0][SPARK-31124][SQL] Change the default value of minPartitionNum in AQE (+20, -10)>

Adaptive Query Execution (AQE) has a performance regression with the default settings: if we coalesce the shuffle partitions into one or a few partitions, we may leave many CPU cores idle, and performance is worse than with AQE off (which leverages all CPU cores).

We should avoid any performance regression in AQE with the default settings as much as possible. This PR changes the default value of minPartitionNum, used when coalescing shuffle partitions, to SparkContext.defaultParallelism, so that AQE can leverage all the CPU cores.
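
A hedged sketch of the relevant settings (assumes an active SparkSession `spark`; the exact key for the minimum-partition-number config is an assumption and may differ across 3.0 previews):

spark.conf.set("spark.sql.adaptive.enabled", "true")
// With this PR, the minimum number of coalesced partitions defaults to
// SparkContext.defaultParallelism; it can still be set explicitly:
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum",
  spark.sparkContext.defaultParallelism.toString)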

[API][3.0][SPARK-31146][SQL] Leverage the helper method for aliasing in built-in SQL expressions (+70, -55)>

This PR leverages the helper method for aliasing in built-in SQL expressions to use the alias as its output column name where it's applicable.

  • Expressions such as UnaryMathExpression and BinaryMathExpression search the alias in the tags by default.
  • When the naming is different in its implementation, it has to be overwritten for the expression specifically, e.g., CallMethodViaReflection, Remainder, CurrentTimestamp, FormatString and XPathDouble.

This PR fixes the automatically generated aliases of the functions below:

class                     alias
Rand                      random
Ceil                      ceiling
Remainder                 mod
Pow                       pow
Signum                    sign
Chr                       char
Length                    char_length
Length                    character_length
FormatString              printf
Substring                 substr
Upper                     ucase
XPathDouble               xpath_number
DayOfMonth                day
CurrentTimestamp          now
Size                      cardinality
Sha1                      sha
CallMethodViaReflection   java_method

Note: EqualTo's = and == aliases were excluded because they cannot leverage this helper method; the parser should be fixed instead.

Note: this PR also excludes some instances, such as ToDegrees, ToRadians, UnaryMinus and UnaryPositive, that need an explicit name overwrite, in order to keep the scope of this PR smaller.

This might change the output schema of queries: users will see the new output column names if they do not manually specify aliases.
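
A hedged illustration of the effect on generated output column names (assumes an active SparkSession `spark`; the exact generated names are an assumption):

// Before this PR the generated column name followed the implementing class
// (e.g. CEIL(2.5)); after it, the alias used in the query is kept.
spark.sql("SELECT ceiling(2.5)").columns   // e.g. Array("ceiling(2.5)")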

[API][3.0][SPARK-31150][SQL] Parsing seconds fraction with variable length for timestamp (+555, -270)>

This PR supports parsing timestamp values with variable-length second fractions, as Spark 2.4 does.

For example, the pattern 'yyyy-MM-dd HH:mm:ss.SSSSSS[zzz]' can parse timestamps with a second fraction of 0 to 6 digits, but returns NULL when the fraction has 7 or more digits:

// for 0 ~ 6
select to_timestamp(v, 'yyyy-MM-dd HH:mm:ss.SSSSSS[zzz]') from values
 ('2019-10-06 10:11:12.'),
 ('2019-10-06 10:11:12.0'),
 ('2019-10-06 10:11:12.1'),
 ('2019-10-06 10:11:12.12'),
 ('2019-10-06 10:11:12.123UTC'),
 ('2019-10-06 10:11:12.1234'),
 ('2019-10-06 10:11:12.12345CST'),
 ('2019-10-06 10:11:12.123456PST') t(v)

2019-10-06 03:11:12.123
2019-10-06 08:11:12.12345
2019-10-06 10:11:12
2019-10-06 10:11:12
2019-10-06 10:11:12.1
2019-10-06 10:11:12.12
2019-10-06 10:11:12.1234
2019-10-06 10:11:12.123456

// for >= 7
select to_timestamp('2019-10-06 10:11:12.1234567PST', 'yyyy-MM-dd HH:mm:ss.SSSSSS[zzz]')
NULL

[API][3.0][SPARK-31171][SQL] size(null) should return null under ansi mode (+24, -3)>

PR #27834 changed the result of size(null) to -1 to match the Spark 2.4 behavior and avoid breaking changes. However, the "return -1" behavior is error-prone when used with aggregate functions.

The current ANSI mode controls a bunch of "better behaviors" like failing on overflow. We don't enable these "better behaviors" by default because they break the previous behaviors. The "return null" behavior of size(null) is a good fit for the ANSI mode. This PR makes size(null) return null under ANSI mode, regardless of the spark.sql.legacy.sizeOfNull config.
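
A minimal sketch of the new behavior (assumes an active SparkSession `spark` and that spark.sql.ansi.enabled is the flag controlling ANSI mode):

spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT size(null)").show()   // now NULL under ANSI mode, instead of -1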

[API][3.1][SPARK-30874][SQL] Support Postgres Kerberos login in JDBC connector (+663, -58)>

When loading DataFrames from a JDBC datasource with Kerberos authentication, remote executors (in yarn-client/cluster and similar modes) fail to establish a connection due to the lack of a Kerberos ticket or the ability to generate one.

This is a real issue when trying to ingest data from kerberized data sources (SQL Server, Oracle) in an enterprise environment where exposing simple authentication access is not an option due to IT policy issues.

This PR added the Postgres support (other supported databases will come in later PRs). The newly added JDBC options are listed below; a usage sketch follows the list.

  • keytab : Location of the Kerberos keytab file for the JDBC client (it must be pre-uploaded to all nodes, either via the --files option of spark-submit or manually). If the value contains path information, Spark considers the keytab distributed manually; otherwise, distribution via --files is assumed. If both keytab and principal are defined, Spark tries to do Kerberos authentication.
  • principal : Specifies the Kerberos principal name for the JDBC client. If both keytab and principal are defined, Spark tries to do Kerberos authentication.
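
A hedged usage sketch of the new options (assumes an active SparkSession `spark` and a kerberized Postgres instance; the URL, table name, keytab path and principal are placeholders):

val accounts = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com:5432/mydb")
  .option("dbtable", "public.accounts")
  .option("keytab", "/path/to/client.keytab")   // pre-distributed, e.g. via --files
  .option("principal", "client@EXAMPLE.COM")
  .load()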

[3.1][SPARK-31071][SQL] Allow annotating non-null fields when encoding Java Beans (+136, -5)>

When encoding Java Beans to a Spark DataFrame, non-primitive types are encoded as nullable fields by default. Although this works for most cases, it can still be an issue. For example, when saving a DataFrame in Avro format using a non-Spark-generated Avro schema that declares a non-null field, Spark assumes the field is nullable. This assumption conflicts with the Avro schema semantics and an exception is thrown.

This PR proposes to respect javax.annotation.Nonnull and produce non-null fields when encoding Java Beans to a Spark DataFrame.

[3.0][SPARK-31090][SPARK-25457] Revert "IntegralDivide returns data type of the operands" (+110, -263)>

There is no standard requiring that div must return the type of its operands, and always returning the long type looks fine. Since this was essentially a cosmetic change that could break existing queries, the original commit was reverted.

[3.0]Revert "[SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir" (+42, -55)>

This PR reverts commit 5bc0d76.

[API][3.0][SPARK-25121][SQL] Supports multi-part relation names for join strategy hint resolution (+230, -27)>

This PR adds support for multi-part relation names specified in join strategy hints. Before this PR, the database name in a multi-part relation name in SQL hints was ignored.
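
A brief sketch (assumes an active SparkSession `spark`; the database and table names are hypothetical): the database-qualified name in the hint is now resolved instead of being ignored.

spark.sql("""
  SELECT /*+ BROADCAST(db1.t1) */ *
  FROM db1.t1 JOIN db2.t2 ON t1.id = t2.id
""")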

[API][3.0][SPARK-30127][SQL] Support case class parameter for typed Scala UDF (+519, -410)>

This PR adds Scala case class to the allowed data types for typed Scala UDFs. For example, users can now write:

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._  // assumes an active SparkSession `spark`

case class TestData(key: Int, value: String)
val f = (d: TestData) => d.key * d.value.toInt
val myUdf = udf(f)
val df = Seq(("data", TestData(50, "2"))).toDF("col1", "col2")
df.select(myUdf(col("col2"))).show()  // returns 100

[API][3.0][SPARK-30292][SQL][FOLLOWUP] ansi cast from strings to integral numbers (byte/short/int/long) should fail with fraction (+16, -10)>

This PR makes ANSI cast fail when casting floating-point number strings (e.g., "1.23") to integral types. This is a follow-up to #26933.
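
A minimal sketch (assumes an active SparkSession `spark` and that spark.sql.ansi.enabled controls ANSI mode):

spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT CAST('1.23' AS INT)").show()   // now fails under ANSI mode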

[2.4][SPARK-30494][SQL] Fix cached data leakage during replacing an existing view (+68, -12)>

This PR calls "uncache" on a view that is being replaced, to make sure the memory held by the obsolete view's cached data is released.
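
A small sketch of the scenario (assumes an active SparkSession `spark`; the view name is illustrative): with the fix, replacing a cached view also uncaches the stale data.

spark.range(10).createOrReplaceTempView("v")
spark.sql("CACHE TABLE v")
// Previously the cached data of the old definition could be left behind;
// now it is uncached when the view is replaced.
spark.range(20).createOrReplaceTempView("v")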

[API][3.0][SPARK-31119][SQL] Add interval value support for extract expression as extract source (+484, -449)>

This PR adds support for INTERVAL values as the extract source of the extract expression, in addition to DATETIME values. The extract grammar is now:

<extract expression> ::= EXTRACT <left paren> <extract field> FROM <extract source> <right paren>

<extract source> ::= <datetime value expression> | <interval value expression>
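
For illustration, a hedged example (assumes an active SparkSession `spark`; the interval literal syntax and accepted field names are assumptions based on the existing SQL grammar):

spark.sql("SELECT EXTRACT(DAY FROM INTERVAL 3 days 2 hours)").show()   // 3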

[API][3.0][SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet (+343, -17)>

The PR addresses compatibility with Spark 2.4 and earlier versions when reading/writing date and timestamp values via the Parquet datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default (see SPARK-26651). In particular, the issue shows up for date/timestamp values before 1582-10-15, when the hybrid calendar switches between the Julian and Gregorian calendars. The same local date in different calendars is converted to a different number of days since the epoch 1970-01-01. For example, the date 1001-01-01 is converted to:

  • -719164 in Julian calendar. Spark 2.4 saves the number as a value of DATE type into parquet.
  • -719162 in Proleptic Gregorian calendar. Spark 3.0 saves the number as a date value.

This PR adds a SQL config that enables the backward-compatible behavior, which rebases values between the Proleptic Gregorian calendar and the hybrid one:

spark.sql.legacy.parquet.rebaseDateTime.enabled

The default config value is false.
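
A hedged sketch of enabling the legacy behavior before reading Parquet files written by Spark 2.4 (assumes an active SparkSession `spark`; the path is a placeholder):

spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", "true")
val oldData = spark.read.parquet("/data/written/by/spark-2.4")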

[API][3.0][SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro (+159, -15)>

The PR addresses compatibility with Spark 2.4 and earlier versions when reading/writing date and timestamp values via the Avro datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default (see SPARK-26651). In particular, the issue shows up for date/timestamp values before 1582-10-15, when the hybrid calendar switches between the Julian and Gregorian calendars. The same local date in different calendars is converted to a different number of days since the epoch 1970-01-01. For example, the date 1001-01-01 is converted to:

  • -719164 in Julian calendar. Spark 2.4 saves the number as a value of DATE type into Avro files.
  • -719162 in Proleptic Gregorian calendar. Spark 3.0 saves the number as a date value.

This PR adds a SQL config that enables the backward-compatible behavior, which rebases values between the Proleptic Gregorian calendar and the hybrid one:

spark.sql.legacy.avro.rebaseDateTime.enabled

The default config value is false.

[API][3.0][SPARK-31176][SQL] Remove support for 'e'/'c' as datetime pattern character (+141, -57)>

This PR removes support for 'e'/'c' as datetime pattern characters to avoid ambiguity. It also fixes a bug in convertIncompatiblePattern where a single quote (') would be lost if used as the last character.

[3.0][SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times (+112, -25)>

This PR makes all the V2 data-writing commands extend V2CommandExec so that they get executed only once even across multiple collect() calls.

[3.0][SPARK-31190][SQL] ScalaReflection should not erasure user defined AnyVal type (+39, -15)>

This PR improves ScalaReflection so that user-defined AnyVal types are not erased, while other types, e.g. Any, are still erased.

[API][3.0][SPARK-31205][SQL] Support string literal as the second argument of date_add/date_sub functions (+118, -9)>

This PR adds support for a string literal as the second argument of the date_add and date_sub functions, by enclosing the string argument in an ansi_cast and failing the query at compile time for invalid string values.
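
A minimal sketch of the newly supported form (assumes an active SparkSession `spark`; the literal values are illustrative):

spark.sql("SELECT date_add('2020-03-24', '1')").show()   // the string '1' is ansi_cast to INT
// An invalid string such as '1.5' now fails during query compilation, as described above.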

[API][3.0][SPARK-31211][SQL] Fix rebasing of 29 February of Julian leap years (+77, -19)>

This PR fixes the rebasing of 29 February of Julian leap years to the Proleptic Gregorian calendar, in which those years are not leap years. For example, before this PR, a java.time.DateTimeException was raised while loading the date 1000-02-29 from Parquet files saved by Spark 2.4.5; after this PR, the date resolves to 1000-03-01 when spark.sql.legacy.parquet.rebaseDateTime.enabled is turned on.

[API][3.0][SPARK-31221][SQL] Rebase any date-times in conversions to/from Java types (+109, -100)>

This PR applies rebasing to all dates/timestamps in the conversion functions fromJavaDate(), toJavaDate(), toJavaTimestamp() and fromJavaTimestamp(). The rebasing is performed by building a local date-time in the original calendar, extracting the date-time fields from the result, and creating a new local date-time in the target calendar.

[3.1][SPARK-31184][SQL] Support getTablesByType API of Hive Client (+92, -16)>

This PR adds getTablesByType in HiveShim. For Hive versions that do not support this API, an UnsupportedOperationException is thrown, and the upper-level logic catches the exception and falls back to the "getTables + getTablesByName + filter with type" solution.

SS

[3.0][SPARK-31126][SS] Upgrade Kafka to 2.4.1 (+1, -1)>

Upgrade Kafka library from 2.4.0 to 2.4.1 to fix a client-side bug KAFKA-8933. See the full release notes.

PYTHON

[API][3.1][SPARK-30569][SQL][PYSPARK][SPARKR] Add percentile_approx DSL functions (+176, -1)>

Currently, we support percentile_approx only as a SQL expression. This PR adds the percentile_approx DSL functions; a usage sketch follows the list.

  • Adds the following overloaded variants to Scala o.a.s.sql.functions:

    • percentile_approx(e: Column, percentage: Array[Double], accuracy: Long): Column
    • percentile_approx(columnName: String, percentage: Array[Double], accuracy: Long): Column
    • percentile_approx(e: Column, percentage: Double, accuracy: Long): Column
    • percentile_approx(columnName: String, percentage: Double, accuracy: Long): Column
    • percentile_approx(e: Column, percentage: Seq[Double], accuracy: Long): Column (primarily for Python interop).
    • percentile_approx(columnName: String, percentage: Seq[Double], accuracy: Long): Column
  • Adds percentile_approx to pyspark.sql.functions.

  • Adds percentile_approx function to SparkR.
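
A hedged usage sketch in Scala, using the (Column, Double, Long) overload listed above (assumes an active SparkSession `spark`; the exact overload set that finally ships may differ):

import org.apache.spark.sql.functions.{col, percentile_approx}

val df = spark.range(0, 1000).toDF("value")
df.select(percentile_approx(col("value"), 0.5, 10000).as("approx_median")).show()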

UI

[3.1][SPARK-30654][WEBUI] Bootstrap4 WebUI upgrade (+647, -1666)>

The Spark Web UI uses an older version of Bootstrap (v2.3.2) for the portal pages. Bootstrap 2.x reached EOL in August 2013 and Bootstrap 3.x reached EOL in July 2019 (https://github.com/twbs/release). Older versions of Bootstrap are also getting flagged in security scans for various CVEs.

This PR upgrades it to Bootstrap 4 to resolve any potential issues and get onto a supported release. The change was manually tested.

[3.0][SPARK-31081][UI][SQL] Make display of stageId/stageAttemptId/taskId of sql metrics toggleable (+83, -7)>

This PR adds a checkbox that toggles the display of the stageId/taskId corresponding to the max metric on the SQL DAG page.

ML

[3.1][SPARK-31032][ML] GMM compute summary and update distributions in one job (+44, -73)>

[3.1][SPARK-31077][ML] Remove ChiSqSelector dependency on mllib.ChiSqSelectorModel (+124, -43)>

[API][3.0][SPARK-30773][ML] Support NativeBlas for level-1 routines (+49, -19)>

This PR provides a way to allow users to take advantage of NativeBLAS for level-1 routines.

[API][3.1][SPARK-31138][ML] Add ANOVA Selector for continuous features and categorical labels (+860, -6)>

This PR adds ANOVASelector for continuous features and categorical labels.

[API][3.1][SPARK-31185][ML] Implement VarianceThresholdSelector (+411, -0)>

This PR implements a feature selector that removes all low-variance features. Features with a variance lower than the threshold will be removed. The default is to keep all features with non-zero variance, i.e., remove the features that have the same value in all samples.
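
A hedged Scala sketch of the new selector (assumes an active SparkSession `spark`; the parameter names follow the usual ML selector conventions and are assumptions):

import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors
import spark.implicits._

// Columns 1 and 3 of the feature vectors are constant (zero variance) and
// should be dropped; column 2 varies and should be kept.
val data = Seq(
  (Vectors.dense(1.0, 2.0, 0.0), 0.0),
  (Vectors.dense(1.0, 5.0, 0.0), 1.0),
  (Vectors.dense(1.0, 8.0, 0.0), 0.0)
).toDF("features", "label")

val selector = new VarianceThresholdSelector()
  .setVarianceThreshold(1.0)   // per the description above: drop features with variance lower than this
  .setFeaturesCol("features")
  .setOutputCol("selected")

selector.fit(data).transform(data).show()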

OTHER

[2.4][SPARK-31095][BUILD] Upgrade netty-all to 4.1.47.Final (+4, -4)>

[3.1][SPARK-25355][K8S] Add proxy user to driver if present on spark-submit (+89, -11)>

[3.1][SPARK-31125][K8S] Terminating pods have a deletion timestamp but they are not yet dead (+12, -1)>

[3.1][SPARK-31120][BUILD] Support enabling maven profiles for importing via sbt on Intellij IDEA (+2, -1)>

[2.4][SPARK-31101][BUILD] Upgrade Janino to 3.0.16 (+7, -7)>