[OSS DIGEST] The major changes of Apache Spark from Apr 8 to Apr 21

Hyukjin Kwon
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.


[SPARK-18886][CORE] Make Locality wait time measure resource underutilization due to delay scheduling (+506, -154)

Delay scheduling is an optimization that sacrifices job fairness for data locality in order to improve cluster and workload throughput. One useful definition of "delay" is how much time has passed since the TaskSet was last using its fair share of resources. Since this delay is impractical to calculate without a full simulation, the new heuristic is the time since the TaskSetManager last launched a task without having rejected any resources due to delay scheduling since it was last offered its "fair share". A "fair share" offer is one where the isAllFreeResources parameter of resourceOffers is set to true. A "delay scheduling reject" occurs when a resource is left unused even though there are pending tasks (implemented inside TaskSetManager). The legacy heuristic only measured the time since the TaskSetManager last launched a task; it can be re-enabled by setting spark.locality.wait.legacyResetOnTaskLaunch to true.

  • spark.locality.wait.legacyResetOnTaskLaunch [Default: false]

    Whether to use the legacy locality wait behavior, which resets the delay timer any time a task is scheduled. See the "Delay Scheduling" section of TaskSchedulerImpl's class documentation for more details.
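To make the difference between the two reset rules concrete, here is a minimal, simplified model in Scala. The class DelayScheduleTimer and its onOffer method are hypothetical names, not Spark's TaskSetManager or TaskSchedulerImpl API, and each resource offer round is folded into a single call, so this only approximates the behavior described above.

```scala
// Hypothetical, simplified model of the locality-wait timer reset rule.
// Not Spark's TaskSetManager/TaskSchedulerImpl code; names are illustrative.
final class DelayScheduleTimer(legacyResetOnTaskLaunch: Boolean) {
  private var lastResetMs: Long = 0L
  // False once a resource was rejected due to delay scheduling since the last reset.
  private var noRejectsSinceLastReset: Boolean = true

  /** Records the outcome of one resource offer round for this task set. */
  def onOffer(nowMs: Long,
              isAllFreeResources: Boolean,
              launchedTask: Boolean,
              rejectedResource: Boolean): Unit = {
    if (legacyResetOnTaskLaunch) {
      // Legacy behavior: any task launch resets the timer.
      if (launchedTask) lastResetMs = nowMs
    } else if (rejectedResource) {
      // Resources went unused despite pending tasks: keep the timer running.
      noRejectsSinceLastReset = false
    } else if (launchedTask && (isAllFreeResources || noRejectsSinceLastReset)) {
      // Reset only if nothing was rejected since the last reset, or if this
      // reject-free round was a "fair share" offer (isAllFreeResources = true).
      lastResetMs = nowMs
      noRejectsSinceLastReset = true
    }
  }

  /** Time the task set has been waiting at its current locality level. */
  def elapsedMs(nowMs: Long): Long = nowMs - lastResetMs
}
```

In the new mode, once a reject has been recorded, launches during partial offers no longer reset the timer; the next reject-free fair-share offer does.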

[SPARK-31018][CORE][DOCS] Deprecate support of multiple workers on the same host in Standalone (+7, -5)

SPARK_WORKER_INSTANCES is deprecated in Standalone mode because there appears to be no scenario that requires deploying multiple Workers on the same node with the Standalone backend. To run multiple executors per node, it is recommended to launch one worker per node with multiple executors per worker, instead of launching multiple workers per node with one or more executors each.
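As a rough illustration of the recommended layout, the number of executors of one application that fit on a single worker is bounded by both cores and memory. This sketch assumes executor cores and memory are set explicitly (for example via spark.executor.cores and spark.executor.memory; without spark.executor.cores, an executor takes all available cores on the worker) and ignores spark.cores.max and other applications. The object and method names are hypothetical.

```scala
// Hypothetical capacity estimate: how many executors of one application fit on a
// single Standalone worker. Ignores spark.cores.max, other applications, and scheduling order.
object ExecutorsPerWorker {
  def fit(workerCores: Int, workerMemoryMb: Int, executorCores: Int, executorMemoryMb: Int): Int = {
    require(executorCores > 0 && executorMemoryMb > 0, "executor size must be positive")
    // Each executor needs both its cores and its memory on the same worker.
    math.min(workerCores / executorCores, workerMemoryMb / executorMemoryMb)
  }
}
```

For example, a 16-core, 64 GiB worker with 4-core, 8 GiB executors is core-bound at 4 executors.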

[SPARK-31425][SQL][CORE] UnsafeKVExternalSorter/VariableLengthRowBasedKeyValueBatch should also respect UnsafeAlignedOffset (+78, -56)

Previously, UnsafeKVExternalSorter and VariableLengthRowBasedKeyValueBatch did not respect UnsafeAlignedOffset, whose size is platform-dependent. This could lead to data corruption on the SPARC platform. This PR fixes it.
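The following Scala sketch shows why the offset width matters. It assumes, as we understand UnsafeAlignedOffset, that the length prefix of a record is 4 bytes on most platforms and 8 bytes on platforms without unaligned memory access such as SPARC; AlignedOffsetSketch is a hypothetical helper and the layout is simplified. Writing with one width and reading with the other yields a wrong length, which is the kind of corruption the fix prevents.

```scala
import java.nio.ByteBuffer

// Hypothetical model of a length-prefixed record whose prefix width depends on the platform.
// Not Spark's actual memory layout.
object AlignedOffsetSketch {
  def write(payload: Array[Byte], uaoSize: Int): ByteBuffer = {
    val buf = ByteBuffer.allocate(uaoSize + payload.length)
    // Length prefix: 4-byte int or 8-byte long, depending on the assumed platform.
    if (uaoSize == 4) buf.putInt(payload.length) else buf.putLong(payload.length.toLong)
    buf.put(payload)
    buf.flip()
    buf
  }

  // Reads the length prefix assuming a prefix width of `uaoSize` bytes.
  def readLength(buf: ByteBuffer, uaoSize: Int): Long =
    if (uaoSize == 4) buf.getInt(0).toLong else buf.getLong(0)
}
```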

[SPARK-31472][CORE] Make sure Barrier Task always return messages or exception with abortableRpcFuture check (+32, -37)

Previously, a barrier task could return a null message, or complete successfully when it should have thrown an exception. After the fix, it is guaranteed to either throw a proper exception or return a message.


[SPARK-31359][SQL] Speed up timestamps rebasing (+5588, -438)

This PR optimizes the DateTimeUtils.rebaseJulianToGregorianMicros() and rebaseGregorianToJulianMicros() functions by using pre-calculated rebasing tables, which avoids expensive conversions via local timestamps. For example, the America/Los_Angeles time zone has only a few points in time at which the difference between the Proleptic Gregorian calendar and the hybrid calendar (Julian + Gregorian since 1582-10-15) changes within the interval 0001-01-01 .. 2100-01-01:

 i | local timestamp  | Proleptic Greg. seconds | Hybrid (Julian+Greg) seconds | difference in minutes
 0 | 0001-01-01 00:00 | -62135568422            | -62135740800                 | -2872
 1 | 0100-03-01 00:00 | -59006333222            | -59006419200                 | -1432
...
13 | 1582-10-15 00:00 | -12219264422            | -12219264000                 | 7
14 | 1883-11-18 12:00 | -2717640000             | -2717640000                  | 0
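The lookup that replaces the local-timestamp conversions can be sketched as a scan over sorted switch points. This Scala sketch uses only the four rows shown in the table, in seconds rather than microseconds, so it is only correct for inputs near those rows; the object name and arrays are illustrative, not DateTimeUtils internals. Each diff is the Proleptic Gregorian seconds minus the hybrid seconds of its row.

```scala
// Hypothetical switch table for America/Los_Angeles built from the table rows above
// (intermediate rows omitted), in hybrid-calendar seconds since the epoch.
object RebaseSketch {
  val switches: Array[Long] = Array(-62135740800L, -59006419200L, -12219264000L, -2717640000L)
  // Seconds to add to a hybrid value to obtain the Proleptic Gregorian value.
  val diffs: Array[Long] = Array(172378L, 85978L, -422L, 0L)

  def rebaseJulianToGregorianSeconds(seconds: Long): Long = {
    // Walk back from the latest switch point to the one in effect at `seconds`.
    var i = switches.length - 1
    while (i > 0 && seconds < switches(i)) i -= 1
    seconds + diffs(i)
  }
}
```

Because the number of switch points per time zone is small, this per-value work is a short array scan instead of a full calendar conversion.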

[SPARK-31398][SQL] Fix perf regression of loading dates before 1582 year by non-vectorized ORC reader (+96, -95)

In the regular (non-vectorized) ORC reader, used when spark.sql.orc.enableVectorizedReader is set to false, this PR uses DaysWritable to read DATE values from ORC files. Previously, days from ORC files were converted to java.sql.Date and then to days in the Proleptic Gregorian calendar; the unnecessary conversion to java.sql.Date is now eliminated.

[SPARK-31402][SQL] Fix rebasing of BCE dates/timestamps (+133, -51)

This PR falls back to rebasing via local dates/timestamps for days/micros before the common era (BCE).

[SPARK-31414][SQL] Fix performance regression with new TimestampFormatter for json and csv time parsing (+261, -224)

For the JSON and CSV sources, when a timestamp string is invalid for the new (non-legacy) date-time parser, Spark first calls the new parser and then falls back to the legacy Spark 2.4 parser. To avoid this extra overhead, this PR changes the timestamp pattern to ....HH:mm:ss[.SSS][XXX], which makes most timestamp strings valid for the new parser so that no fallback is needed.
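Square brackets in a java.time pattern mark optional sections, which is what lets one pattern accept timestamps with or without fractional seconds and a zone offset. Since the date part of the new pattern is elided above, this Scala sketch only uses its time-of-day part, HH:mm:ss[.SSS][XXX]; OptionalPatternSketch is a hypothetical name.

```scala
import java.time.LocalTime
import java.time.format.DateTimeFormatter

// Optional sections: fractional seconds and the offset may each be present or absent.
object OptionalPatternSketch {
  val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss[.SSS][XXX]")

  // Parses the time-of-day part; a parsed offset, if any, is accepted but not applied.
  def parseTime(s: String): LocalTime = LocalTime.parse(s, formatter)
}
```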

[SPARK-31426][SQL] Fix perf regressions of toJavaTimestamp/fromJavaTimestamp (+316, -321)

This PR speeds up conversions to/from java.sql.Timestamp in DateTimeUtils.toJavaTimestamp() and fromJavaTimestamp() by using the rebaseGregorianToJulianMicros() and rebaseJulianToGregorianMicros() functions introduced by SPARK-31359 (#28119). It also improves the performance of the ORC datasource when loading/saving timestamps. However, compared with Spark 2.4, regressions of 5% in DataFrame loading and 11% in DataFrame saving remain.

[SPARK-31439][SQL] Fix perf regression of fromJavaDate (+232, -231)

This PR reuses the optimized implementation of the days rebasing function rebaseJulianToGregorianDays(), introduced by SPARK-31297 (#28067), when converting java.sql.Date values to Catalyst's DATE values. The function fromJavaDate in DateTimeUtils was rewritten by taking the implementation from Spark 2.4 and rebasing the final result via rebaseJulianToGregorianDays(). Converting external values to Catalyst's DATE values is now 4 times faster than the previous master and 30% faster than Spark 2.4.6.
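For context, the slow path that optimized days rebasing avoids can be sketched as converting days to local date fields in the hybrid calendar (java.util.GregorianCalendar) and reading the same local date back in the Proleptic Gregorian calendar (java.time.LocalDate). This Scala sketch is a hypothetical illustration, not Spark's rebaseJulianToGregorianDays(), and only handles common-era dates.

```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Hypothetical slow-path rebase (CE dates only): hybrid days since the epoch
// -> local year/month/day in the hybrid calendar -> same local date in Proleptic Gregorian.
object DaysRebaseSketch {
  private val MillisPerDay = 86400000L

  def rebaseJulianToGregorianDays(hybridDays: Int): Int = {
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.setTimeInMillis(hybridDays * MillisPerDay)
    // Calendar.YEAR is the year of era, so BCE dates would need extra handling.
    val local = LocalDate.of(
      cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))
    Math.toIntExact(local.toEpochDay)
  }
}
```

For 1000-01-01 the two calendars differ by 5 days, so the hybrid day number is 5 greater than the Proleptic Gregorian one; doing this per value is what the pre-calculated tables avoid.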

[SPARK-31416][SQL] Check more strictly that a field name can be used as a valid Java identifier for codegen (+32, -22)

This PR uses javax.lang.model.SourceVersion, instead of a hardcoded list of reserved keywords, to check whether a field name can be used as a valid Java identifier in ScalaReflection.serializerFor (codegen).
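A check in this spirit can be written with the standard SourceVersion.isIdentifier and SourceVersion.isKeyword methods (isKeyword also covers the literals true, false, and null). This is a hedged sketch; the exact condition used in ScalaReflection may differ, and JavaIdentifierCheck is a hypothetical name.

```scala
import javax.lang.model.SourceVersion

// A field name is usable as a Java identifier if it is syntactically an
// identifier and not a keyword or literal in the latest source version.
object JavaIdentifierCheck {
  def isValidJavaIdentifier(name: String): Boolean =
    SourceVersion.isIdentifier(name) && !SourceVersion.isKeyword(name)
}
```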

[SPARK-31392][SQL] Support CalendarInterval to be reflected to CalendarIntervalType (+27, -16)

After this PR, Spark can infer the schema of records containing CalendarInterval values. For example:

import org.apache.spark.unsafe.types.CalendarInterval
Seq((1, new CalendarInterval(1, 2, 3))).toDF("a", "b") // b: CalendarIntervalType; args are months, days, microseconds

[SPARK-25154][SQL] Support NOT IN sub-queries inside nested OR conditions (+580, -44)

This PR adds support for NOT IN subqueries (null-aware predicate subqueries) inside disjunctive (OR) predicates.
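NOT IN is null-aware because SQL uses three-valued logic: the predicate can evaluate to NULL, and that result has to be preserved when it is combined with OR. The following Scala sketch models these semantics with Option[Boolean] (None standing for NULL); it describes standard SQL behavior, not how Spark plans the subquery, and the names are hypothetical.

```scala
// Three-valued logic: None stands for SQL NULL.
object NotInSketch {
  def notIn(x: Option[Int], subquery: Seq[Option[Int]]): Option[Boolean] =
    if (subquery.isEmpty) Some(true)           // NOT IN over an empty result is TRUE, even for NULL x
    else if (x.isEmpty) None                   // NULL NOT IN (non-empty result) is NULL
    else if (subquery.contains(x)) Some(false) // a match makes the predicate FALSE
    else if (subquery.contains(None)) None     // no match, but a NULL is present: unknown
    else Some(true)

  def or(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }
}
```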


[SPARK-31008][SQL] Support json_array_length function (+192, -1)

This PR adds a new function, json_array_length(jsonArray), which returns the number of elements in the outermost JSON array.


  • jsonArray - A JSON array. NULL is returned if the input is any other valid JSON string, NULL, or invalid JSON.


      > SELECT json_array_length('[1,2,3,4]');
       4
      > SELECT json_array_length('[1,2,3,{"f1":1,"f2":[5,6]},4]');
       5
      > SELECT json_array_length('[1,2');
       NULL

[SPARK-31009][SQL] Support json_object_keys function (+223, -1)

This PR adds a new function, json_object_keys(json_object), which returns all the keys of the outermost JSON object as an array.


  • json_object - A JSON object. If a valid JSON object is given, all the keys of the outermost object are returned as an array. If it is any other valid JSON string, an invalid JSON string, or an empty string, the function returns null.


      > SELECT json_object_keys('{}');
       []
      > SELECT json_object_keys('{"key": "value"}');
       ["key"]
      > SELECT json_object_keys('{"f1":"abc","f2":{"f3":"a", "f4":"b"}}');
       ["f1","f2"]

[SPARK-31021][SQL] Support MariaDB Kerberos login in JDBC connector (+457, -189)

When loading DataFrames from a JDBC data source with Kerberos authentication, remote executors (in yarn-client, yarn-cluster, and similar modes) could fail to establish a connection because they lack a Kerberos ticket or the ability to generate one.

This PR adds support for MariaDB Kerberos login in the generic JDBC connector. Users can set the location of the Kerberos keytab file and the Kerberos principal name via the JDBC options keytab and principal.

[SPARK-31381][SPARK-29245][SQL] Upgrade built-in Hive 2.3.6 to 2.3.7 (+41, -40)

Upgrades the built-in execution Hive JARs from 2.3.6 to 2.3.7.

[SPARK-31256][SQL] DataFrameNaFunctions.drop should work for nested columns (+35, -25)

In Spark 2.4.5, DataFrameNaFunctions.drop unintentionally lost support for nested columns and returned incorrect results. After this fix, it works as expected.


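// Spark 2.4.5: the row is kept even though outer.col1 is null
scala> spark.range(1).selectExpr("struct(null, id) AS outer").na.drop("any", "outer.col1" :: Nil).show()
|outer|
|[, 0]|

// After this fix: the row is dropped
scala> spark.range(1).selectExpr("struct(null, id) AS outer").na.drop("any", "outer.col1" :: Nil).show()
|outer|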

[SPARK-30564][SQL] Revert Block.length and use comment placeholders in HashAggregateExec (+11, -11)

SPARK-21870 caused a performance regression in the upcoming Spark 3.0 release: it used a regular expression to exclude newlines and comments and extract the actual code. When the generated source code is large, this regular expression can be very slow, particularly for deeply nested complex types. This PR fixes the regression by no longer using the regular expression.

[SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata (+299, -105)

This PR adds a new Parquet/Avro file metadata key, org.apache.spark.legacyDatetime. It indicates that the file was written with the "rebaseInWrite" config enabled, so Spark needs to rebase datetime values when reading it.

This lets Spark decide more intelligently when to rebase:

  1. If the Spark version that wrote the file is unknown, rebase if the "rebaseInRead" config is true.
  2. If the file was written by Spark 2.4 or earlier, rebase.
  3. If the file was written by Spark 3.0 or later, rebase only if org.apache.spark.legacyDatetime exists in the file metadata.
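The three rules can be expressed as a small decision function. In this Scala sketch, the writer version, the presence of org.apache.spark.legacyDatetime, and the "rebaseInRead" config value are passed in as plain arguments; how Spark actually obtains them from the file and the session is not shown, and RebaseDecisionSketch is a hypothetical name.

```scala
// Hypothetical model of the three rebasing rules; parameter names are illustrative.
object RebaseDecisionSketch {
  def shouldRebase(writerSparkVersion: Option[String],
                   hasLegacyDatetimeKey: Boolean,
                   rebaseInReadEnabled: Boolean): Boolean =
    writerSparkVersion match {
      case None                                   => rebaseInReadEnabled  // 1. unknown writer: follow the config
      case Some(v) if majorVersion(v) < 3         => true                 // 2. Spark 2.4 and earlier: rebase
      case Some(_)                                => hasLegacyDatetimeKey // 3. Spark 3.0+: only if the key exists
    }

  // "2.4.5" -> 2, "3.0.0" -> 3
  private def majorVersion(v: String): Int = v.takeWhile(_ != '.').toInt
}
```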

[SPARK-31385][SQL] Restrict micros rebasing via switch arrays up to 2037 year (+35, -9)

Java's sun.util.calendar.ZoneInfo does not resolve DST correctly after the year 2037. Previously, the rebasing switch arrays could not be pre-calculated for several time zones because the results did not match after 2037, so those time zones could not benefit from the rebasing optimization. By restricting the pre-calculated arrays to 2037 as the upper bound, the Asia/Tehran, Iran, Africa/Casablanca, and Africa/El_Aaiun time zones can now be optimized as well.

[SPARK-31423][SQL] Fix rebasing of non-existent dates (+39, -7)

The hybrid Julian-Gregorian calendar has no dates between 1582-10-04 and 1582-10-15 (that is, 1582-10-05 through 1582-10-14). When such dates are rebased from the Proleptic Gregorian calendar to the hybrid Julian-Gregorian calendar, they are shifted to 1582-10-15.

[SPARK-31455][SQL] Fix rebasing of non-existent timestamps (+1244, -1205)

Similarly to SPARK-31423, the hybrid Julian-Gregorian calendar has no timestamps on the dates between 1582-10-04 and 1582-10-15. When such timestamps are rebased from the Proleptic Gregorian calendar to the hybrid Julian-Gregorian calendar, their date is shifted to 1582-10-15 while the time fields are kept unchanged.
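The shifting rule for both changes can be sketched on local dates and timestamps before they are converted into the hybrid calendar. This Scala sketch is hypothetical (GapShiftSketch is not a Spark class) and only illustrates the rule stated above.

```scala
import java.time.{LocalDate, LocalDateTime}

// Local dates in the gap 1582-10-05 .. 1582-10-14 do not exist in the hybrid calendar.
object GapShiftSketch {
  private val LastJulianDay = LocalDate.of(1582, 10, 4)
  private val FirstGregorianDay = LocalDate.of(1582, 10, 15)

  def isInGap(d: LocalDate): Boolean = d.isAfter(LastJulianDay) && d.isBefore(FirstGregorianDay)

  // Non-existent dates move to 1582-10-15.
  def shiftDate(d: LocalDate): LocalDate = if (isInGap(d)) FirstGregorianDay else d

  // Non-existent timestamps move to 1582-10-15, keeping the time-of-day fields.
  def shiftTimestamp(ts: LocalDateTime): LocalDateTime =
    if (isInGap(ts.toLocalDate)) LocalDateTime.of(FirstGregorianDay, ts.toLocalTime) else ts
}
```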

[SPARK-31443][SQL] Fix perf regression of toJavaDate (+236, -222)

Enables the pre-calculated rebasing array optimization in toJavaDate to improve its performance.

[SPARK-31450][SQL] Make ExpressionEncoder thread-safe (+282, -238)

Previously, ExpressionEncoder was not thread-safe. It is now thread-safe and stateless, which makes it more reusable.

[SPARK-31468][SQL] Null types should be implicitly cast to Decimal types (+33, -4)

Spark 2.4 supports comparing decimals with NULL. This PR fixes a regression in the unreleased Spark 3.0 branch where such comparisons failed to resolve.


// Before this fix
scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").show()
org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to data type
mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and null).; line 1 pos 0;

// After this fix
scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").show()
|(v1 = NULL)|
|       null|

[SPARK-31469][SQL] Make extract interval field ANSI compliant (+120, -540)

Removes the non-ANSI-standard abbreviations for intervals in extract and date_part. Specifically, it removes millennium, century, decade, quarter, millisecond, microseconds, and epoch for intervals. These were added, and are now removed again, only within the unreleased Spark 3.0 branch.

[SPARK-31474][SQL] Consistency between dayofweek/dow in extract expression and dayofweek function (+47, -41)

Changes the range of dow in extract to 1 (Sunday) through 7 (Saturday) to match dayofweek; previously it ranged from 0 to 6. This makes the unreleased feature consistent with the released dayofweek/weekday functions.


// Before this change
scala> spark.range(1).selectExpr("extract(dow from '2009-07-26')").show()
|extract('dow' FROM '2009-07-26')|
|                               0|

// After this change
scala> spark.range(1).selectExpr("extract(dow from '2009-07-26')").show()
|extract('dow' FROM '2009-07-26')|
|                               1|
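The new numbering can be derived from java.time's ISO day-of-week (Monday = 1 through Sunday = 7) with a modulo. This Scala sketch only illustrates the arithmetic; the helper names are hypothetical. 2009-07-26 is a Sunday, which is why the example above changes from 0 to 1.

```scala
import java.time.LocalDate

// Maps the ISO day-of-week (Monday = 1 ... Sunday = 7) to other conventions.
object DayOfWeekSketch {
  // dayofweek convention: Sunday = 1 ... Saturday = 7.
  def sparkDayOfWeek(d: LocalDate): Int = d.getDayOfWeek.getValue % 7 + 1
  // Previous 0-based dow convention: Sunday = 0 ... Saturday = 6.
  def zeroBasedDow(d: LocalDate): Int = d.getDayOfWeek.getValue % 7
}
```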


[SPARK-30818][SPARKR][ML] Add SparkR LinearRegression wrapper (+407, -1)

Adds a SparkR wrapper for LinearRegression.

[SPARK-30819][SPARKR][ML] Add FMRegressor wrapper to SparkR (+438, -1)

Adds a SparkR wrapper for FMRegressor.

[SPARK-31301][ML] Flatten the result dataframe of tests in testChiSquare (+104, -92)

  1. Removes the newly added method def testChiSquare(dataset: Dataset[_], featuresCol: String, labelCol: String): Array[SelectionTestResult], because it is only used in ChiSqSelector.
  2. Since the DataFrame returned by def test(dataset: DataFrame, featuresCol: String, labelCol: String): DataFrame contains only one row, collecting it back to the driver gives similar results.
  3. Adds the method def test(dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame to return flattened results.

[SPARK-31436][ML] MinHash keyDistance optimization (+34, -7)

Optimizes MinHashLSH.keyDistance by replacing the set-based implementation with a naive implementation using simple loops, making it 5 to 9 times faster.
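Both variants compute the Jaccard distance between the sets of non-zero indices, 1 - |A ∩ B| / |A ∪ B|. The Scala sketch below contrasts a set-based version with a loop that merges two sorted index arrays, the kind of simple loop the change describes; it assumes sorted, distinct, non-empty index arrays (as in sparse vectors) and is not Spark's actual code.

```scala
// Jaccard distance over the non-zero indices of two sparse vectors.
object JaccardSketch {
  // Set-based: builds intermediate sets for the intersection and the union.
  def setBased(a: Array[Int], b: Array[Int]): Double = {
    val sa = a.toSet
    val sb = b.toSet
    1.0 - sa.intersect(sb).size.toDouble / sa.union(sb).size
  }

  // Loop-based: a single merge pass over two sorted, distinct index arrays.
  def loopBased(a: Array[Int], b: Array[Int]): Double = {
    var i = 0
    var j = 0
    var inter = 0
    while (i < a.length && j < b.length) {
      if (a(i) == b(j)) { inter += 1; i += 1; j += 1 }
      else if (a(i) < b(j)) i += 1
      else j += 1
    }
    val union = a.length + b.length - inter
    1.0 - inter.toDouble / union
  }
}
```

The loop avoids allocating hash sets per comparison, which matters because keyDistance is evaluated for many candidate pairs.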


[SPARK-31278][SS] Fix StreamingQuery output rows metric (+73, -39)

In Structured Streaming, progress updates are provided every 10 seconds even when a stream has no new data upstream. When reporting this progress, however, Spark zeroed out the input information but not the output information. This PR fixes that bug.

[SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run (+7, -7)

This PR fixes the behavior of ProgressReporter, which always overwrote the "updated" value of a state operator with 0 when there was no new data. That behavior is correct only when the state progress is copied from the previously executed plan, meaning that no batch has been run (a nonzero "updated" value would be odd if no batch ran).

Assuming that no data means no batch used to be safe, but SPARK-24156 allows a batch to run without new data when Spark needs to handle the watermark. After this patch, the value is overwritten only when both conditions are met: 1) there is no new data, and 2) no batch has been run.

[SPARK-31464][BUILD][SS] Upgrade Kafka to 2.5.0 (+5, -5)

Upgrades Kafka from 2.4.1 to 2.5.0.


[SPARK-31441] Support duplicated column names for toPandas with arrow execution (+26, -7)

This PR adds support for duplicated column names in toPandas when Arrow execution is enabled.


[SPARK-31411][UI] Show submitted time and duration in job details page (+49, -8)

Shows the submission time and duration of a job on its details page.


[SPARK-31420][WEBUI] Infinite timeline redraw in job details page (+140, -114)

This PR upgrades vis.js to fix an infinite redrawing issue on the job details page.


[SPARK-29905][K8S] Improve pod lifecycle manager behavior with dynamic allocation (+188, -79)

When dynamic allocation is enabled, the lifecycle manager could log updates for the same executor multiple times, potentially leading to multiple redundant calls to the API server for the same pod. In addition, even finished executors could be processed when executor pods are configured not to be deleted.

This PR fixes these issues by

  • logging each executor update only once, by checking whether the state change has already been seen
  • marking and filtering finished executors that have not been deleted in Kubernetes.

[SPARK-31394][K8S] Adds support for Kubernetes NFS volume mounts (+121, -0)

Adds support for mounting NFS volumes. Users can now mount NFS volumes by running commands like:

spark-submit \
--conf spark.kubernetes.driver.volumes.nfs.myshare.mount.path=/myshare \
--conf spark.kubernetes.driver.volumes.nfs.myshare.mount.readOnly=false \
--conf spark.kubernetes.driver.volumes.nfs.myshare.options.server=nfs.example.com \
--conf spark.kubernetes.driver.volumes.nfs.myshare.options.path=/storage/myshare \
<application-jar> [application-arguments]


[SPARK-31382][BUILD] Show a better error message for different python and pip installation mistake (+16, -2)

This PR shows a better error message when a user installs PySpark with pip but the default python does not correspond to that pip. See https://stackoverflow.com/questions/46286436/running-pyspark-after-pip-install-pyspark/49587560 for an example. Previously, the typical error message looked like:

Could not find valid SPARK_HOME while searching ['/home/user', '/home/user/.local/bin']

After the change, the error message looks like:

Could not find valid SPARK_HOME while searching ['/home/user', '/home/user/.local/bin']

Did you install PySpark via a package manager such as pip or Conda? If so,
PySpark was not found in your Python environment. It is possible your
Python environment does not properly bind with your package manager.

Please check your default 'python' and if you set PYSPARK_PYTHON and/or
PYSPARK_DRIVER_PYTHON environment variables, and see if you can import
PySpark, for example, 'python -c 'import pyspark'.

If you cannot import, you can install by using the Python executable directly,
for example, 'python -m pip install pyspark [--user]'. Otherwise, you can also
explicitly set the Python executable, that has PySpark installed, to
PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON environment variables, for example,
'PYSPARK_PYTHON=python3 pyspark'.

[SPARK-31330] Automatically label PRs based on the paths they touch (+131, -0)

This PR adds rules that Probot Auto Labeler uses to label PRs based on the paths they modify.