[SPARK-23207] Repro

[SPARK-23207] Repro

tcondie

Hi,

We are able to reproduce this bug in Spark 2.4 using the following program:

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map { x => (x % 1000, x) }.repartition(20)
res.distinct.count

// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
df.distinct.count()

The first count (res.distinct.count) correctly produces 100000000.
The second count (df.distinct.count) incorrectly produces 99999769.

If the cache step is removed, the bug does not reproduce.
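
For intuition about how the count can drift: round-robin repartitioning is order-sensitive, and a cached-then-repartitioned RDD may replay its rows in a different order once an executor dies. A minimal plain-Scala toy (no Spark required; all names are illustrative) showing that the same records land in different partitions when the input order changes:

object RoundRobinOrderSensitivity {
  // Assign records to partitions round-robin, the way repartition(n) does.
  def roundRobin[T](records: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
    records.zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (p, pairs) => p -> pairs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val firstRun   = Seq("a", "b", "c", "d", "e", "f")
    val recomputed = Seq("b", "a", "c", "f", "e", "d") // same records, new order
    println(roundRobin(firstRun, 2))   // 0 -> List(a, c, e), 1 -> List(b, d, f)
    println(roundRobin(recomputed, 2)) // 0 -> List(b, c, e), 1 -> List(a, f, d)
  }
}

If only some output partitions are recomputed after the failure, rows that moved between partitions get dropped or double-counted, so distinct.count no longer returns 100000000.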

 

Best regards,

Tyson

 

Re: [SPARK-23207] Repro

Sean Owen
Interesting, but I'd put this on the JIRA, and also test against master
first. It's entirely possible this is something else that was
subsequently fixed, and maybe even backported for 2.4.4.
(I can't quite reproduce it; it just makes the second job fail, which is
also puzzling.)


RE: [SPARK-23207] Repro

tcondie
Hi Sean,

To finish the job, I did need to set spark.stage.maxConsecutiveAttempts to a large number (e.g., 100), a suggestion from Jiang Xingbo.
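
For reference, a minimal sketch of one way to set that (the property is read when the scheduler starts, so it has to be on the SparkConf before the context is created; the app name below is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-23207-repro") // illustrative
  .config("spark.stage.maxConsecutiveAttempts", "100") // default is 4
  .getOrCreate()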

I haven't seen any recent movement/PRs on this issue, but I'll see if we can repro with a more recent version of Spark.

Best regards,
Tyson


Re: [SPARK-23207] Repro

Xiao Li
Hi Tyson,

Could you open a new JIRA with the correctness label? SPARK-23207 might not cover all the scenarios, especially when you use cache.

Cheers,

Xiao 


Re: [SPARK-23207] Repro

cloud0fan
Hi Tyson,

Thanks for reporting it! I quickly checked the related scheduler code but couldn't find an obvious place that could go wrong with cached RDDs.

Sean said he couldn't reproduce it and that the second job just fails. That failure is actually expected: we need a lot more changes to completely fix this problem, so for now the fix is to fail the job if the scheduler needs to retry an indeterminate shuffle map stage.
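
A rough, self-contained sketch of that policy (an illustration only, not the actual DAGScheduler code; every name below is made up):

sealed trait DeterministicLevel
case object Determinate extends DeterministicLevel   // replay produces the same output
case object Indeterminate extends DeterministicLevel // replay may produce different output

case class ShuffleMapStage(id: Int, level: DeterministicLevel, consumersStarted: Boolean)

// Rerunning an indeterminate stage whose consumers already read part of its
// output could silently mix old and new data, so the job is failed instead.
def onFetchFailureRetry(stage: ShuffleMapStage): Either[String, String] =
  stage match {
    case ShuffleMapStage(id, Indeterminate, true) =>
      Left(s"abort job: stage $id is indeterminate and partially consumed")
    case ShuffleMapStage(id, _, _) =>
      Right(s"resubmit stage $id")
  }

Under that policy the second job in the repro fails rather than returning a wrong count, which matches what Sean saw.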

It would be great to know if we can reproduce this bug with the master branch.

Thanks,
Wenchen


Re: [SPARK-23207] Repro

Yuanjian Li
Hi Tyson,

Thanks for the report!
I reproduced this locally based on your code, with some changes that keep only the wrong-answer job. The code is as follows:
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map { x => (x % 1000, x) }
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
I think the reason for the wrong answer is that, in CachedRDDBuilder, we miss carrying the `isOrderSensitive` property over to the newly created MapPartitionsRDD. JIRA filed: https://issues.apache.org/jira/browse/SPARK-28699.
The fix builds on Wenchen's work in SPARK-23243. Currently, we make the job fail when we detect a retry of an indeterminate stage. Feel free to review.
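
To make that concrete, here is a simplified model of the determinism tracking involved (the real logic lives in org.apache.spark.rdd.MapPartitionsRDD and the RDD output-determinism level; everything below is an illustrative sketch, not Spark's API):

object Determinism {
  sealed trait Level
  case object Determinate extends Level   // replay: same rows, same order
  case object Unordered extends Level     // replay: same rows, order may differ
  case object Indeterminate extends Level // replay: the rows themselves may differ

  // An order-sensitive map (round-robin repartition is one) over a parent
  // that may replay its rows in a different order is itself indeterminate.
  // SPARK-28699: the MapPartitionsRDD built over the cached data dropped the
  // isOrderSensitive flag, so the stage wrongly looked determinate.
  def outputLevel(parent: Level, isOrderSensitive: Boolean): Level =
    (parent, isOrderSensitive) match {
      case (Unordered, true) => Indeterminate
      case (other, _)        => other
    }
}

Determinism.outputLevel(Determinism.Unordered, isOrderSensitive = true) // Indeterminate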

Support for rerunning an indeterminate stage will be added after SPARK-25341. If you need an indeterminate stage after a cache operation right now, you can test on this branch.

Best,
Yuanjian
