Re: [OSS DIGEST] The major changes of Apache Spark from May 20 to June 2


Hyukjin Kwon
Wu is currently unable to post to the dev mailing list due to an unexpected access issue, and the previous email was malformed for that reason.
Since it looks like the issue will take a while to fix, I am resending the digest. Please ignore the previous email and read this one instead.

Thanks.


CORE

[API][3.1][SPARK-8981][CORE] Add MDC support in Executor (+27, -1)>

This PR added MDC (Mapped Diagnostic Context) support for task threads. By default, each log line printed by the same task thread will include the same unique task name. In addition, users can add custom content to logs, for example application IDs/names, by configuring the log4j pattern. This is important when a cluster is shared by different users/applications.

Before:

scala> testDf.collect()
...
20/04/28 16:41:58 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_1 in memory.
20/04/28 16:41:58 WARN MemoryStore: Not enough space to cache broadcast_1 in memory! (computed 384.0 B so far)
20/04/28 16:41:58 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_0 in memory.
20/04/28 16:41:58 WARN MemoryStore: Not enough space to cache broadcast_0 in memory! (computed 384.0 B so far)
20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes).
20/04/28 16:41:58 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes).
20/04/28 16:41:58 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 262144 bytes of memory, got 22200
...

After (note the task name appended to the end of each line):

scala> testDf.collect()
...
20/04/28 16:40:59 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_1 in memory [task 1.0 in stage 0.0].
20/04/28 16:40:59 WARN MemoryStore: Not enough space to cache broadcast_1 in memory! (computed 384.0 B so far) [task 1.0 in stage 0.0]
20/04/28 16:40:59 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_0 in memory. [task 1.0 in stage 0.0]
20/04/28 16:40:59 WARN MemoryStore: Not enough space to cache broadcast_0 in memory! (computed 384.0 B so far) [task 1.0 in stage 0.0] 
20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. [task 0.0 in stage 0.0]
20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. [task 1.0 in stage 0.0]
20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes). [task 0.0 in stage 0.0]
20/04/28 16:40:59 WARN RowBasedKeyValueBatch: Failed to allocate page (1048576 bytes). [task 1.0 in stage 0.0] 
20/04/28 16:41:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 262144 bytes of memory, got 22200 [task 0.0 in stage 0.0]
...
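
For reference, a minimal sketch of a log4j pattern that surfaces the MDC value, assuming the MDC key introduced by this PR is named mdc.taskName (the exact key name may differ in your Spark version):

# conf/log4j.properties (sketch): append the task name stored in MDC to every log line
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m %X{mdc.taskName}%n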

[API][3.1][SPARK-29150][CORE] Update RDD API for Stage level scheduling to be public (+29, -25)>

This PR makes the RDD API for stage level scheduling public.
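
A rough sketch of the now-public API, assuming Spark 3.1+ and a cluster manager that supports stage level scheduling (the resource numbers are illustrative):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Build a ResourceProfile that requests different executor/task resources for this stage
val execReqs = new ExecutorResourceRequests().cores(4).memory("6g")
val taskReqs = new TaskResourceRequests().cpus(2)
val profile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

// Attach the profile to an RDD; stages computing this RDD are scheduled with those resources
val rdd = sc.parallelize(1 to 100).map(_ * 2).withResources(profile)
rdd.collect()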

SQL

[API][3.0][SPARK-31750][SQL] Eliminate UpCast if child's dataType is DecimalType (+52, -8)>

Eliminate the UpCast that is implicitly added by Spark if its child's data type is already DecimalType. Otherwise, cases like:

sql("select cast(1 as decimal(38, 0)) as d")
  .write.mode("overwrite")
  .parquet(f.getAbsolutePath)

spark.read.parquet(f.getAbsolutePath).as[BigDecimal]

could fail as follows:

[info]   org.apache.spark.sql.AnalysisException: Cannot up cast `d` from decimal(38,0) to decimal(38,18).
[info] The type path of the target object is:
[info] - root class: "scala.math.BigDecimal"
[info] You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3060)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3087)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$33$$anonfun$applyOrElse$174.applyOrElse(Analyzer.scala:3071)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)

[API][3.0][SPARK-31755][SQL] Allow missing year/hour when parsing date/timestamp string (+370, -92)>

In order to keep backward compatibility with Spark 2.4, this PR allows missing year and hour fields when parsing date/timestamp strings, using 1970 as the default year and 0 as the default hour.

In Spark 3.0,

Before:

scala> sql("select to_timestamp('16', 'dd')").show
+--------------------+
|to_timestamp(16, dd)|
+--------------------+
|                null|
+--------------------+


scala> sql("select to_date('16', 'dd')").show
+---------------+
|to_date(16, dd)|
+---------------+
|           null|
+---------------+

scala> sql("select to_timestamp('2019 40', 'yyyy mm')").show
+------------------------------+
|to_timestamp(2019 40, yyyy mm)|
+------------------------------+
|           2019-01-01 00:00:00|
+------------------------------+


scala> sql("select to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')").show
+------------------------------------------+
|to_timestamp(2019 10:10:10, yyyy hh:mm:ss)|
+------------------------------------------+
|                       2019-01-01 00:00:00|
+------------------------------------------+

After:

scala> sql("select to_timestamp('16', 'dd')").show
+------------------------+
|to_timestamp('16', 'dd')|
+------------------------+
|     1970-01-16 00:00:00|
+------------------------+


scala> sql("select to_date('16', 'dd')").show
+-------------------+
|to_date('16', 'dd')|
+-------------------+
|         1970-01-16|
+-------------------+


scala> sql("select to_timestamp('2019 40', 'yyyy mm')").show
+----------------------------------+
|to_timestamp('2019 40', 'yyyy mm')|
+----------------------------------+
|               2019-01-01 00:40:00|
+----------------------------------+


scala> sql("select to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')").show
+----------------------------------------------+
|to_timestamp('2019 10:10:10', 'yyyy hh:mm:ss')|
+----------------------------------------------+
|                           2019-01-01 10:10:10|
+----------------------------------------------+

[3.0][SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString (+138, -52)>

This PR avoids both unnecessary overhead of converting Java date-time types to micros/days before formatting and unnecessary conversion from input micros/days to Java types for the formatters.

[API][3.0][SPARK-31771][SQL] Disable Narrow TextStyle for datetime pattern 'G/M/L/E/u/Q/q' (+1441, -50)>

In Spark 3.0, five consecutive pattern letters of 'G/M/L/E/u/Q/q' mean Narrow-Text Style, whereas in Spark 2.4 they mean Full-Text Style. With Narrow-Text Style, Spark only outputs the leading single letter of the value, e.g. December becomes D. This PR disables Narrow-Text Style for these pattern characters in order to avoid silent data changes.

After this PR, queries with DateTime operations using DateTime patterns such as G/M/L/E/u will fail if the pattern length is 5. Other patterns, e.g. 'k', 'm', can still accept a certain number of letters.

As a result, using DateTime patterns like "GGGGG", "MMMMM", "LLLLL", "EEEEE", "uuuuu", "aa", "aaa", which are not supported by the new parser but are supported by the legacy parser, will hit a SparkUpgradeException. To bypass the exception, users can switch to the legacy parser or change to new DateTime patterns. However, using DateTime patterns like "QQQQQ", "qqqqq", which are supported by neither the new parser nor the legacy parser, will hit an IllegalArgumentException. This exception is swallowed by Spark and the value null is returned.
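
For example, a hedged sketch of what this looks like in practice (the error types come from the description above; the config below is the standard legacy switch):

// Five 'M's meant Full-Text Style ("December") in 2.4 but Narrow-Text Style ("D") in 3.0
sql("select date_format(date '2020-12-01', 'MMMMM')").show()
// After this PR: throws SparkUpgradeException instead of silently returning "D"

// Opt back into the Spark 2.4 formatter/parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")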

[API][3.0][SPARK-31808][SQL] Makes struct function's output name and class name pretty (+40, -16)>

This PR corrects struct's alias name and class name in ExpressionInfo.

Before:

scala> sql("DESC FUNCTION struct").show(false)
+------------------------------------------------------------------------------------+
|function_desc                                                                       |
+------------------------------------------------------------------------------------+
|Function: struct                                                                    |
|Class: org.apache.spark.sql.catalyst.expressions.NamedStruct                        |
|Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values.|
+------------------------------------------------------------------------------------+

scala> sql("SELECT struct(1, 2)").show(false)
+------------------------------+
|named_struct(col1, 1, col2, 2)|
+------------------------------+
|[1, 2]                        |
+------------------------------+

After:

scala> sql("DESC FUNCTION struct").show(false)
+------------------------------------------------------------------------------------+
|function_desc                                                                       |
+------------------------------------------------------------------------------------+
|Function: struct                                                                    |
|Class: org.apache.spark.sql.catalyst.expressions.CreateNamedStruct                  |
|Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values.|
+------------------------------------------------------------------------------------+

scala> sql("SELECT struct(1, 2)").show(false)
+------------+
|struct(1, 2)|
+------------+
|[1, 2]      |
+------------+

[API][3.0][SPARK-31818][SQL] Fix pushing down filters with java.time.Instant values in ORC (+77, -48)>

When spark.sql.datetime.java8API.enabled=true, pushing down filters with java.time.Instant to the ORC datasource can fail with:

java.lang.IllegalArgumentException: Wrong value class java.time.Instant for TIMESTAMP.EQUALS leaf
 at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
 at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)

This PR fixes the error by converting java.time.Instant to java.sql.Timestamp in the filters.
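
A hedged sketch of the affected scenario (the path and column names are made up for illustration):

import java.time.Instant
import spark.implicits._

spark.conf.set("spark.sql.datetime.java8API.enabled", "true")

// With the Java 8 time API enabled, timestamp literals surface as java.time.Instant,
// so pushing this comparison down to ORC used to fail with the IllegalArgumentException above
val df = spark.read.orc("/tmp/events.orc")  // hypothetical path with a timestamp column "ts"
df.filter($"ts" === Instant.parse("2020-05-20T00:00:00Z")).show()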

[API][3.1][SPARK-30352][SQL] DataSourceV2: Add CURRENT_CATALOG function (+43, -5)>

This PR adds the built-in SQL function, CURRENT_CATALOG for DataSourceV2 only.
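
A minimal usage sketch:

// Returns the name of the current catalog (spark_catalog in a default session)
sql("SELECT CURRENT_CATALOG()").show(false)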

[API][3.1][SPARK-31673][SQL] QueryExection.debug.toFile() to take an additional explain mode param (+90, -34)>

Previously, QueryExecution.debug.toFile always used the Extended mode to dump the query plan information. This PR allows users to specify the desired explain mode.
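
A hedged sketch of the intended usage; the exact parameter name and type are assumed from the PR description and may differ:

val df = spark.range(10).selectExpr("id", "id * 2 AS doubled")

// Previously this always dumped the extended plan; the new parameter selects the explain mode
// (parameter name and Option type assumed here for illustration)
df.queryExecution.debug.toFile("/tmp/plan.txt", explainMode = Some("formatted"))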

[API][3.1][SPARK-31710][SQL] Adds TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS functions (+237, -2)>

This PR adds the three built-in SQL functions TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS in order to provide convenient ways to create timestamps by interpreting the values as the number of seconds, milliseconds and microseconds since 1970-01-01 00:00:00 UTC, respectively.

For example,

sql("select TIMESTAMP_SECONDS(t.a) as timestamp from values(1230219000),(-1230219000) as t(a)").show(false)
+-------------------------+
|timestamp                |
+-------------------------+
|2008-12-25 23:30:00      |
|1931-01-07 16:30:00      |
+-------------------------+

sql("select TIMESTAMP_MILLIS(t.a) as timestamp from values(1230219000123),(-1230219000123) as t(a)").show(false)
+-------------------------------+
|timestamp                      |
+-------------------------------+
|2008-12-25 23:30:00.123        |
|1931-01-07 16:29:59.877        |
+-------------------------------+


sql("select TIMESTAMP_MICROS(t.a) as timestamp from values(1230219000123123),(-1230219000123123) as t(a)").show(false)
+------------------------------------+
|timestamp                           |
+------------------------------------+
|2008-12-25 23:30:00.123123          |
|1931-01-07 16:29:59.876877          |
+------------------------------------+

[API][3.0][SPARK-31761][SQL] Cast integer to Long to avoid IntegerOverflow for IntegralDivide operator (+57, -12)>

This PR casts Byte/Short/Integer to Long for the left and right children of IntegralDivide to avoid overflow.

For cases like:

// the df is constructed from : (-2147483648, -1) --> (_c0, _c1)
val res = df.selectExpr("_c0 div _c1")
res.collect

Before:

res1: Array[org.apache.spark.sql.Row] = Array([-2147483648])

After:

res1: Array[org.apache.spark.sql.Row] = Array([2147483648])

[3.1][SPARK-31793][SQL] Reduce the memory usage in file scan location metadata (+61, -4)>

Currently, the operator of Data Source Scan stores all the paths in its metadata. The metadata is kept when a SparkPlan is converted into SparkPlanInfo, which can be used to construct the Spark plan graph in the UI. However, the paths can still be very large (e.g., when many partitions remain after partition pruning), while UI pages can only show up to 100 bytes of the location metadata. Thus, this PR reduces the number of paths stored in metadata to reduce memory usage.

[3.0][SPARK-31354] SparkContext should only register one SparkSession ApplicationEnd listener (+41, -11)>

This PR makes sure that getOrCreate registers the Spark listener only once. For example,

SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

Before this PR, there are two listeners registered at ListenerBus:

[org.apache.spark.status.AppStatusListener5f610071,
org.apache.spark.HeartbeatReceiverd400c17,
org.apache.spark.sql.SparkSession$$anon$125849aeb, <- first listener
org.apache.spark.sql.SparkSession$$anon$1fadb9a0]  <- second listener

After this PR, there's only one listener registered at ListenerBus:

[org.apache.spark.status.AppStatusListener5f610071,
org.apache.spark.HeartbeatReceiverd400c17,
org.apache.spark.sql.SparkSession$$anon$125849aeb] <- only one listener

[API][2.4][SPARK-31854][SQL] Invoke in MapElementsExec should not propagate null (+12, -2)>

This PR fixes a NullPointerException caused by Dataset.map when whole-stage codegen is enabled, by setting propagateNull to false when initializing Invoke.

[3.0][SPARK-31827][SQL] fail datetime parsing/formatting if detect the Java 8 bug of stand-alone form (+19, -4)>

This PR detects the usage of the LLL/qqq datetime pattern string under JDK8 and throws an exception with a clear error message, to avoid hitting a JDK8 bug: https://bugs.openjdk.java.net/browse/JDK-8114833.
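
A hedged sketch of a query that would now fail fast with a clear error message when running on JDK 8:

// 'LLL' (the stand-alone month form) triggers JDK-8114833 on Java 8, so Spark now rejects
// the pattern up front instead of producing incorrect output
sql("select date_format(date '2020-06-01', 'LLL')").show()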

[3.0][SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10 (+99, -10)>

This PR fails usage of datetime pattern in the form of y..y and Y..Y with lengths greater than 10 to avoid hitting a JDK bug.

[3.0][SPARK-31874][SQL] Use FastDateFormat as the legacy fractional formatter (+6, -1)>

This PR:

  1. replaces SimpleDateFormat by FastDateFormat as the legacy formatter of FractionTimestampFormatter, to utilize the internal cache of FastDateFormat, and avoid parsing the default pattern yyyy-MM-dd HH:mm:ss.
  2. optimizes LegacyFastTimestampFormatter for java.sql.Timestamp w/o fractional part, to avoid conversions to microseconds for patterns without the fractional part.

[3.0][SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet (+25, -22)>

This PR fixes conversions of java.sql.Timestamp to milliseconds in ParquetFilter by using the existing functions DateTimeUtils.fromJavaTimestamp() and DateTimeUtils.microsToMillis().

[3.0][API][SPARK-31888][SQL] Support java.time.Instant in Parquet filter pushdown (+83, -70)>

This PR enables push down of filters with java.time.Instant attributes by:

  1. modifying ParquetFilters.valueCanMakeFilterOn() to accept filters with java.time.Instant attributes.
  2. adding ParquetFilters.timestampToMicros() to support both types java.sql.Timestamp and java.time.Instant in conversions to microseconds.
  3. reusing timestampToMicros in the construction of Parquet filters.

[3.1][API][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow (+211, -25)>

This PR fixes incorrect results of the sum aggregate function on decimals in case of overflow, by adding an extra overflow flag field to the sum aggregate function.
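
A hedged sketch of the kind of query affected; with ANSI mode off, the overflowed sum should now return null instead of an incorrect value:

// Two values near the decimal(38, 0) maximum overflow when summed
sql("select sum(d) from values (cast(9e37 as decimal(38, 0))), (cast(9e37 as decimal(38, 0))) as t(d)").show(false)
// Before: an incorrect result could be returned on overflow; after: null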

[3.1][SPARK-28481][SQL] More expressions should extend NullIntolerant (+180, -103)>

This PR makes more expressions extend NullIntolerant. As a result, Spark can avoid skew joins when the join column has many null values, improving query performance.

[3.0][SPARK-31859][SPARK-31861][SPARK-31863] Fix Thriftserver session timezone issues (+209, -185)>

This PR fixes:

  1. SPARK-31861 "Thriftserver collecting timestamp not using spark.sql.session.timeZone" by converting the Timestamp values to String earlier, in SparkExecuteStatementOperation, using HiveResults.toHiveString().
  2. SPARK-31859 "Thriftserver not working with spark.sql.datetime.java8API.enabled=true" by using HiveResults.toHiveString().

ML

[API][3.1][SPARK-31734][ML][PYSPARK] Add weight support in ClusteringEvaluator (+167, -67)>

This PR adds a setWeightCol method to ClusteringEvaluator, as BinaryClassificationEvaluator, RegressionEvaluator and MulticlassClassificationEvaluator do.
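
A minimal sketch (column names are illustrative):

import org.apache.spark.ml.evaluation.ClusteringEvaluator

val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setWeightCol("weight")  // new in this PR: rows are weighted in the evaluation metric

// predictions: a DataFrame containing the features, prediction and weight columns
val silhouette = evaluator.evaluate(predictions)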

[API][3.1][SPARK-31768][ML] Add getMetrics in Evaluators (+905, -591)>

Currently, Evaluator.evaluate can only access one metric. This PR adds a getMetrics method to all the Evaluators to allow users to get multiple metrics.

For example:

  val trainer = new LinearRegression
  val model = trainer.fit(dataset)
  val predictions = model.transform(dataset)

  val evaluator = new RegressionEvaluator()

  val metrics = evaluator.getMetrics(predictions)
  val rmse = metrics.rootMeanSquaredError
  val r2 = metrics.r2
  val mae = metrics.meanAbsoluteError
  val variance = metrics.explainedVariance

[3.1][SPARK-31803][ML] Make sure instance weight is not negative (+32, -16)>

This PR adds checks to make sure instance weight is not negative in the algorithms that support instance weight.

[3.1][SPARK-31840][ML] Add instance weight support in LogisticRegressionSummary (+134, -43)>

Add instance weight support in LogisticRegressionSummary to match its capabilities with those of LogisticRegression, MulticlassClassificationEvaluator and BinaryClassificationEvaluator.

SS

[API][3.0][SPARK-31706][SS] Add back the support of streaming update mode (+54, -20)>

In Spark 2.4, all built-in v2 streaming sinks support all streaming output modes, and v2 sinks are enabled by default (see SPARK-22911). To keep backward compatibility, this PR adds back the support for the streaming UPDATE mode that was dropped in the unreleased 3.0 branch.
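
A minimal sketch of a query relying on UPDATE mode (the source and sink choices are illustrative):

import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Aggregations commonly run in update mode so that only changed groups are emitted per trigger
val counts = spark.readStream
  .format("rate").load()                 // built-in testing source
  .groupBy(($"value" % 10).as("bucket"))
  .count()

counts.writeStream
  .format("console")                     // built-in v2 sink
  .outputMode("update")                  // the mode added back by this PR for v2 sinks
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()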

[API][3.0][SPARK-31792][SS][DOCS] Introduce the structured streaming UI in the Web UI doc (+28, -0)>

This PR adds the structured streaming UI introduction in the Web UI doc.

[3.1][SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID (+102, -10)>

When the compacted metadata log file becomes huge, writing outputs for the compact + 1 batch can be slow due to unnecessarily reading the compacted metadata log file. To get rid of the unnecessary reading, this PR adds a getLatestBatchId() method in CompactibleFileStreamLog as a complement to getLatest(). The method doesn't read the content of the latest batch metadata log file, and can be applied to both FileStreamSource and FileStreamSink to avoid unnecessary latency when reading the log file.

PYTHON

[3.0][SPARK-31788][CORE][DSTREAM][PYTHON] Recover the support of union for different types of RDD and DStreams (+39, -5)>

This PR manually specifies the class for the input array being used in (SparkContext|StreamingContext).union. It fixes a regression introduced by SPARK-25737.

[3.0][SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic (+61, -17)>

This PR makes PySpark exceptions more Pythonic by hiding the JVM stacktrace by default when the JVM exception is one of AnalysisException, ParseException, StreamingQueryException, QueryExecutionException, IllegalArgumentException, or PythonException (thrown by Python UDFs). The JVM stacktrace can be shown again by turning on the spark.sql.pyspark.jvmStacktrace.enabled configuration.

[3.1][SPARK-25351][SQL][PYTHON] Handle Pandas category type when converting from Python with Arrow (+52, -0)>

This PR adds support for the Pandas category type when converting from Python with Arrow enabled. The category column is converted to the type of the category elements, as is the case with Arrow disabled.

[API][3.1][SPARK-31763][PYSPARK] Add inputFiles method in PySpark DataFrame Class (+32, -0)>

This PR adds an inputFiles() method to the PySpark DataFrame class so that PySpark users can list all files constituting a DataFrame.

UI

[API][3.1][SPARK-29303][WEB UI] Add UI support for stage level scheduling (+657, -103)>

This PR adds the UI support for stage level scheduling and ResourceProfiles:

  • Add ResourceProfile Id to Stage page
  • Add ResourceProfile Id to Executor page
  • Add a section with ResourceProfile Id to Environment page

Also, the REST API for the environment page is updated to include the ResourceProfile information.


[API][3.1][SPARK-31642] Add Pagination Support for Structured Streaming Page (+209, -91)>

This PR adds the pagination support for the structured streaming page in order to:

  • Help users analyze structured streaming queries more effectively
  • Improve the consistency of the Spark Web UI across pages
  • Prevent potential OOM

[3.0][SPARK-31882][WEBUI] DAG-viz is not rendered correctly with pagination (+31, -6)>

This PR fixes a DAG-viz bug in which rendering fails with pagination when DAG-viz fetches link URLs for each stage of a job from the stage table.

OTHER

[3.0][SPARK-31786][K8S][BUILD] Upgrade kubernetes-client to 4.9.2 (+14, -11)>

[API][3.1][SPARK-31759][DEPLOY] Support configurable max number of rotate logs for spark daemons (+11, -3)>

[3.1][SPARK-31214][BUILD] Upgrade Janino to 3.1.2 (+14, -12)>

This PR upgrades Janino to 3.1.2: http://janino-compiler.github.io/janino/changelog.html

[3.1][SPARK-31858][BUILD] Upgrade commons-io to 2.5 in Hadoop 3.2 profile (+2, -2)>

This PR upgrades commons-io from 2.4 to 2.5 for Apache Spark 3.1.

[3.1][SPARK-31876][BUILD] Upgrade to Zstd 1.4.5 (+4, -4)>

This PR aims to upgrade to Zstd 1.4.5: https://github.com/facebook/zstd/releases/tag/v1.4.5



On Tue, Jul 7, 2020 at 4:47 PM, wuyi <[hidden email]> wrote:
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, an [API] tag is added in the
title.
