dynamic allocation manager in SS


Stavros Kontopoulos-3
Hi,

Some time ago, streaming-specific dynamic allocation was added to DStreams (https://issues.apache.org/jira/browse/SPARK-12133) to address the problems with the batch-based one. Should this be ported to Structured Streaming? Thoughts?
AFAIK, there is no support for it in SS.

Best,
Stavros


Re: dynamic allocation manager in SS

Gabor Somogyi
Structured Streaming works differently. If no data arrives, no tasks are executed (I just had a case in this area).

BR,
G


On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <[hidden email]> wrote:
Hi,

Some time ago, streaming-specific dynamic allocation was added to DStreams (https://issues.apache.org/jira/browse/SPARK-12133) to address the problems with the batch-based one. Should this be ported to Structured Streaming? Thoughts?
AFAIK, there is no support for it in SS.

Best,
Stavros


Re: dynamic allocation manager in SS

Stavros Kontopoulos-3
Yes, nothing happens. In that case, couldn't it propagate that information to the resource manager so that the number of executors is scaled down? Just a thought.

On Fri, 24 May 2019, 7:17 PM Gabor Somogyi <[hidden email]> wrote:
Structured Streaming works differently. If no data arrives, no tasks are executed (I just had a case in this area).

BR,
G



Re: dynamic allocation manager in SS

Gabor Somogyi
It scales down on YARN. I'm not sure how you tested it.
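For reference, a minimal sketch of the settings that typically enable batch dynamic allocation, which is what removes idle executors on YARN. The keys are standard Spark configuration properties; the values and the `to_submit_args` helper are hypothetical and chosen only for illustration.

```python
# Hypothetical values; the keys are standard Spark dynamic allocation settings.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    # External shuffle service, as commonly used with dynamic allocation on YARN.
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "10",
    # Executors idle for longer than this become candidates for removal.
    "spark.dynamicAllocation.executorIdleTimeout": "60s",
}


def to_submit_args(conf):
    """Render a config dict as spark-submit `--conf key=value` arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(dynamic_allocation_conf)))
```

With these settings, a streaming query that receives no data runs no tasks, so its executors should eventually pass the idle timeout and be released.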

On Fri, 24 May 2019, 19:10 Stavros Kontopoulos, <[hidden email]> wrote:
Yes, nothing happens. In that case, couldn't it propagate that information to the resource manager so that the number of executors is scaled down? Just a thought.


Re: dynamic allocation manager in SS

Stavros Kontopoulos-3
I am on K8s, where AFAIK there is no support yet; there is work in progress on the shuffle service. So, in your experience, are there no issues with using batch dynamic allocation, like those seen earlier with DStreams and described in the related JIRA?

On Fri, 24 May 2019, 8:28 PM Gabor Somogyi <[hidden email]> wrote:
It scales down on YARN. I'm not sure how you tested it.


Re: dynamic allocation manager in SS

Stavros Kontopoulos-3
Btw, the heuristics for batch mode (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289) and for streaming (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L91-L92) are different. Batch mode looks at numRunningOrPendingTasks, while streaming looks at the ratio averageBatchProcTime.toDouble / batchDurationMs, so there are concerns beyond scaling down when idle.
Here is a scenario where batch dynamic allocation might not work with SS. I start with a query that reads x Kafka partitions. The volume of arriving data is low, and all tasks (one per partition) are running, since there are enough resources anyway.
At some point the data per partition increases (maxOffsetsPerTrigger is high enough), so processing takes more time. AFAIK, SS waits for a batch to finish before running the next one (it waits for the trigger to finish, https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L46).
In this case I suspect that batch dynamic allocation does not scale up, as there are no pending tasks; only the processing time has changed. Here I think the streaming heuristics are better.
Batch mode heuristics could work, if I am not mistaken, when you have multiple streaming queries and there are batches waiting (using fair scheduling, etc.).
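To make the contrast concrete, here is a minimal Python sketch of the two heuristics. It is an illustration under stated assumptions, not Spark's implementation: the function names, the 0.9 and 0.3 thresholds, and the scenario numbers (8 partitions, one task slot per executor, a 10 s interval) are all chosen for the example.

```python
import math


def batch_target_executors(running_tasks, pending_tasks, tasks_per_executor):
    """Batch-style heuristic: size the cluster to the running + pending task count."""
    return math.ceil((running_tasks + pending_tasks) / tasks_per_executor)


def streaming_decision(avg_batch_proc_ms, batch_duration_ms,
                       scaling_up_ratio=0.9, scaling_down_ratio=0.3):
    """Streaming-style heuristic: compare processing time to the batch interval.

    The two thresholds are assumed values for this sketch.
    """
    ratio = avg_batch_proc_ms / batch_duration_ms
    if ratio >= scaling_up_ratio:
        return "scale up"
    if ratio <= scaling_down_ratio:
        return "scale down"
    return "hold"


# Hypothetical scenario: 8 Kafka partitions, one task each, nothing pending.
partitions = 8

# Low volume: each batch takes 5 s of a 10 s interval.
light = (batch_target_executors(partitions, 0, 1), streaming_decision(5_000, 10_000))

# Higher volume per partition: same 8 tasks, but each batch now takes 12 s.
heavy = (batch_target_executors(partitions, 0, 1), streaming_decision(12_000, 10_000))

print(light)
print(heavy)
```

In both cases the task-count heuristic asks for the same 8 executors, because the number of tasks has not changed. Only the ratio-based heuristic reacts to the longer processing time.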

PS. This has been discussed on the list in the past, though not in depth (https://mail-archives.apache.org/mod_mbox/spark-user/201708.mbox/%3C1503626484779-29104.post@...%3E)

On Fri, May 24, 2019 at 9:22 PM Stavros Kontopoulos <[hidden email]> wrote:
I am on K8s, where AFAIK there is no support yet; there is work in progress on the shuffle service. So, in your experience, are there no issues with using batch dynamic allocation, like those seen earlier with DStreams and described in the related JIRA?



--
Stavros Kontopoulos
Principal Engineer
mob: +30 6977967274


Re: dynamic allocation manager in SS

Gabor Somogyi
K8s is a different story; please take a look at the "Future Work" section of the docs.

On Fri, May 24, 2019 at 9:40 PM Stavros Kontopoulos <[hidden email]> wrote:
Btw, the heuristics for batch mode (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289) and for streaming (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L91-L92) are different. Batch mode looks at numRunningOrPendingTasks, while streaming looks at the ratio averageBatchProcTime.toDouble / batchDurationMs, so there are concerns beyond scaling down when idle.
Here is a scenario where batch dynamic allocation might not work with SS. I start with a query that reads x Kafka partitions. The volume of arriving data is low, and all tasks (one per partition) are running, since there are enough resources anyway.
At some point the data per partition increases (maxOffsetsPerTrigger is high enough), so processing takes more time. AFAIK, SS waits for a batch to finish before running the next one (it waits for the trigger to finish, https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L46).
In this case I suspect that batch dynamic allocation does not scale up, as there are no pending tasks; only the processing time has changed. Here I think the streaming heuristics are better.
Batch mode heuristics could work, if I am not mistaken, when you have multiple streaming queries and there are batches waiting (using fair scheduling, etc.).

PS. This has been discussed on the list in the past, though not in depth (https://mail-archives.apache.org/mod_mbox/spark-user/201708.mbox/%3C1503626484779-29104.post@...%3E)


Re: dynamic allocation manager in SS

Stavros Kontopoulos-3
Sure, I'm not talking about K8s here.
The discussion is about the heuristics and their drawbacks. 

On Mon, 27 May 2019, 2:04 PM Gabor Somogyi <[hidden email]> wrote:
K8s is a different story; please take a look at the "Future Work" section of the docs.


Re: dynamic allocation manager in SS

Igor Dvorzhak
Hello,

FYI, there is a JIRA, SPARK-24815, for adding dynamic allocation support to Spark Streaming. We plan to work on this over the summer.

Let's move the design discussion to the JIRA so that it is easier to move forward.

Best regards,
Igor Dvorzhak


On Mon, May 27, 2019 at 9:41 AM Stavros Kontopoulos <[hidden email]> wrote:
Sure, I'm not talking about K8s here.
The discussion is about the heuristics and their drawbacks. 
