[Discussion] Clarification regarding Stateful Aggregations over Structured Streaming

Chitral Verma
Hi Devs, 

For quite some time I've been looking at the Structured Streaming API to solve a number of use cases at my workplace, and I have some doubts I'd like to clarify regarding stateful aggregations over Structured Streaming.

Currently, Spark provides the flatMapGroupsWithState (FMGWS) / mapGroupsWithState (MGWS) APIs to allow custom streaming aggregations by setting and updating an intermediate `GroupState`, which may or may not expire. This state is stored in the form of snapshots, and the latest snapshot is held entirely in memory, which can be memory-intensive and may result in OOMs.
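To make the per-key state model concrete, here is a minimal toy sketch in plain Python. It is not Spark code: `ToyGroupState`, `running_count` and `run_batch` are hypothetical names invented for illustration, and the dictionary merely stands in for the in-memory snapshot described above.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Optional, Tuple


class ToyGroupState:
    """Hypothetical stand-in for per-key state; not Spark's GroupState class."""

    def __init__(self) -> None:
        self._value: Optional[int] = None

    @property
    def exists(self) -> bool:
        return self._value is not None

    def get(self) -> int:
        if self._value is None:
            raise KeyError("no state set for this key")
        return self._value

    def update(self, value: int) -> None:
        self._value = value


def running_count(key: str, values: List[int], state: ToyGroupState) -> Tuple[str, int]:
    """User function: fold this batch's values for one key into its state."""
    total = (state.get() if state.exists else 0) + len(values)
    state.update(total)
    return key, total


def run_batch(
    batch: List[Tuple[str, int]],
    states: Dict[str, ToyGroupState],
    func: Callable[[str, List[int], ToyGroupState], Tuple[str, int]],
) -> List[Tuple[str, int]]:
    """Group one micro-batch by key and call the user function once per key."""
    grouped: Dict[str, List[int]] = defaultdict(list)
    for key, value in batch:
        grouped[key].append(value)
    # Every key ever seen keeps an entry in `states` (all in memory in this toy).
    return [func(k, vs, states.setdefault(k, ToyGroupState())) for k, vs in grouped.items()]


states: Dict[str, ToyGroupState] = {}
first = run_batch([("a", 1), ("b", 1), ("a", 1)], states, running_count)
second = run_batch([("a", 1)], states, running_count)
print(first, second, len(states))
```

Key "b" receives no data in the second batch, yet its state is still held, which is the memory concern in a nutshell: state grows with the number of live keys unless something removes them.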

Other than this, in my opinion, FMGWS is not very flexible in terms of usage (the aggregation logic needs to be written on Rows, and Spark SQL built-in functions cannot be used), and the timeouts require the query to progress in order to expire keys.
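The point about timeouts can be illustrated with a small toy model, again in plain Python rather than Spark. `ToyTimeoutTracker` and its methods are hypothetical; the only assumption carried over from the text is that expiry is evaluated when a micro-batch runs and at no other time.

```python
from typing import Dict, List


class ToyTimeoutTracker:
    """Toy model: key expiry is only checked while a micro-batch is processed."""

    def __init__(self, timeout_ms: int) -> None:
        self.timeout_ms = timeout_ms
        self.deadlines: Dict[str, int] = {}  # key -> absolute deadline in ms

    def process_batch(self, now_ms: int, keys_with_data: List[str]) -> List[str]:
        # Keys that received data get a fresh deadline.
        for key in keys_with_data:
            self.deadlines[key] = now_ms + self.timeout_ms
        # Only now, during a batch, are overdue keys noticed and dropped.
        expired = sorted(k for k, d in self.deadlines.items() if d <= now_ms)
        for key in expired:
            del self.deadlines[key]
        return expired


tracker = ToyTimeoutTracker(timeout_ms=1000)
tracker.process_batch(now_ms=0, keys_with_data=["a", "b"])

# Wall-clock time moves on to t=5000 ms, but no batch has run in between,
# so both keys are still held even though their deadlines passed at t=1000 ms.
held_while_idle = sorted(tracker.deadlines)

# The next batch (triggered by data for an unrelated key) finally expires them.
expired = tracker.process_batch(now_ms=5000, keys_with_data=["c"])
remaining = sorted(tracker.deadlines)
```

In this sketch "a" and "b" outlive their timeout by four seconds simply because nothing triggered a batch, which is the behaviour being questioned.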

To remedy this, I have contributed to this project, which basically moves the expiration logic into the state store (RocksDB). The state store is no longer managed by the executor JVM, allowing true expiration of state with nanosecond precision.
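As a rough illustration of store-side expiration, the sketch below stamps each entry with a nanosecond deadline and enforces it inside the store on read, independent of any batch. This is a plain Python toy, not RocksDB and not the project's actual implementation; `ToyTtlStore` and the injectable `clock_ns` parameter are assumptions made for the example.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class ToyTtlStore:
    """Toy key-value store that enforces a per-entry TTL on read."""

    def __init__(self, ttl_ns: int, clock_ns: Callable[[], int] = time.monotonic_ns) -> None:
        self._ttl_ns = ttl_ns
        self._clock_ns = clock_ns  # injectable so the example is deterministic
        self._entries: Dict[str, Tuple[int, int]] = {}  # key -> (value, deadline_ns)

    def put(self, key: str, value: int) -> None:
        self._entries[key] = (value, self._clock_ns() + self._ttl_ns)

    def get(self, key: str) -> Optional[int]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, deadline_ns = entry
        if self._clock_ns() >= deadline_ns:
            # Expired: the store itself drops the entry; no batch is involved.
            del self._entries[key]
            return None
        return value


# Fake clock, in nanoseconds, advanced by hand.
now = [0]
store = ToyTtlStore(ttl_ns=1_500, clock_ns=lambda: now[0])
store.put("a", 42)

now[0] = 1_499
before = store.get("a")  # one nanosecond before the deadline

now[0] = 1_500
after = store.get("a")  # at the deadline
```

Here the entry is visible one nanosecond before its deadline and gone at the deadline, without the query having to make progress.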

My question is: is there a specific reason the FMGWS API is designed the way it is, and are there any downsides to the approach I have described above?

Do let me know your thoughts.

Thanks 

Re: [Discussion] Clarification regarding Stateful Aggregations over Structured Streaming

Stavros Kontopoulos-3
Hi,

As you already know, the Databricks runtime has this enhancement, so it is considered a good option if you want to decouple state from the JVM.
Some arguments for doing so, along with incremental snapshotting, are given in the Flink paper: http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf. Also, timers implemented in RocksDB can give you higher scalability with very large states (and many timers). I am not aware of the history behind the FMGWS API (others could provide more info), but I was also looking at it recently while thinking about an API for this: https://issues.apache.org/jira/browse/SPARK-16738

Best,
Stavros

On Sun, Dec 16, 2018 at 7:58 PM Chitral Verma <[hidden email]> wrote:
[...]


Re: [Discussion] Clarification regarding Stateful Aggregations over Structured Streaming

Chitral Verma
Thanks, Stavros, for the clarification. I'll write some documentation for this and raise it as an enhancement issue with a pull request.

Meanwhile, users who want this functionality can always add spark-states as a dependency and use it.

On Mon, 17 Dec 2018 at 03:10, Stavros Kontopoulos <[hidden email]> wrote:
[...]