This is the bi-weekly Apache Spark digest from the Databricks OSS team (this time we combined 4 weeks).
For each API/configuration/behavior change, an [API] tag is added in the title.
All the content has been appended to this doc. It will be updated when we send the email.
[SPARK-32091][CORE] Ignore timeout error when removing blocks on the lost executor (+191, -19)>
This PR adds a check to see whether the executor is lost (by asking the CoarseGrainedSchedulerBackend) after a timeout error is raised in BlockManagerMasterEndpoint while removing blocks (e.g. RDD, broadcast, shuffle). If the executor is lost, the error is ignored; otherwise it is thrown.
This makes Spark more robust against executor-lost events, for example those triggered by Spot instance termination.
[SPARK-32103][CORE] Fix the code to split host and port in the string (+142, -19)>
The current code that splits a host:port string is incorrect for IPv6 addresses.
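A minimal sketch of the idea (not the actual Spark code): split at the last colon so that bracketed IPv6 literals keep their full address.
// Sketch only: split host:port at the LAST colon so IPv6 literals
// such as [2401:db00::1]:8080 are handled correctly.
def splitHostPort(hostPort: String): (String, Int) = {
  val idx = hostPort.lastIndexOf(':')
  require(idx > 0, s"Missing port in '$hostPort'")
  val host = hostPort.substring(0, idx).stripPrefix("[").stripSuffix("]")
  (host, hostPort.substring(idx + 1).toInt)
}
splitHostPort("[2401:db00::1]:8080")   // ("2401:db00::1", 8080)
splitHostPort("example.com:7077")      // ("example.com", 7077)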
[API][SPARK-29292][SPARK-30010][CORE] Let core compile for Scala 2.13 (+119, -122)>
Fixes the core module so that it compiles with Scala 2.13, a step towards full Scala 2.13 support.
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors (+141, -4)>
Currently, when an executor is decommissioned we stop scheduling new tasks on it, but the tasks that are already running keep running. Depending on the cloud environment, we might know beforehand that an executor won't be alive for more than a preconfigured time; different cloud providers give different timeouts before they take away the nodes. For example, in the case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can use this information in cloud environments to make better decisions about speculating the tasks still running on decommissioned executors.
The PR adds a new config, spark.executor.decommission.killInterval, which users can explicitly set based on the cloud environment they are running in.
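A hedged example of setting the new config at application startup (the value and app name are illustrative):
import org.apache.spark.sql.SparkSession

// Illustrative: assume executors are killed ~110s after decommissioning starts
// (roughly an AWS spot termination notice), so still-running tasks on them
// can be speculated sooner.
val spark = SparkSession.builder()
  .appName("decommission-speculation-example")
  .config("spark.executor.decommission.killInterval", "110s")
  .getOrCreate()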
[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown (+1150, -255)>
Recomputing shuffle blocks can be expensive; we should take advantage of the decommissioning time to migrate these blocks.
Adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .
Introduces two new config parameters, spark.storage.decommission.shuffleBlocks.enabled and spark.storage.decommission.rddBlocks.enabled, that control which blocks should be migrated during storage decommissioning.
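A sketch of enabling block migration (the config names come from the summary above; how decommissioning itself is triggered is environment-specific):
import org.apache.spark.SparkConf

// Illustrative: allow shuffle blocks and cached RDD blocks to be migrated
// off executors that are being decommissioned.
val conf = new SparkConf()
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .set("spark.storage.decommission.rddBlocks.enabled", "true")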
[SPARK-32215] Expose a (protected) /workers/kill endpoint on the MasterWebUI (+190, -5)>
Allows an external agent to inform the Master that certain hosts are being decommissioned.
The current decommissioning is triggered by the Worker receiving a SIGPWR (possibly out of band via some cleanup hook), which then informs the Master about it. This approach may not be feasible in environments that cannot trigger a cleanup hook on the Worker. In addition, when a large number of worker nodes are being decommissioned, the Master would get a flood of messages.
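A rough sketch of how an external agent might call the endpoint; the request format (query parameter name, port) is an assumption for illustration, not taken from the PR:
import java.net.{HttpURLConnection, URL}

// Assumption: the host being decommissioned is passed as a query parameter;
// check the PR for the exact request format and any filter/auth setup required.
val url = new URL("http://spark-master:8080/workers/kill/?host=10.0.0.1")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
println(s"Master responded: HTTP ${conn.getResponseCode}")
conn.disconnect()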
[SPARK-32350][CORE] Add batch-write on LevelDB to improve performance of HybridStore (+66, -16)>
Improve the performance of HybridStore by adding batch write support to LevelDB
[SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost
If an executor is lost, the DAGScheduler handles the loss by removing the executor, but it does not unregister the executor's outputs when the external shuffle service is used. However, if the node on which the executor ran is lost, the shuffle service may no longer be able to serve the shuffle files. In such a case, when fetches from that executor's outputs fail in the same stage, the DAGScheduler removes the executor again and, with this change, also unregisters its outputs.
Without this change, the loss of a node could require two stage attempts to recover instead of one.
[SPARK-32437][CORE] Improve MapStatus deserialization speed with RoaringBitmap 0.9.0 (+57, -57)>
This PR speeds up MapStatus deserialization by 5~18% by upgrading RoaringBitmap from 0.7.45 to 0.9.0 and using its new APIs.
[API][SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory (+102, -15)>
Supports setting off-heap memory in ExecutorResourceRequests.
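A hedged sketch of a stage-level resource profile that includes off-heap memory (the amounts are made up; the offHeapMemory method name is taken from this PR):
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

// Illustrative resource profile: on-heap and off-heap executor memory.
val execReqs = new ExecutorResourceRequests()
  .memory("4g")
  .offHeapMemory("2g")   // newly supported by this PR
val profile = new ResourceProfileBuilder().require(execReqs).build
// rdd.withResources(profile)   // attach the profile to a stage (rdd is hypothetical)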
[API][SPARK-31418][SCHEDULER] Request more executors in case dynamic allocation is enabled and a task becomes unschedulable due to Spark's blacklisting feature (+392, -19)>
When dynamic allocation is enabled and there is an unschedulable task set due to blacklisting, Spark can request the additional executors needed to schedule the blacklisted tasks instead of aborting immediately. In this case, the scheduler sends a SparkListenerUnschedulableTaskSetAdded event to the ExecutorAllocationManager to request more executors. Once the event is sent, we start the abortTimer, similar to SPARK-22148 and SPARK-15815, to abort when no new executors are launched, either because the maximum number of executors has been reached or because the cluster manager is out of capacity.
This improves Spark's fault tolerance.
[SPARK-28169][SQL] Convert scan predicate condition to CNF (+152, -27)>
This PR optimizes the partition pruning by pushing down predicate connected with OR. See the following example:
select * from test where dt = 20190626 or (dt = 20190627 and xxx="a")
The WHERE condition is converted to CNF: (dt = 20190626 or dt = 20190627) and (dt = 20190626 or xxx = "a"), so that the condition dt = 20190626 or dt = 20190627 can be pushed down for partition pruning.
[API][SPARK-31797][SQL] TIMESTAMP_SECONDS supports fractional input (+267, -39)>
This PR adds the fractional input support for TIMESTAMP_SECONDS:
> SELECT TIMESTAMP_SECONDS(1230219000.123);
[API][SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options (+59, -45)>
This follow-up PR fixes remaining issues with Hadoop file system configs specified in data source options.
[API][SPARK-32130][SQL] Disable the JSON option inferTimestamp by default (+130, -112)>
To prevent a Spark 3.0 performance regression when inferring schemas from JSON with potential timestamp fields, this PR sets the JSON option inferTimestamp to false by default.
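For users who still want timestamp inference, a minimal sketch (assumes an existing SparkSession named spark; the path is made up):
// Opt back in to timestamp inference for this read only.
val df = spark.read
  .option("inferTimestamp", "true")
  .json("/tmp/events.json")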
[SPARK-32136][SQL] NormalizeFloatingNumbers should work on null struct (+15, -2)>
[SPARK-32145][SQL] ThriftCLIService.GetOperationStatus should include exception's stack trace to the error message (+106, -147)>
This PR makes the Thrift server return the full stack trace to JDBC clients instead of only the message of the root cause. See the following example: for JDBC end-users, the message
Error running query: org.apache.spark.sql.AnalysisException: The second argument of 'date_sub' function needs to be an integer.;
is better than just seeing the root cause:
Caused by: java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2
[SPARK-32167][SQL] Fix GetArrayStructFields to respect inner field's nullability together (+42, -5)>
[API][SPARK-31317][SQL] Add withField method to Column (+815, -3)>
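A quick sketch of the new API (assumes an existing DataFrame df; the struct column and field names are made up): withField adds or replaces a field inside a struct column without rebuilding the whole struct.
import org.apache.spark.sql.functions.{col, lit}

// Replace (or add) the `zip` field inside the `address` struct column.
val updated = df.withColumn("address", col("address").withField("zip", lit("94105")))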
[SPARK-32163][SQL] Nested pruning should work even with cosmetic variations (+28, -3)>
If the expressions that extract nested fields have cosmetic variations, such as a qualifier difference, nested column pruning currently does not work well. That means two attributes in the same query can be semantically the same, yet their nested column extractors are treated differently during nested column pruning.
This patch proposes to deal with cosmetic variations when processing nested column extractors in NestedColumnAliasing.
[API][SPARK-20680][SQL] Spark SQL does not support creating table with void column datatype (+191, -14)>
[API][SPARK-32168][SQL] Fix hidden partitioning correctness bug in SQL overwrite (+107, -37)>
This PR fixes a correctness bug in the Data Source V2 APIs. If a table has hidden partitions, all of the values for those partitions are dropped instead of dynamically overwriting changed partitions.
This only affects SQL commands (not DataFrameWriter) writing to tables that have hidden partitions. It is also only a problem when the partition overwrite mode is dynamic.
When converting an INSERT OVERWRITE query to a v2 overwrite plan, Spark attempts to detect when a dynamic overwrite and a static overwrite will produce the same result so it can use the static overwrite. Spark incorrectly detects when dynamic and static overwrites are equivalent when there are hidden partitions, such as days(ts).
This updates the analyzer rule ResolveInsertInto to always use a dynamic overwrite when the mode is dynamic, and static when the mode is static. This avoids the problem by not trying to determine whether the two plans are equivalent and always using the one that corresponds to the partition overwrite mode.
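A hedged illustration of the affected scenario (catalog/table names are made up): a DSv2 table partitioned by days(ts), written with the dynamic partition overwrite mode.
// Dynamic mode: only partitions touched by the query should be overwritten.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("""
  INSERT OVERWRITE TABLE testcat.db.events
  SELECT * FROM new_events
""")
// Before the fix, this could be planned as a static overwrite and drop data
// in partitions the new rows never touched.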
[API][SPARK-31875][SQL] Provide an option to disable user supplied hints (+61, -0)>
New configuration. Introduces a new SQL config, spark.sql.optimizer.ignoreHints. When it is set to true, application of user-supplied hints is disabled. This can be helpful for studying the performance difference between queries with hints applied and without.
[API][SPARK-32207][SQL] Support 'F'-suffixed Float Literals (+108, -2)>
Adds a float literal SQL syntax, e.g. select 1.1f.
[SPARK-32256][SQL][TEST-HADOOP2.7] Force to initialize Hadoop VersionInfo in HiveExternalCatalog (+78, -1)>
This is a regression in Spark 3.0.0 because we switched the default Hive execution version from 1.2.1 to 2.3.7. This PR forces to initialize Hadoop VersionInfo in HiveExternalCatalog to make sure Hive can get the Hadoop version when using the isolated classloader.
[SPARK-32220][SQL] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result (+29, -2)>
In the current join hint strategies, using the SHUFFLE_REPLICATE_NL hint directly converts the join to a Cartesian product join and loses the join condition, making the result incorrect. This PR fixes the mistake.
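A minimal illustration (table names are made up): with the fix, the join condition is kept even when the hint forces a shuffle-and-replicate nested loop join.
spark.sql("""
  SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ *
  FROM t1 JOIN t2 ON t1.key = t2.key
""")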
[API][SPARK-32133][SQL] Forbid time field steps for date start/end in Sequence (+27, -0)>
[API][SPARK-29358][SQL] Make unionByName optionally fill missing columns with nulls (+91, -5)>
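A quick sketch of the new behavior (assumes an existing SparkSession named spark; the data is made up): missing columns are filled with nulls when allowMissingColumns is set.
import spark.implicits._

val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((3, 4)).toDF("a", "c")

// "c" is null for rows from df1, "b" is null for rows from df2.
df1.unionByName(df2, allowMissingColumns = true).show()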
[API][SPARK-32154][SQL] Use ExpressionEncoder for the return type of ScalaUDF to convert to catalyst type (+235, -94)>
This PR proposes to use ExpressionEncoder for the return type of ScalaUDF to convert to the Catalyst type, instead of using CatalystTypeConverters. The UDF can then return Java 8 datetime classes regardless of whether spark.sql.datetime.java8API.enabled is true.
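A small sketch of a UDF whose return type is a Java 8 datetime class (the column name is made up):
import java.time.LocalDate
import org.apache.spark.sql.functions.{col, udf}

// The UDF returns java.time.LocalDate; with this change the result is converted
// to Catalyst's DateType regardless of spark.sql.datetime.java8API.enabled.
val toDate = udf((s: String) => LocalDate.parse(s))
// df.select(toDate(col("date_string")))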
[API][SPARK-32270][SQL] Use TextFileFormat in CSV's schema inference with a different encoding (+14, -12)>
This PR proposes to use the text datasource in CSV's schema inference. This makes schema inference consistent with the actual read: both use the same encoding to read the files.
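An illustrative read where this matters (the path is made up): the schema inference pass now reads the files with the same non-default encoding as the real scan.
val df = spark.read
  .option("encoding", "UTF-16")
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/data_utf16.csv")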
[SPARK-32241][SQL] Remove empty children of union (+65, -5)>
This PR removes the empty child relations of a Union, to simplify the plan.
[API][8.0][SPARK-31480][SQL] Improve the EXPLAIN FORMATTED's output for DSV2's Scan Node (+163, -15)>
[API][SPARK-32272][SQL] Add SQL standard command SET TIME ZONE (+308, -6)>
Adds the SQL standard command SET TIME ZONE for setting the session time zone.
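A quick illustration (the zone name is arbitrary):
spark.sql("SET TIME ZONE 'America/Los_Angeles'")   // set a named session time zone
spark.sql("SET TIME ZONE LOCAL")                    // reset to the JVM default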