Is RDD thread safe?

Chang Chen
Hi all

I have a case where I need to cache a source RDD and then create different DataFrames from it in different threads to speed up queries.

I know that SparkSession is thread safe (https://issues.apache.org/jira/browse/SPARK-15135), but I am not sure whether RDD is thread safe or not.

Thanks

Re: Is RDD thread safe?

Weichen Xu
Hi Chang,

RDDs/DataFrames are immutable and lazily computed, so they are thread safe.
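A minimal plain-Scala sketch of the intuition behind this claim, not Spark code: a hypothetical `LazyData` class holds an immutable collection behind a `lazy val`, and several threads derive different results from it concurrently. Scala initializes a `lazy val` at most once even under concurrent access, and since no thread mutates the shared value, the reads need no extra locking. Whether Spark's cached RDDs give the same compute-once guarantee is what the rest of this thread examines.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an immutable, lazily computed dataset.
// This is NOT Spark's RDD class; it only illustrates the idea.
final class LazyData(computeCount: AtomicInteger) {
  // A Scala lazy val is initialized at most once, even when several
  // threads read it at the same time (initialization is synchronized).
  lazy val values: Vector[Int] = {
    computeCount.incrementAndGet() // count how often we really compute
    Vector.tabulate(1000)(i => i * 2)
  }
}

// Start a thread running `body` and return it so the caller can join it.
def runInThread(body: => Unit): Thread = {
  val t = new Thread(new Runnable { def run(): Unit = body })
  t.start()
  t
}

val computeCount = new AtomicInteger(0)
val data = new LazyData(computeCount)
val results = new ConcurrentHashMap[Int, Int]()

// Each "query" thread derives a different result from the same shared data.
// Nobody mutates `data`, so concurrent reads are safe.
val threads = (0 until 8).map { k =>
  runInThread { results.put(k, data.values.count(_ % (k + 2) == 0)) }
}
threads.foreach(_.join())
```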

Thanks!

On Tue, Nov 12, 2019 at 12:31 PM Chang Chen wrote:

Re: Is RDD thread safe?

Chang Chen
I am wondering about the concurrency semantics, so that I can reason about correctness. If two queries simultaneously run DAGs that use the same cached DF/RDD, before the data has actually been cached, what will happen?

Looking into the code a little, I suspect they get different BlockIDs for the same Dataset, which is unexpected behavior, but there is no race condition.

However, the RDD id is not lazy, so there is a race condition.

Thanks 
Chang


On Tue, Nov 12, 2019 at 1:22 PM Weichen Xu wrote:

Re: Is RDD thread safe?

Weichen Xu
The RDD id is immutable, and it is generated when the RDD object is created. So why would there be a race condition on the RDD id?
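A minimal plain-Scala model of eager id assignment. If I read the Spark source correctly, `SparkContext` hands out RDD ids from an `AtomicInteger` when the RDD object is constructed; the hypothetical `MiniContext` and `MiniDataset` classes below only mimic that pattern and are not Spark APIs. Because each id is taken atomically and stored in an immutable `val`, concurrent construction from several threads never yields duplicate ids.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical miniature of a context that hands out dataset ids.
final class MiniContext {
  private val nextId = new AtomicInteger(0)
  // getAndIncrement is atomic, so two threads never receive the same id.
  def newId(): Int = nextId.getAndIncrement()
}

// The id is assigned once, eagerly, in the constructor and never changes.
final class MiniDataset(ctx: MiniContext) {
  val id: Int = ctx.newId()
}

// Start a thread running `body` and return it so the caller can join it.
def runInThread(body: => Unit): Thread = {
  val t = new Thread(new Runnable { def run(): Unit = body })
  t.start()
  t
}

val ctx = new MiniContext
val ids = ConcurrentHashMap.newKeySet[Int]()
val duplicates = new AtomicInteger(0)

// Four threads each create 250 datasets concurrently.
val threads = (0 until 4).map { _ =>
  runInThread {
    (0 until 250).foreach { _ =>
      // add returns false if the id was already present
      if (!ids.add(new MiniDataset(ctx).id)) duplicates.incrementAndGet()
    }
  }
}
threads.foreach(_.join())
```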

On Mon, Nov 25, 2019 at 11:31 AM Chang Chen wrote:

Re: Is RDD thread safe?

Chang Chen
Sorry, I didn't describe it clearly. The RDD id itself is thread-safe, but what about the cached data?

See this code from BlockManager:
 
def getOrElseUpdate(...) = {
  get[T](blockId)(classTag) match {
    case ...
    case _ =>  // 1. no data is cached.
      // Need to compute the block
  }
  // Initially we hold no locks on this block
  doPutIterator(...) match { .. }
}

Consider two DAGs (containing the same cached RDD) running simultaneously: if both get None when they fetch the same block from BlockManager (i.e. #1 above), then I guess the same data would be cached twice.

If the later cache overrides the previous data and no memory is wasted, then this is OK.
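A minimal plain-Scala model of the check-then-act race described here. The hypothetical `NaiveStore` checks for a block and, if it is missing, computes and stores it without holding any lock between the check and the write. A `CyclicBarrier` inside the compute function forces the problematic interleaving deterministically: both callers see "no data cached" before either one writes. This is a simplification for illustration, not Spark's `BlockManager`, and the block key is only illustrative.

```scala
import java.util.concurrent.{ConcurrentHashMap, CyclicBarrier}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical block store with an unsynchronized check-then-act helper.
final class NaiveStore[K, V] {
  private val blocks = new ConcurrentHashMap[K, V]()

  def getOrElseUpdate(key: K)(compute: () => V): V =
    Option(blocks.get(key)) match {
      case Some(v) => v      // data already cached
      case None =>           // no data is cached: compute it
        val v = compute()    // both callers can get here
        blocks.put(key, v)   // the later put silently replaces the earlier one
        v
    }

  def size: Int = blocks.size()
}

// Start a thread running `body` and return it so the caller can join it.
def runInThread(body: => Unit): Thread = {
  val t = new Thread(new Runnable { def run(): Unit = body })
  t.start()
  t
}

val store = new NaiveStore[String, String]
val computeCount = new AtomicInteger(0)
val bothComputing = new CyclicBarrier(2)

def task(): String =
  store.getOrElseUpdate("rdd_0_0") { () =>
    computeCount.incrementAndGet()
    bothComputing.await() // wait until the other task is computing too
    "partition-0-data"
  }

val threads = Seq(runInThread(task()), runInThread(task()))
threads.foreach(_.join())
```

Both tasks compute the same partition, yet only one copy remains in the map because the second `put` replaces the first.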

Thanks
Chang


On Mon, Nov 25, 2019 at 11:52 AM Weichen Xu wrote:

Re: Is RDD thread safe?

Weichen Xu
Hmm, I haven't checked the code, but I think if an RDD is referenced in several places, the correct behavior should be that when the RDD's data is needed, it is computed and then cached only once; otherwise it should be treated as a bug. If you suspect there's a race condition, you could create a JIRA ticket.
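A minimal plain-Scala sketch of the "compute and cache only once" behavior described here, within a single JVM. The hypothetical `OnceStore` delegates to `java.util.concurrent.ConcurrentHashMap.computeIfAbsent`, whose documented contract is that the mapping function is applied at most once per key while other callers for that key wait. This is a swapped-in technique to illustrate the expected semantics, not how Spark's `BlockManager` is implemented, and it cannot coordinate tasks running in separate executor JVMs.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical block store that computes each missing block at most once.
final class OnceStore[K, V] {
  private val blocks = new ConcurrentHashMap[K, V]()

  def getOrElseUpdate(key: K)(compute: () => V): V =
    // computeIfAbsent runs the function atomically, at most once per key;
    // concurrent callers for the same key block until the value is stored.
    blocks.computeIfAbsent(key, new JFunction[K, V] {
      def apply(k: K): V = compute()
    })
}

// Start a thread running `body` and return it so the caller can join it.
def runInThread(body: => Unit): Thread = {
  val t = new Thread(new Runnable { def run(): Unit = body })
  t.start()
  t
}

val store = new OnceStore[String, String]
val computeCount = new AtomicInteger(0)
val start = new CountDownLatch(1)
val results = new ConcurrentHashMap[Int, String]()

// Eight tasks ask for the same block at (roughly) the same moment.
val threads = (0 until 8).map { i =>
  runInThread {
    start.await()
    val v = store.getOrElseUpdate("rdd_0_0") { () =>
      computeCount.incrementAndGet()
      Thread.sleep(50) // widen the window in which other tasks arrive
      "partition-0-data"
    }
    results.put(i, v)
  }
}
start.countDown()
threads.foreach(_.join())
```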

On Mon, Nov 25, 2019 at 12:21 PM Chang Chen wrote:

Re: Is RDD thread safe?

Imran Rashid
I think Chang is right, but I also think this only comes up in limited scenarios. I initially thought it wasn't a bug, but after some more thought I have some concerns in light of the issues we've had with nondeterministic RDDs, e.g. repartition().
 
Say I have code like this:

val cachedRDD = sc.textFile(...).cache()
(0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }

That is, my cached RDD is referenced by many threads concurrently before the RDD has been cached.

When one of those tasks gets to cachedRDD.getOrCompute(), there are a few possible scenarios:

1) The partition has never been referenced before. BlockManager.getOrElseUpdate() will report that the block doesn't exist, so it will be computed (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360)

2) The partition has been fully materialized by another task and the BlockManagerMaster on the driver already knows about it, so BlockManager.getOrElseUpdate() will return a pointer to the cached block (perhaps on another node).

3) The partition is actively being computed by another task on the same executor. Then BlockManager.getOrElseUpdate() will not know about the other task's in-progress block (it only knows about blocks that are fully materialized, IIUC). But eventually, when the tasks try to actually write the data, they'll try to get a write lock for the block: https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218
One task will get the write lock first; the other task will block until the first one is done, then realize the block exists and just return those values.

4) The partition is actively being computed by another task on a *different* executor. IIUC, Spark doesn't try to do anything to prevent both tasks from computing the block themselves in this case. (Doing so would require extra coordination with the driver before writing every single block.) The locks in BlockManager and BlockInfoManager don't stop this case, because it is happening in entirely independent JVMs.
There normally won't be any problem here: if the RDD is totally deterministic, then you'll just end up with an extra copy of the data. In a way this is good; the cached RDD is in high demand, so having an extra copy isn't so bad.
OTOH, if the RDD is nondeterministic, you've now got two copies with different values. Then again, the RDD cache is not resilient in general, so you always have to be able to handle an RDD getting recomputed if it's evicted from the cache. So this should be pretty similar.
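A minimal plain-Scala model of scenario 3 (two tasks on the same executor), under simplifying assumptions: the hypothetical `FirstWriterStore` uses `ConcurrentHashMap.putIfAbsent` in place of the per-block write lock in `BlockInfoManager`. Both tasks miss the cache and compute the partition, but only the first write is stored; the second task finds the block already present and returns those values instead of its own. A `CyclicBarrier` makes both tasks finish computing before either writes, so the interleaving is deterministic. Scenario 4 would correspond to two such stores in different JVMs, where nothing stops both writes.

```scala
import java.util.concurrent.{ConcurrentHashMap, CyclicBarrier}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical store where the first writer of a block wins.
final class FirstWriterStore[K, V] {
  private val blocks = new ConcurrentHashMap[K, V]()
  private val writeCount = new AtomicInteger(0)

  def get(key: K): Option[V] = Option(blocks.get(key))

  // Atomically store `v` unless the block already exists; in that case
  // discard `v` and return the values that are already stored.
  def putOrGetExisting(key: K, v: V): V =
    Option(blocks.putIfAbsent(key, v)) match {
      case None =>           // we were the first writer
        writeCount.incrementAndGet()
        v
      case Some(existing) => // another task wrote first: use its values
        existing
    }

  def writes: Int = writeCount.get()
}

// Start a thread running `body` and return it so the caller can join it.
def runInThread(body: => Unit): Thread = {
  val t = new Thread(new Runnable { def run(): Unit = body })
  t.start()
  t
}

val store = new FirstWriterStore[String, String]
val computeCount = new AtomicInteger(0)
val bothComputed = new CyclicBarrier(2)
val results = new ConcurrentHashMap[String, String]()

def task(name: String): String =
  store.get("rdd_0_0").getOrElse {
    computeCount.incrementAndGet()
    val mine = s"values computed by $name"
    bothComputed.await() // neither task writes until both have computed
    store.putOrGetExisting("rdd_0_0", mine)
  }

val threads = Seq("a", "b").map(n => runInThread { results.put(n, task(n)) })
threads.foreach(_.join())
```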

On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu wrote:

Re: Is RDD thread safe?

Mridul Muralidharan

Very well put, Imran. This is a variant of executor failure after an RDD has been computed (including caching). In general, nondeterminism in Spark is going to lead to inconsistency.
The only reasonable solution for us at the time was to make the pseudo-randomness repeatable and checkpoint afterwards, so that recomputation becomes deterministic.
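A minimal plain-Scala sketch of the "make pseudo-randomness repeatable" idea, using only `scala.util.Random`. The hypothetical `computeUnseeded` models a partition computation whose randomness depends on the state of whichever JVM runs it (two different generator seeds stand in for two executors), while the hypothetical `computeSeeded` derives its seed from a fixed job seed plus the partition index, so any recomputation of the same partition reproduces the same values. The checkpointing step mentioned above is not modeled.

```scala
import scala.util.Random

// Hypothetical partition computation whose randomness depends on whatever
// generator state the executing JVM happens to have (partition is ignored).
def computeUnseeded(partition: Int, executorRng: Random): Vector[Int] =
  Vector.fill(5)(executorRng.nextInt(1000000))

// Repeatable variant: the seed is derived only from stable inputs
// (a fixed job-level seed and the partition index).
def computeSeeded(partition: Int, jobSeed: Long): Vector[Int] = {
  val rng = new Random(jobSeed + partition)
  Vector.fill(5)(rng.nextInt(1000000))
}

// Partition 3 computed twice, e.g. by two executors or after cache eviction.
val copyOnExecutor1 = computeUnseeded(3, new Random(1L))
val copyOnExecutor2 = computeUnseeded(3, new Random(2L))

val seededCopy1 = computeSeeded(3, jobSeed = 42L)
val seededCopy2 = computeSeeded(3, jobSeed = 42L)
```

The unseeded copies disagree, which is the "two copies with different values" situation from Imran's scenario 4; the seeded copies agree no matter how often the partition is recomputed.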


Regards,
Mridul 

On Mon, Nov 25, 2019 at 9:30 AM Imran Rashid wrote:

Re: Is RDD thread safe?

Chang Chen
Thank you, Imran.

I will check whether memory is wasted or not.

On Tue, Nov 26, 2019 at 1:30 AM Imran Rashid wrote: