[SQL][ML] Pipeline performance regression between 1.6 and 2.x

zero323
Hi everyone,

While experimenting with ML pipelines I have experienced a significant
performance regression when switching from 1.6.x to 2.x.

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

// An id column plus 41 string columns: x0 and 40 copies of it (x1 .. x40)
val df = (1 to 40).foldLeft(
  Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0")
)((df, i) => df.withColumn(s"x$i", $"x0"))

// One StringIndexer per string column
val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

// One OneHotEncoder per indexed column
val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val assembler = new VectorAssembler().setInputCols(encoders.map(_.getOutputCol))
val stages: Array[PipelineStage] = indexers ++ encoders :+ assembler

new Pipeline().setStages(stages).fit(df).transform(df).show

Task execution time is comparable and executors are idle most of the
time, so it looks like a problem with the optimizer. Is this a known
issue? Are there any changes I've missed that could lead to this behavior?

--
Best,
Maciej


---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

Liang-Chi Hsieh

Hi Maciej,

Fitting a Pipeline is essentially an iterative operation. Running an iterative algorithm on a Dataset produces RDD lineages and query plans that grow quickly. Without caching or checkpointing, each iteration gets slower as the number of iterations increases.

I think this is why a Pipeline with many stages takes much longer to finish. Since it is not uncommon to have many stages in a Pipeline, we should improve this. I will submit a PR for this.
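
A minimal sketch of the plan growth described above, assuming Spark 2.x or later with spark-sql on the classpath. It adds columns in a loop, as the example in this thread does, and compares that with a variant that periodically rebuilds the DataFrame from its RDD so the logical plan starts fresh. The helper name `truncatePlan` and the interval of 10 are illustrative choices and are not taken from the PR; `Dataset.checkpoint` (available from 2.1, and needing a checkpoint directory) is another way to cut the plan.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("plan-growth-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: rebuild the DataFrame from its RDD and schema so the
// new logical plan no longer contains the earlier projections.
def truncatePlan(df: DataFrame): DataFrame =
  df.sparkSession.createDataFrame(df.rdd, df.schema)

val base = Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0")

// Each withColumn call wraps the previous plan in one more projection.
val grown = (1 to 40).foldLeft(base)((df, i) => df.withColumn(s"x$i", $"x0"))

// Same columns, but the plan is cut every 10 steps (interval chosen arbitrarily).
val truncated = (1 to 40).foldLeft(base) { (df, i) =>
  val next = df.withColumn(s"x$i", $"x0")
  if (i % 10 == 0) truncatePlan(next) else next
}

// Count logical plan nodes as a rough proxy for the work the planner has to do.
def planNodes(df: DataFrame): Int =
  df.queryExecution.logical.collect { case p => p }.size

println(s"grown plan nodes:     ${planNodes(grown)}")
println(s"truncated plan nodes: ${planNodes(truncated)}")
```

Rebuilding from the RDD trades planning time for a round trip through `Row` objects, so whether it pays off depends on how large the data is relative to the plan.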

Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

Liang-Chi Hsieh

Hi Maciej,

FYI, the PR is at https://github.com/apache/spark/pull/16775.

Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

Nick Pentreath
Hi Maciej

If you're seeing a regression from 1.6 -> 2.0 both using DataFrames then that seems to point to some other underlying issue as the root cause.

Even though adding checkpointing should help, we should understand why the behavior differs between 1.6 and 2.0.


On Thu, 2 Feb 2017 at 08:22 Liang-Chi Hsieh <[hidden email]> wrote:

Hi Maciej,

FYI, the PR is at https://github.com/apache/spark/pull/16775.


View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-tp20803p20822.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

Liang-Chi Hsieh

Thanks, Nick, for pointing that out. I totally agree.

In the 1.6 codebase, Pipeline actually uses DataFrame rather than Dataset, because the two were not yet merged in 1.6.

StringIndexer and OneHotEncoder call ".rdd" on the Dataset, which deserializes the rows.

In 1.6, since they use DataFrame, there is no extra cost for deserialization.

I think this could cause some regression. Since Maciej didn't say how large the observed regression is, I can't judge whether this is the root cause. It is just my initial idea after comparing the 1.6 and current Pipeline code.



Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

zero323
In reply to this post by Liang-Chi Hsieh

Hi Liang-Chi,

Thank you for your answer and the PR, but I think I wasn't specific enough. In hindsight I should have illustrated this better. What really troubles me here is the pattern of growing delays. Compare 1.6.3 (roughly 20 s of runtime since the first job):

[image: 1.6 timeline]

vs 2.1.0 (45 minutes or so in a bad case):

[image: 2.1.0 timeline]

The code is just an example and it is intentionally dumb. You can easily mask this with caching or by using significantly larger data sets. So I guess the question I am really interested in is: what changed between 1.6.3 and 2.x (this is more or less consistent across 2.0, 2.1 and current master) to cause this and, more importantly, is it a feature or a bug? I admit I chose the lazy path here and haven't (yet) spent much time trying to dig deeper.

I can see slightly higher memory usage and slightly more intensive GC activity, but nothing I would really blame for this behavior, and the duration of individual jobs is comparable, if anything slightly in favor of 2.x. Neither StringIndexer nor OneHotEncoder changed much in 2.x. They used RDDs for fitting in 1.6 and, as far as I can tell, they still do in 2.x. And the problem doesn't look related to the data processing part in the first place.




-- 
Maciej Szymkiewicz
Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

Liang-Chi Hsieh

Hi Maciej,

Thanks for the info you provided.

I ran the same example with 1.6 and with the current branch and recorded the time spent preparing the executed plan.

Current branch:

292 ms                                                                            
95 ms                            
57 ms
34 ms
128 ms
120 ms
63 ms
106 ms
179 ms
159 ms
235 ms
260 ms
334 ms
464 ms
547 ms                            
719 ms
942 ms
1130 ms
1928 ms
1751 ms
2159 ms                            
2767 ms
3333 ms
4175 ms
5106 ms
6269 ms
7683 ms
9210 ms
10931 ms
13237 ms
15651 ms
19222 ms
23841 ms
26135 ms
31299 ms
38437 ms
47392 ms
51420 ms
60285 ms
69840 ms
74294 ms

1.6:

3 ms
4 ms
10 ms
4 ms
17 ms
8 ms
12 ms
21 ms
15 ms
15 ms
19 ms
23 ms
28 ms
28 ms
58 ms
39 ms
43 ms
61 ms
56 ms
60 ms
81 ms
73 ms
100 ms
91 ms
96 ms
116 ms
111 ms
140 ms
127 ms
142 ms
148 ms
165 ms
171 ms
198 ms
200 ms
233 ms
237 ms
253 ms
256 ms
271 ms
292 ms
452 ms

Although both take more time after each iteration because the query plan keeps growing, the current branch clearly takes much more time than the 1.6 branch. The optimizer and query planning in the current branch are much more complicated than in 1.6.
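
One way to collect numbers of this kind, as a sketch: touch `queryExecution.executedPlan` after each step and time it, which forces analysis, optimization and physical planning without running a job. This assumes Spark 2.x or later. The `timeMs` helper and the plain `withColumn` loop are illustrative stand-ins, since the thread does not show how the measurements above were taken, so the absolute values will not match.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("plan-timing-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: run a block and return elapsed wall-clock milliseconds.
def timeMs[A](block: => A): Long = {
  val start = System.nanoTime()
  block
  (System.nanoTime() - start) / 1000000L
}

val base = Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0")

// After each added column, force planning by touching the lazy executedPlan
// and record how long that took. No Spark job is submitted here.
val timings = (1 to 40).scanLeft((base, 0L)) { case ((df, _), i) =>
  val next = df.withColumn(s"x$i", $"x0")
  (next, timeMs(next.queryExecution.executedPlan))
}.tail.map(_._2)

timings.foreach(t => println(s"$t ms"))
```

Because `executedPlan` is a lazy value, each DataFrame is timed only on first access; touching it again on the same object would return the cached plan almost instantly.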




nM15AWH.png (19K) <http://apache-spark-developers-list.1001551.n3.nabble.com/attachment/20827/0/nM15AWH.png>
KHZa7hL.png (26K) <http://apache-spark-developers-list.1001551.n3.nabble.com/attachment/20827/1/KHZa7hL.png>
Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

Liang-Chi Hsieh

Hi Maciej,

After looking into where the time goes when preparing the executed plan, I found that the cause of the significant difference between 1.6 and the current codebase for this example is the optimizer step that generates constraints.

A few operations in constraint generation do not seem to be optimized. Combined with the continuously growing query plan, the time spent generating constraints keeps increasing.

I am trying to reduce this cost. The result is not as low as 1.6, because we can't remove constraint generation altogether, but it is significantly lower than the current codebase (74294 ms -> 2573 ms).

385 ms
107 ms
46 ms
58 ms
64 ms
105 ms
86 ms
122 ms
115 ms
114 ms
100 ms
109 ms
169 ms
196 ms
174 ms
212 ms
290 ms
254 ms
318 ms
405 ms
347 ms
443 ms
432 ms
500 ms
544 ms
619 ms
697 ms
683 ms
807 ms
802 ms
960 ms
1010 ms
1155 ms
1251 ms
1298 ms
1388 ms
1503 ms
1613 ms
2279 ms
2349 ms
2573 ms
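
To get a feel for what constraint generation produces, here is a small sketch that prints how many constraints Catalyst derives as aliased columns are stacked on a filtered DataFrame. It assumes Spark 2.x or later and relies on the `constraints` member of a logical plan, which is a Catalyst internal whose behavior may differ between versions. The filter, the column names and the limit of 5 columns are illustrative and not part of the fix discussed here.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("constraints-sketch")
  .getOrCreate()
import spark.implicits._

// A filter gives Catalyst a predicate to derive constraints from.
val base = Seq((1, "foo"), (2, "bar"), (3, "baz"))
  .toDF("id", "x0")
  .filter($"x0" =!= "foo")

// Add aliases of x0 one at a time (only 5, to keep the sketch cheap) and
// record how many constraints are derived for each analyzed plan.
val sizes = (1 to 5)
  .scanLeft(base)((df, i) => df.withColumn(s"x$i", $"x0"))
  .tail
  .map(_.queryExecution.analyzed.constraints.size)

sizes.zipWithIndex.foreach { case (n, i) =>
  println(s"${i + 1} aliased column(s): $n constraints")
}
```

The counts depend on the Spark version, so the sketch prints them rather than assuming a particular growth pattern.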


Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

Liang-Chi Hsieh

Hi Maciej,

FYI, the fix has been submitted at https://github.com/apache/spark/pull/16785.

Liang-Chi Hsieh wrote
Hi Maciej,

After looking into the details of the time spent on preparing the executed plan, the cause of the significant difference between 1.6 and current codebase when running the example, is the optimization process to generate constraints.

There seems few operations in generating constraints which are not optimized. Plus the fact the query plan grows continuously, the time spent on generating constraints increases more and more.

I am trying to reduce the time cost. Although not as low as 1.6 because we can't remove the process of generating constraints, it is significantly lower than current codebase (74294 ms -> 2573 ms).

385 ms
107 ms
46 ms
58 ms
64 ms
105 ms
86 ms
122 ms
115 ms
114 ms
100 ms
109 ms
169 ms
196 ms
174 ms
212 ms
290 ms
254 ms
318 ms
405 ms
347 ms
443 ms
432 ms
500 ms
544 ms
619 ms
697 ms
683 ms
807 ms
802 ms
960 ms
1010 ms
1155 ms
1251 ms
1298 ms
1388 ms
1503 ms
1613 ms
2279 ms
2349 ms
2573 ms


Liang-Chi Hsieh wrote
Hi Maciej,

Thanks for the info you provided.

I tried to run the same example with 1.6 and current branch and record the difference between the time cost on preparing the executed plan.

Current branch:

292 ms                                                                            
95 ms                            
57 ms
34 ms
128 ms
120 ms
63 ms
106 ms
179 ms
159 ms
235 ms
260 ms
334 ms
464 ms
547 ms                            
719 ms
942 ms
1130 ms
1928 ms
1751 ms
2159 ms                            
2767 ms
3333 ms
4175 ms
5106 ms
6269 ms
7683 ms
9210 ms
10931 ms
13237 ms
15651 ms
19222 ms
23841 ms
26135 ms
31299 ms
38437 ms
47392 ms
51420 ms
60285 ms
69840 ms
74294 ms

1.6:

3 ms
4 ms
10 ms
4 ms
17 ms
8 ms
12 ms
21 ms
15 ms
15 ms
19 ms
23 ms
28 ms
28 ms
58 ms
39 ms
43 ms
61 ms
56 ms
60 ms
81 ms
73 ms
100 ms
91 ms
96 ms
116 ms
111 ms
140 ms
127 ms
142 ms
148 ms
165 ms
171 ms
198 ms
200 ms
233 ms
237 ms
253 ms
256 ms
271 ms
292 ms
452 ms

Although they both take more time after each iteration due to the grown query plan, it is obvious that current branch takes much more time than 1.6 branch. The optimizer and query planning in current branch is much more complicated than 1.6.

zero323 wrote
Hi Liang-Chi,

Thank you for your answer and PR but what I think I wasn't specific
enough. In hindsight I should have illustrate this better. What really
troubles me here is a pattern of growing delays. Difference between
1.6.3 (roughly 20s runtime since the first job):


1.6 timeline

vs 2.1.0 (45 minutes or so in a bad case):

2.1.0 timeline

The code is just an example and it is intentionally dumb. You easily
mask this with caching, or using significantly larger data sets. So I
guess the question I am really interested in is - what changed between
1.6.3 and 2.x (this is more or less consistent across 2.0, 2.1 and
current master) to cause this and more important, is it a feature or is
it a bug? I admit, I choose a lazy path here, and didn't spend much time
(yet) trying to dig deeper.

I can see a bit higher memory usage, a bit more intensive GC activity,
but nothing I would really blame for this behavior, and duration of
individual jobs is comparable with some favor of 2.x. Neither
StringIndexer nor OneHotEncoder changed much in 2.x. They used RDDs for
fitting in 1.6 and, as far as I can tell, they still do that in 2.x. And
the problem doesn't look that related to the data processing part in the
first place.


On 02/02/2017 07:22 AM, Liang-Chi Hsieh wrote:
> Hi Maciej,
>
> FYI, the PR is at https://github.com/apache/spark/pull/16775.

--
Maciej Szymkiewicz



nM15AWH.png (19K) <http://apache-spark-developers-list.1001551.n3.nabble.com/attachment/20827/0/nM15AWH.png>
KHZa7hL.png (26K) <http://apache-spark-developers-list.1001551.n3.nabble.com/attachment/20827/1/KHZa7hL.png>
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

Re: [SQL][ML] Pipeline performance regression between 1.6 and 2.x

zero323
Hi Liang-Chi,

Thank you for the updates. This looks promising.


On 02/03/2017 08:34 AM, Liang-Chi Hsieh wrote:

> Hi Maciej,
>
> FYI, this fix is submitted at https://github.com/apache/spark/pull/16785.
>
>
> Liang-Chi Hsieh wrote
>> Hi Maciej,
>>
>> After looking into where the time goes when preparing the executed
>> plan, the cause of the significant difference between 1.6 and the current
>> codebase when running the example is the optimization step that generates
>> constraints.
>>
>> A few operations in constraint generation seem not to be optimized.
>> Combined with the continuously growing query plan, the time spent
>> on generating constraints keeps increasing.
>>
>> I am trying to reduce this cost. It will not be as low as in 1.6, because we
>> can't remove constraint generation, but it is significantly
>> lower than in the current codebase (74294 ms -> 2573 ms for the last measurement).
>>
>> 385 ms
>> 107 ms
>> 46 ms
>> 58 ms
>> 64 ms
>> 105 ms
>> 86 ms
>> 122 ms
>> 115 ms
>> 114 ms
>> 100 ms
>> 109 ms
>> 169 ms
>> 196 ms
>> 174 ms
>> 212 ms
>> 290 ms
>> 254 ms
>> 318 ms
>> 405 ms
>> 347 ms
>> 443 ms
>> 432 ms
>> 500 ms
>> 544 ms
>> 619 ms
>> 697 ms
>> 683 ms
>> 807 ms
>> 802 ms
>> 960 ms
>> 1010 ms
>> 1155 ms
>> 1251 ms
>> 1298 ms
>> 1388 ms
>> 1503 ms
>> 1613 ms
>> 2279 ms
>> 2349 ms
>> 2573 ms