Numpy memory not being released in executor map-partition function (memory leak)

I believe I have uncovered a strange interaction between pySpark, Numpy and
Python that produces a memory leak. Does anyone have an idea of what the
issue could be?

Here is a minimal working example (the code is also available as a gist):

from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np

sc = SparkContext()
sqlContext = SQLContext(sc)

# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)

def toy_example(rdd):

    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Return a single `0` value
    return [[0]]

# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()

When the above code is run, the memory of the executor's Python process
steadily increases after each iteration (i.e. each partition processed),
suggesting that the memory from the previous iteration isn't being released.
This can lead to a job failure if the memory exceeds the executor's memory
limit.
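One way to watch this growth from inside the job is to record the Python worker's resident memory before and after each partition. The sketch below is hypothetical (`rss_kib` and `with_memory_log` are illustrative names, not part of pySpark); it assumes a Linux executor, where the current RSS can be read from `/proc/self/statm`, and falls back to the peak value reported by `resource.getrusage` elsewhere.

```python
import os
import resource
import sys


def rss_kib():
    """Resident set size of this process in KiB.

    Reads the current RSS from /proc/self/statm (Linux only) and falls
    back to the peak RSS from getrusage on other platforms.
    """
    try:
        with open("/proc/self/statm") as f:
            resident_pages = int(f.read().split()[1])
        return resident_pages * os.sysconf("SC_PAGE_SIZE") // 1024
    except (OSError, IOError):
        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # ru_maxrss is reported in bytes on macOS and in KiB on Linux
        return peak // 1024 if sys.platform == "darwin" else peak


def with_memory_log(partition_fn):
    """Wrap a mapPartitions function so it also reports worker memory."""
    def wrapper(rows):
        before = rss_kib()
        results = list(partition_fn(rows))
        after = rss_kib()
        # Emitted as an extra record so the numbers come back via collect()
        results.append(("rss_kib", before, after))
        return results
    return wrapper
```

For example, `df.rdd.mapPartitions(with_memory_log(toy_example)).collect()` would return each partition's `[0]` together with an `("rss_kib", before, after)` tuple; if memory is not being released, the `before` values should climb across partitions handled by the same Python worker.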

Any of the following prevents the memory leak:
* Remove the line `data = list(rdd)`
* Insert the line `rand_data = list(rand_data.tolist())` after the line that
creates `rand_data`
* Remove the line `int(e)`

Some things to note:
* Although the RDD data is not used in the function, the `data = list(rdd)`
line is required to reproduce the leak: both reading in the RDD data and
creating the large number of ints are needed.
* The memory leak is likely due to the large Numpy array `rand_data` not
being released.
* You have to apply the `int` operation to each element of `rand_data` to
reproduce the leak.

I have experimented with `gc` and `malloc_trim` to ease memory usage, with no
success.
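For context, a typical way to combine those two calls at the end of a partition function is sketched below. It assumes a glibc-based Linux executor, where `malloc_trim` can be reached through `ctypes`; the helper name `release_memory` is hypothetical. Since this approach reportedly did not help here, the sketch only makes the attempted step concrete and is not offered as a fix.

```python
import ctypes
import ctypes.util
import gc


def release_memory():
    """Run a full GC pass, then ask glibc to return free heap memory to the OS.

    Returns True if malloc_trim was called, False if it is unavailable
    (e.g. on macOS or on non-glibc Linux images).
    """
    gc.collect()
    libc_name = ctypes.util.find_library("c")
    if libc_name is None:
        return False
    try:
        libc = ctypes.CDLL(libc_name)
        malloc_trim = libc.malloc_trim  # raises AttributeError if absent
    except (OSError, AttributeError):
        return False
    malloc_trim.argtypes = [ctypes.c_size_t]
    malloc_trim.restype = ctypes.c_int
    malloc_trim(0)  # 0 = keep no extra padding at the top of the heap
    return True
```

In `toy_example`, this would be called just before `return [[0]]`.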

Versions used: EMR 5.12.1, Spark 2.2.1, Python 2.7.13, Numpy 1.14.0

Some more details can be found in a related StackOverflow post.

I would be very grateful for any ideas on what the issue could be.

Many thanks,