Memory leak in SortMergeJoin

Hi all,
   I've been hitting this issue and am hoping to get some traction on it: https://issues.apache.org/jira/browse/SPARK-21492

If SortMergeJoinScanner does not consume the iterator from
UnsafeExternalRowSorter entirely, the memory that
UnsafeExternalSorter acquired from TaskMemoryManager is never
released. One memory page is held per partition for each unused
iterator, which leads to a memory leak, unnecessary spills, and
eventually an OutOfMemoryError.
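To illustrate the failure mode in isolation, here is a minimal plain-Python sketch (not Spark internals; the names `row_iterator` and `acquired_pages` are purely illustrative): a generator "acquires a page" that is only released when the iterator is exhausted or explicitly closed, so a consumer that stops early keeps the page alive.

```python
# Sketch of the leak pattern: a resource tied to iterator exhaustion.
acquired_pages = []

def row_iterator(rows):
    page = object()                 # stands in for a memory page
    acquired_pages.append(page)
    try:
        for r in rows:
            yield r
    finally:
        # Release only runs when the generator is exhausted or closed.
        acquired_pages.remove(page)

# Consumer stops after one row: the page is still held.
it = row_iterator(range(10))
next(it)
leaked = len(acquired_pages)        # 1 -- page not released

# Explicitly closing the iterator runs the finally block and frees it.
it.close()
freed = len(acquired_pages)         # 0
```

Python generators get this cleanup hook for free via `close()`/garbage collection; the point of the JIRA is that the JVM-side sorter had no equivalent release path when the join scanner abandoned its iterator early.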

For example, the following fails on a 3.0-SNAPSHOT build:
./bin/pyspark --master local[4]
from pyspark.sql.functions import rand, col

# Force sort-merge join and rule out broadcast joins.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()


Cheers,

Tao