Identifying specific persisted DataFrames via getPersistentRDDs()


Nicholas Chammas

This seems to be an underexposed part of the API. My use case is this: I want to unpersist all DataFrames except a specific few, because at a certain point in my pipeline I know exactly which handful of DataFrames I still need; everything else is no longer needed.

The problem is that there doesn’t appear to be a way to identify specific DataFrames (or rather, their underlying RDDs) via getPersistentRDDs(), which is the only way I’m aware of to ask Spark for all currently persisted RDDs:

>>> a = spark.range(10).persist()
>>> a.rdd.id()
8
>>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
[(3, JavaObject id=o36)]

As you can see, the id reported by a.rdd.id() for the persisted DataFrame, 8, doesn’t match the id of the entry returned by getPersistentRDDs(), 3. So I can’t go through the RDDs returned by getPersistentRDDs() and tell which ones I want to keep.

id() itself appears to be an undocumented method of the RDD API, and in PySpark getPersistentRDDs() is buried behind the Java sub-objects, so I know I’m reaching here. But is there a way to do what I want in PySpark without manually tracking everything I’ve persisted myself?
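
For concreteness, the manual-tracking alternative would look roughly like the sketch below; PersistTracker is a hypothetical helper, not an existing Spark API, and it relies only on the public DataFrame.persist() and DataFrame.unpersist() methods:

class PersistTracker:
    # Hypothetical bookkeeping helper: remember every DataFrame we persist
    # so that everything except a chosen few can be released later.
    def __init__(self):
        self._persisted = []

    def persist(self, df):
        df.persist()                        # public DataFrame.persist()
        self._persisted.append(df)
        return df

    def unpersist_all_except(self, *keep):
        keep_ids = {id(df) for df in keep}  # Python object identity, not RDD ids
        for df in self._persisted:
            if id(df) not in keep_ids:
                df.unpersist()              # public DataFrame.unpersist()
        self._persisted = [df for df in self._persisted if id(df) in keep_ids]

# usage:
# tracker = PersistTracker()
# a = tracker.persist(spark.range(10))
# b = tracker.persist(spark.range(20))
# tracker.unpersist_all_except(a)   # releases b, keeps a cached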

And more broadly speaking, do we want to add additional APIs, or formalize currently undocumented APIs like id(), to make this use case possible?

Nick

Re: Identifying specific persisted DataFrames via getPersistentRDDs()

rxin
Why do you need the underlying RDDs? Can't you just unpersist the DataFrames that you don't need?
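
Concretely, the direct route would be something like the following, assuming references to the DataFrames in question are still in hand:

a = spark.range(10).persist()
b = spark.range(20).persist()
# ...later, when only `a` is still needed:
b.unpersist()    # public DataFrame.unpersist(); `a` stays cached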


Re: Identifying specific persisted DataFrames via getPersistentRDDs()

Nicholas Chammas

I certainly can, but the problem I’m facing is how best to track all the DataFrames I no longer want to persist.

I create and persist various DataFrames throughout my pipeline. Spark is already tracking all this for me, and exposing some of that tracking information via getPersistentRDDs(). So when I arrive at a point in my program where I know, “I only need this DataFrame going forward”, I want to be able to tell Spark “Please unpersist everything except this one DataFrame”. If I cannot leverage the information about persisted DataFrames that Spark is already tracking, then the alternative is for me to carefully track and unpersist DataFrames when I no longer need them.

I suppose the problem is similar, at a high level, to garbage collection. Tracking and freeing DataFrames manually is analogous to malloc and free, while full automation would be Spark unpersisting DataFrames on its own once they are no longer referenced or needed. I’m looking for an in-between solution that lets me leverage some of the persistence tracking Spark already does so I don’t have to do it all myself.
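
Roughly, that in-between solution would have the shape of the sketch below, reusing the same private _jsc hook shown earlier. The blocker is exactly the id-mismatch problem above: there is no supported way to know which entries in that map correspond to the DataFrames I want to keep, and whether unpersisting the underlying RDDs behind the SQL cache’s back behaves cleanly is a separate question. A hypothetical sketch, not a public API:

def unpersist_all_except(sc, keep_rdd_ids):
    # Walk the JVM-side map of persisted RDDs and drop everything whose id
    # is not in keep_rdd_ids. getPersistentRDDs() and JavaRDD.unpersist()
    # are reached through py4j here, not through a public PySpark API.
    for rdd_id, java_rdd in sc._jsc.getPersistentRDDs().items():
        if rdd_id not in keep_rdd_ids:
            java_rdd.unpersist()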

Does this make more sense now, from a use case perspective as well as from a desired API perspective?


Re: Identifying specific persisted DataFrames via getPersistentRDDs()

Mark Hamstra
If I am understanding you correctly, you're saying that the problem is that you know what you want to keep, not what you want to throw away, and that there is no call to unpersist DataFrames based on that what-to-keep information.

Re: Identifying specific persisted DataFrames via getPersistentRDDs()

Nicholas Chammas

That’s correct. I probably would have done better to title this thread something like “How to effectively track and release persisted DataFrames”.

I jumped the gun in my initial email by referencing getPersistentRDDs() as a potential solution, but in theory the desired API is something like spark.unpersistAllExcept([list of DataFrames or RDDs]). This seems awkward, but I suspect the underlying use case is common.

An alternative or complementary approach would be to allow persistence (and perhaps even checkpointing) to be explicitly scoped. In some circles this is called “scope-based resource management” or “resource acquisition is initialization” (RAII). It would make it a lot easier to track and release DataFrames or RDDs once they are no longer needed in the cache.
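
In Python user land, scoped persistence can already be approximated with a context manager; the sketch below is a hypothetical helper, not a proposed Spark API, and native support could presumably do better by tying the scope into Spark’s own cache tracking:

from contextlib import contextmanager

@contextmanager
def persisted(*dfs):
    # Hypothetical scope-based persistence: cache the given DataFrames on
    # entry and release them on exit, even if the block raises.
    for df in dfs:
        df.persist()
    try:
        yield dfs
    finally:
        for df in dfs:
            df.unpersist()

# usage:
# with persisted(a, b):
#     ...several actions that reuse a and b...
# a and b are automatically unpersisted here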

Nick
