Coalesce behaviour

Sergey Zhemzhitsky
Hello guys,

Currently I'm a bit confused by coalesce behaviour.

Consider the following use case: I'd like to join two pretty big RDDs.
To make the join more stable and to prevent OOM failures, the RDDs are
usually repartitioned to redistribute the data more evenly and to keep
every partition under the 2GB limit. This leaves the join with a lot of
partitions.

After a successful join I'd like to save the resulting dataset, but I
don't need as many files as there were partitions/tasks during the join.
A number of files equal to the total number of executor cores allocated
to the job would be fine, so I've considered using coalesce.

The problem is that coalesce with shuffling disabled prevents the join
from using the specified number of partitions and instead forces the
join to use the number of partitions provided to coalesce:

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
res5: String =
(5) CoalescedRDD[15] at coalesce at <console>:25 []
 |  MapPartitionsRDD[14] at repartition at <console>:25 []
 |  CoalescedRDD[13] at repartition at <console>:25 []
 |  ShuffledRDD[12] at repartition at <console>:25 []
 +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
    |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []

With shuffling enabled everything is OK, e.g.

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
res6: String =
(5) MapPartitionsRDD[24] at coalesce at <console>:25 []
 |  CoalescedRDD[23] at coalesce at <console>:25 []
 |  ShuffledRDD[22] at coalesce at <console>:25 []
 +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
     |   MapPartitionsRDD[20] at repartition at <console>:25 []
     |   CoalescedRDD[19] at repartition at <console>:25 []
     |   ShuffledRDD[18] at repartition at <console>:25 []
     +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
        |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []

In that case the problem is that, for pretty huge datasets, the
additional reshuffling can take hours, or at least an amount of time
comparable to the join itself.

So I'd like to understand whether this is a bug or just expected
behaviour. In case it is expected, is there any way to insert an
additional ShuffleMapStage at the appropriate position of the DAG, but
without the reshuffling itself?

Re: Coalesce behaviour

Koert Kuipers
Although I personally would describe this as a bug, the answer will be that this is the intended behavior: the coalesce "infects" the shuffle before it, making coalesce useless, by design, for reducing output files after a shuffle with many partitions.

Your only option left is a repartition, for which you pay the price of another expensive shuffle.

Interestingly, if you do a coalesce on a map-only job it knows how to reduce the partitions and output files without introducing a shuffle, so clearly it is possible, but I don't know how to get this behavior after a shuffle in an existing job.
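
A minimal sketch of the trade-off, runnable in the shell with toy data (the names and sizes here are illustrative, not taken from the thread):

val left  = sc.makeRDD(1 to 100, 20).map(i => (i % 10, i))
val right = sc.makeRDD(1 to 100, 20).map(i => (i % 10, -i))

// without shuffle: the 5 final partitions "infect" the join stage,
// so the reduce side of the join runs in only 5 tasks
left.join(right, 100).coalesce(5, false).toDebugString

// with shuffle: the join keeps its 100 reduce tasks, but you pay for an
// extra full shuffle just to end up with 5 output partitions
left.join(right, 100).coalesce(5, true).toDebugString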

Re: Coalesce behaviour

Sergey Zhemzhitsky
Well, it seems that I can still extend CoalescedRDD to make it preserve the total number of partitions from the parent RDD, reduce some partitions in the same way the original coalesce does for map-only jobs, and fill the gaps (the positions where the coalesced partitions should reside) with a special kind of partition which has no parent dependencies and always returns an empty iterator.

I believe this should work as desired (at least the previous ShuffleMapStage will think that the number of partitions in the next stage it generates shuffle output for is unchanged).

There are a few issues though: the existence of empty partitions, which can be evaluated almost for free, and empty output files from these empty partitions, which can be avoided by means of LazyOutputFormat in the case of RDDs.
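
A rough, untested sketch of that idea against Spark's developer API (the class and partition names are made up for illustration; a real implementation would also need preferred locations, cleanup of parent references, etc.):

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// placeholder partition with no parent dependencies
case class EmptyPartition(index: Int) extends Partition
// a partition that reads a whole group of parent partitions
case class MergedPartition(index: Int, parentIds: Seq[Int]) extends Partition

// keeps the parent's partition count: the first `groups.size` partitions
// each merge a group of parent partitions, the rest are empty padding
class PaddedCoalescedRDD[T: ClassTag](parent: RDD[T], groups: Seq[Seq[Int]])
    extends RDD[T](parent.sparkContext, Nil) {

  override def getDependencies: Seq[Dependency[_]] =
    Seq(new NarrowDependency[T](parent) {
      def getParents(pid: Int): Seq[Int] =
        if (pid < groups.size) groups(pid) else Nil
    })

  override def getPartitions: Array[Partition] =
    parent.partitions.indices.map { i =>
      if (i < groups.size) MergedPartition(i, groups(i)) else EmptyPartition(i)
    }.toArray

  override def compute(split: Partition, ctx: TaskContext): Iterator[T] =
    split match {
      case MergedPartition(_, ids) =>
        ids.iterator.flatMap(i => parent.iterator(parent.partitions(i), ctx))
      case _ => Iterator.empty
    }
}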



Re: Coalesce behaviour

cloud0fan
Note that RDD partitions and Spark tasks do not always map one-to-one.

Assume `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and this stage has 10 tasks (decided by the last RDD). This means each Spark task will process 10 partitions of `rdd1`.

Looking at your example, I don't see where the problem is. Can you describe what is not expected?
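
A quick way to see this in the shell (a minimal illustration, not from the original thread):

val rdd1 = sc.makeRDD(1 to 100, 100)
val rdd2 = rdd1.coalesce(10)
rdd2.getNumPartitions // 10
rdd2.toDebugString    // no ShuffledRDD in the lineage: one stage, so 10 tasks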

Re: Coalesce behaviour

Sergey Zhemzhitsky
I'd like to reduce the number of files written to HDFS without
introducing additional shuffles, while preserving the stability of the
job, and I'd also like to understand why the samples below work in one
case and fail in the other.

Consider the following example, which does the same thing using the
same resources, but fails in one case and works without issues in the
other, where an additional shuffle is introduced:

spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn-client \
  --conf spark.executor.memory=4g \
  --conf spark.executor.memoryOverhead=1024 \
  --conf spark.dynamicAllocation.enabled=false

import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 100000)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
              new Text(RandomStringUtils.randomAlphanumeric(1024)))
  )
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])

// count unique keys
rdd.keys.map(_.toString).distinct.count
// in my case it's equal to 46656

// fails with either OOM or "Container killed by YARN for exceeding
// memory limits ... spark.yarn.executor.memoryOverhead"
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

// works as expected
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

Re: Coalesce behaviour

Sergey Zhemzhitsky
... sorry for that, but there is a mistake in the second sample; here
is the right one:

// fails with either OOM or "Container killed by YARN for exceeding
// memory limits ... spark.yarn.executor.memoryOverhead"
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

// works as expected
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,true)
  .count

Re: Coalesce behaviour

Koert Kuipers
In reply to this post by cloud0fan
How can I get a shuffle with 2048 partitions and 2048 tasks, and then a map phase with 10 partitions and 10 tasks that writes to HDFS?

Every time I try to do this using coalesce, the shuffle ends up having 10 tasks, which is unacceptable due to OOM. This makes coalesce somewhat useless.

Re: Coalesce behaviour

cloud0fan
In your first example, the root RDD has 1000 partitions; you then do a shuffle (with repartitionAndSortWithinPartitions) that shuffles the data to 1000 reducers. Then you do coalesce, which asks Spark to launch only 20 reducers to process the data that was prepared for 1000 reducers. Since the reducers have heavy work (sorting), you OOM. In short, your workflow is: 1000 mappers -> 20 reducers.

In your second example, the coalesce introduces a shuffle, so your workflow is: 1000 mappers -> 1000 reducers (also mappers) -> 20 reducers. The sorting is done by 1000 tasks, so there is no OOM.

BTW, have you tried the DataFrame API? With Spark SQL the memory management is more precise, so even if only 20 tasks do the heavy sorting, the system should just spill more to disk instead of OOMing.
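
Restating the two lineages as code, with the task counts from the explanation above as comments (rdd and the imports are from the earlier shell session):

// first sample: 1000 map tasks -> 20 reduce tasks (the 20 tasks do all the sorting)
rdd.map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20, false)

// second sample: 1000 map tasks -> 1000 reduce tasks (sorting) -> 20 tasks
// (an extra shuffle merges the sorted output down to 20 partitions)
rdd.map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20, true)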


Re: Coalesce behaviour

Sergey Zhemzhitsky
I've tried the same sample with the DataFrame API and it's much more
stable, although it's backed by the RDD API.

This sample works without any issues and without any additional Spark tuning:

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
val df = rdd.map(item => item._1.toString -> item._2.toString).toDF()
df.repartition(1000,$"_1")
  .sortWithinPartitions($"_2")
  .coalesce(20)
  .count

The other thing I've noticed is that, in the case of RDDs, the sample below

rdd.map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

... always fails on the reduce side of the shuffle.
The failures occur with "Container killed by YARN for exceeding memory
limits. ... Consider boosting spark.yarn.executor.memoryOverhead" errors.
If I give the containers more memory the job stabilises, but it's just
a matter of time before it fails again on a slightly larger amount of data.

The job also stabilises if spills are forced by setting the
spark.shuffle.spill.numElementsForceSpillThreshold=N property.
So I believe the issue is connected with external sorting
(org.apache.spark.util.collection.ExternalSorter) on the reduce side of
the shuffle, although there is a lot of logic to detect when a spill
should happen.
Do you think this is due to incorrect estimation of the memory remaining
for the container, or something else? ... because "Container killed by
YARN for exceeding memory limits" continues to happen.
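
(For reference: that property is an internal, undocumented Spark setting. It can be passed to the spark-shell invocation from the earlier message by adding, e.g., --conf spark.shuffle.spill.numElementsForceSpillThreshold=5000000 to the existing flags; the value here is an arbitrary example.)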

Re: Coalesce behaviour

Koert Kuipers
In reply to this post by cloud0fan
We have a collection of programs in the DataFrame API that all do big shuffles, for which we use 2048+ partitions. This works fine, but it produces a lot of (small) output files, which puts pressure on the driver memory of any Spark program that reads this data in again.

So one of our developers stuck a .coalesce at the end of every program, just before writing to disk, to reduce the number of output files, thinking this would solve the many-files issue. To his surprise the coalesce caused the existing shuffles to run with fewer tasks, leading to unacceptable slowdowns and OOMs. So this is not a solution.

How can we insert a coalesce as a new map phase (a new job on the application manager with a narrow dependency) instead of modifying the existing reduce phase? I say map phase because it should not introduce a new shuffle: that would be wasteful and unnecessary.


Re: Coalesce behaviour

cloud0fan
You have a heavy workload: you want to run it with many tasks for better performance and stability (no OOM), but you also want to run it with few tasks to avoid too many small files. The reality is that you mostly can't reach both goals at once; they conflict with each other. The solution I can think of is to sacrifice a little performance: run the workload with many tasks first, and then merge the many small files. Generally, this is what `coalesce(n, shuffle = true)` does.
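
A hedged sketch of the "merge afterwards" alternative in the DataFrame API (the paths, the partitioning column, and the counts are all illustrative, assuming a DataFrame df in a spark-shell session): run the heavy shuffle with many tasks, then merge the small files in a separate map-only job:

df.repartition(2048, $"key")          // heavy shuffle runs with 2048 tasks
  .write.parquet("/tmp/out-wide")     // ... but produces 2048 (small) files

spark.read.parquet("/tmp/out-wide")
  .coalesce(20)                       // map-only: 20 tasks, no new shuffle
  .write.parquet("/tmp/out-merged")   // 20 output files

The second job only re-reads and rewrites the data, so it is much cheaper than re-running the shuffle itself.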

On Sat, Oct 13, 2018 at 10:05 PM Koert Kuipers <[hidden email]> wrote:
we have a collection of programs in dataframe api that all do big shuffles for which we use 2048+ partitions. this works fine but it produces a lot of (small) output files, which put pressure on the memory of the drivers programs of any spark program that reads this data in again.

so one of our developers stuck in a .coalesce at the end of every program just before writing to disk to reduce the output files thinking this would solve the many files issue. to his surprise the coalesce caused the existing shuffles to run with less tasks, leading to unacceptable slowdowns and OOMs. so this is not a solution.

how can we insert a coalesce as a new map-phase (a new job with a narrow dependency) instead of modifying the existing reduce phase? i am saying map-phase because it should not introduce a new shuffle: that would be wasteful and unnecessary.
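
One approximate answer discussed in this thread is to split the work into two jobs: persist the shuffled output with many tasks, then run a separate map-only job in which coalesce uses a narrow dependency and no shuffle. A rough sketch with hypothetical names and paths (the price is the intermediate write and read):

// job 1: heavy shuffle with many tasks, persisted as-is
bigShuffled.saveAsObjectFile("hdfs:///tmp/intermediate")   // 2048 tasks, 2048 files

// job 2: map-only read-and-merge; a coalesce on a map-only lineage
// reduces partitions without introducing a shuffle
sc.objectFile[(String, Int)]("hdfs:///tmp/intermediate")
  .coalesce(16)                                            // narrow dependency, 16 tasks
  .saveAsObjectFile("hdfs:///tmp/final")                   // 16 output files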


On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan <[hidden email]> wrote:
In your first example, the root RDD has 1000 partitions, then you do a shuffle (with repartitionAndSortWithinPartitions), which shuffles the data to 1000 reducers. Then you do coalesce, which asks Spark to launch only 20 reducers to process the data that was prepared for 1000 reducers. Since the reducers have heavy work (sorting), you OOM. In general, your workflow is: 1000 mappers -> 20 reducers.

In your second example, the coalesce introduces a shuffle, so your workflow is: 1000 mappers -> 1000 reducers (also mappers) -> 20 reducers. The sorting is done by 1000 tasks, so no OOM.

BTW, have you tried the DataFrame API? With Spark SQL, the memory management is more precise, so even if we only have 20 tasks to do the heavy sorting, the system should just spill more to disk instead of OOMing.
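
A sketch of the two workflows, assuming a hypothetical `pairs: RDD[(Int, String)]` with 1000 input partitions:

import org.apache.spark.HashPartitioner

// workflow 1: 1000 mappers -> 20 reducers; the coalesce collapses the
// reduce stage, so only 20 tasks do the heavy sorting (OOM risk)
val risky = pairs
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20)

// workflow 2: 1000 mappers -> 1000 reducers (also mappers) -> 20 reducers;
// 1000 tasks do the sorting, then an extra shuffle merges down to 20
val safer = pairs
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20, shuffle = true)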


On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers <[hidden email]> wrote:
how can i get a shuffle with 2048 partitions and 2048 tasks and then a map phase with 10 partitions and 10 tasks that writes to hdfs?

every time i try to do this using coalesce, the shuffle ends up having 10 tasks, which is unacceptable due to OOM. this makes coalesce somewhat useless.

On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan <[hidden email]> wrote:
Note that RDD partitions and Spark tasks do not always map 1-to-1.

Assume `rdd1` has 100 partitions and `rdd2 = rdd1.coalesce(10)`. Then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and this stage has 10 tasks (decided by the last RDD). This means each Spark task will process 10 partitions of `rdd1`.
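
This is easy to confirm in the shell; a sketch (RDD ids and console line numbers will vary):

scala> val rdd2 = sc.makeRDD(1 to 100, 100).coalesce(10)
scala> rdd2.getNumPartitions
res0: Int = 10
scala> rdd2.toDebugString
res1: String =
(10) CoalescedRDD[1] at coalesce at <console>:24 []
 |   ParallelCollectionRDD[0] at makeRDD at <console>:24 []

No ShuffledRDD appears in the lineage, so everything runs in a single stage with 10 tasks.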

Looking at your example, I don't see where the problem is. Can you describe what is not as expected?

On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <[hidden email]> wrote:
Well, it seems that I can still extend CoalescedRDD to make it preserve the total number of partitions of the parent RDD, reduce some partitions in the same way the original coalesce does for map-only jobs, and fill the gaps (the positions of the partitions that were coalesced away) with a special kind of partition which has no parent dependencies and always returns an empty iterator.

I believe this should work as desired (at least the previous ShuffleMapStage will think that the number of partitions in the next stage, which it generates shuffle output for, has not changed).

There are a few issues though: the empty partitions themselves, which can be evaluated almost for free, and the empty output files they produce, which can be avoided by means of LazyOutputFormat in the case of RDDs.
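
A very rough sketch of that idea (hypothetical class, untested, locality preferences omitted): the output keeps the parent's partition count, the first `keep` partitions merge parent partitions through a narrow dependency, and the remaining ones are empty padding:

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// partition i < keep reads every parent partition p with p % keep == i;
// partitions i >= keep have no parents and yield nothing
case class PadPartition(index: Int, parents: Seq[Int]) extends Partition

class PaddedCoalescedRDD[T: ClassTag](prev: RDD[T], keep: Int)
    extends RDD[T](prev.context, Nil) {

  private val n = prev.partitions.length

  override def getPartitions: Array[Partition] =
    (0 until n).map { i =>
      if (i < keep) PadPartition(i, i until n by keep)
      else PadPartition(i, Nil)
    }.toArray

  override def compute(split: Partition, ctx: TaskContext): Iterator[T] =
    split.asInstanceOf[PadPartition].parents.iterator.flatMap { p =>
      prev.iterator(prev.partitions(p), ctx)
    }

  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency[T](prev) {
      override def getParents(pid: Int): Seq[Int] =
        partitions(pid).asInstanceOf[PadPartition].parents
    }
  )
}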





Re: Coalesce behaviour

Koert Kuipers
sure, i understand that the current workaround is to add a shuffle. but that's just a workaround, not a satisfactory solution: we shouldn't have to introduce another shuffle (an expensive operation) just to reduce the number of files.

logically all you need is a map-phase with fewer tasks after the reduce phase with many tasks to reduce the number of files, but there is currently no way to express this in spark. it seems the map operation always gets tacked onto the end of the previous reduce operation, which is generally a reasonable optimization, but not here, since it forces the number of reduce tasks down, which is unacceptable.
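
the stage-collapsing is visible in the lineage: even with a map between the shuffle and the coalesce, the entire reduce side runs in 5 tasks. a sketch (RDD ids and console locations will vary):

scala> sc.makeRDD(1 to 100, 20).repartition(100).map(_ + 1).coalesce(5).toDebugString
res0: String =
(5) CoalescedRDD[6] at coalesce at <console>:25 []
 |  MapPartitionsRDD[5] at map at <console>:25 []
 |  MapPartitionsRDD[4] at repartition at <console>:25 []
 |  CoalescedRDD[3] at repartition at <console>:25 []
 |  ShuffledRDD[2] at repartition at <console>:25 []
 +-(20) MapPartitionsRDD[1] at repartition at <console>:25 []
    |   ParallelCollectionRDD[0] at makeRDD at <console>:25 []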



Re: Coalesce behaviour

Jörn Franke
This is not fully correct. If you want fewer files, you need to move some data to other nodes, because not all of the data for a given output file is already on the node that writes it (this is even the case on the same node, though there it is easier from a network perspective). Hence a shuffle is needed.




Re: Coalesce behaviour

Koert Kuipers
i realize it is unlikely that all data will be local to the tasks, so placement will not be optimal and there will be some network traffic, but is this the same as a shuffle?

in CoalescedRDD the dependency shown is a NarrowDependency, which i thought meant it could be implemented without a shuffle.
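
this can be checked from the shell; a sketch (the exact grouping and rendering of the output will vary):

scala> val c = sc.makeRDD(1 to 100, 20).coalesce(5)
scala> val dep = c.dependencies.head.asInstanceOf[org.apache.spark.NarrowDependency[_]]
scala> (0 until 5).map(dep.getParents)
// roughly: Vector(WrappedArray(0, 1, 2, 3), WrappedArray(4, 5, 6, 7), ...)
// each of the 5 output partitions reads several parent partitions directly
// through the narrow dependency, without any shuffle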
