[DISCUSS] "complete" streaming output mode


Jungtaek Lim
Hi devs,

While dealing with SPARK-31706 [1], we figured out that the streaming output mode is only effective for stateful aggregation and is not guaranteed at the sink, which could expose a data loss issue. SPARK-31724 [2] has been filed to track the effort to improve the streaming output mode.

Before we revisit the streaming output mode, I'd like to start a discussion around the "complete" streaming output mode, because I have no idea how it works for production use cases. To me, it's only useful for niche cases, and no other streaming framework has such a concept.

1. It defeats the purpose of the watermark and forces Spark to maintain all state rows, which grow incrementally. It only works when all keys are bounded to a limited set.

2. It has to emit all state rows as output in every batch, hence the size of the output also grows incrementally.

3. It has to truncate the target before writing rows, which might not be trivial for external storage if it has to happen in every batch.

4. It enables some operations, like sorting in a streaming query, plus a couple more things. But these will not work cleanly (state won't keep up) under a reasonably high input rate, and we have to consider how each operation works with each streaming output mode, so a non-trivial amount of consideration has to go into maintaining the mode.
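A toy Python model can make points 1 and 2 concrete. It is plain Python, not Spark's API or implementation: a `Counter` stands in for the state store of a keyed running count, complete mode emits every state row in each batch, and update mode emits only the keys that batch touched. The batch contents are made up for illustration.

```python
from collections import Counter


def run_batches(batches):
    """Toy model of a keyed running count (not Spark's implementation).

    Returns, per micro-batch, the rows emitted in "complete" mode (every
    state row) and in "update" mode (only keys touched by that batch).
    """
    state = Counter()  # stands in for the state store: key -> running count
    complete_out, update_out = [], []
    for batch in batches:
        state.update(batch)  # fold the batch's input into the state
        # Complete mode: the whole result table, so output grows with state.
        complete_out.append(sorted(state.items()))
        # Update mode: only rows whose aggregate changed in this batch.
        update_out.append(sorted((key, state[key]) for key in set(batch)))
    return complete_out, update_out


# After the first batch, each batch brings one new key (think: a new day)
# plus one existing key.
batches = [["d1"], ["d1", "d2"], ["d2", "d3"], ["d3", "d4"]]
complete_out, update_out = run_batches(batches)
for i, (c, u) in enumerate(zip(complete_out, update_out), start=1):
    print(f"batch {i}: complete emits {len(c)} rows, update emits {len(u)} rows")
```

With one new key per batch, the complete-mode output grows with the number of keys ever seen, while the update-mode output stays proportional to the batch.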

Retaining the complete mode would be a headache if we consider improving the modes, as someone might be concerned about compatibility. It would be nice if we could reach a consensus on our view of the complete mode, and drop support for it if we agree.

I'd like to hear everyone's opinions. It would be great if someone could bring up valid cases where the complete mode is being used in production.

Thanks,
Jungtaek Lim (HeartSaVioR)




Re: [DISCUSS] "complete" streaming output mode

Shixiong(Ryan) Zhu
Hey Jungtaek,

I totally agree with you about the issues of the complete mode you raised here. However, not all streaming queries have unbounded state that quickly grows to a crazy size.

Actually, I've found the complete mode pretty useful when the state is bounded and small. For example, a user can build a real-time dashboard based on daily aggregation results (only 365 or 366 keys per year, so fewer than 40k keys in 100 years) using the memory sink, in the following steps:

- Write a streaming query that polls data from Kafka, calculates the aggregation results, and saves them to the memory sink in complete mode.
- In the same Spark application, start a Thrift server with "spark.sql.hive.thriftServer.singleSession=true" to expose the temp table created by the memory sink through JDBC/ODBC.
- Connect a BI tool via JDBC/ODBC to query the temp table created by the memory sink.
- Use the BI tool to build a real-time dashboard that polls the results at a specified rate.
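A rough model of the dashboard flow described in these steps, assuming (as a simplification) that the memory sink's complete-mode behavior reduces to "replace the whole table on every batch" and that the BI tool's JDBC/ODBC query is just a snapshot read. `ToyMemorySink` is hypothetical and is not Spark's memory sink class; the Thrift server and `spark.sql.hive.thriftServer.singleSession` are not modeled. The last lines check the key-count arithmetic: one key per calendar day over 100 years.

```python
import datetime
import threading


class ToyMemorySink:
    """Hypothetical stand-in for a memory sink in complete mode (not Spark code).

    Every batch replaces the whole result table, so a reader polling it
    always sees one full snapshot of the aggregation.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._table = {}

    def write_batch(self, rows):
        # Complete mode: swap in the full result instead of merging rows.
        with self._lock:
            self._table = dict(rows)

    def query(self):
        # Models the BI tool's JDBC/ODBC poll as a snapshot copy.
        with self._lock:
            return dict(self._table)


sink = ToyMemorySink()
sink.write_batch({"2020-05-18": 10})
sink.write_batch({"2020-05-18": 12, "2020-05-19": 3})
print(sink.query())

# Upper bound on daily keys: one key per calendar day over 100 years.
days_in_100_years = (datetime.date(2100, 1, 1) - datetime.date(2000, 1, 1)).days
print(days_in_100_years)
```

The 36,525 daily keys between 2000-01-01 and 2100-01-01 are consistent with the "fewer than 40k keys in 100 years" estimate.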

Best Regards,

Ryan


(In reply to Jungtaek Lim, Mon, May 18, 2020 at 8:44 PM)

Re: [DISCUSS] "complete" streaming output mode

Jungtaek Lim
Thanks for speaking up, Shixiong!

Thanks for sharing how the complete mode is used in practice. I agree that's a valid use case where the complete mode helps, but I'm not sure enabling the complete mode is the only way to handle it.

1. Given that it assumes a fairly small output cardinality, using the "update" mode with an external store that handles fast reads and updates (e.g. Redis) would also work smoothly. The downside of this approach is that it requires installing and maintaining the external storage (and assumes there's a data source implementation for it).

2. I think "queryable state" (interactive queries) is the widely adopted technology for addressing such use cases. The state doesn't need to be accessed from the same driver, and it doesn't even need to assume that the set of keys is bounded and small enough to fit in driver memory. It would probably require major effort to implement, and more effort to expose it as a table.
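A toy check of option 1, assuming update mode emits exactly the keys whose aggregate changed in a batch: upserting those rows into a key-value store (a plain `dict` standing in for something like Redis; no Redis client is used) ends up holding the same table that a complete-mode sink rewrites every batch. `compare_sinks` and the event data are hypothetical.

```python
from collections import Counter
from typing import Dict, List, Tuple


def compare_sinks(batches: List[List[str]]) -> Tuple[Dict[str, int], bool]:
    """Toy comparison (no Spark, no Redis client): upsert update-mode rows
    into a key-value store and check it against the complete-mode table."""
    state: Counter = Counter()     # aggregation state: day -> event count
    kv_store: Dict[str, int] = {}  # hypothetical stand-in for Redis
    always_equal = True
    for batch in batches:
        state.update(batch)
        # Update mode: only the keys whose count changed in this batch.
        update_rows = {day: state[day] for day in set(batch)}
        kv_store.update(update_rows)  # upsert by key, never truncate
        # Complete mode: the full table, rewritten every batch.
        complete_table = dict(state)
        always_equal = always_equal and kv_store == complete_table
    return kv_store, always_equal


events = [["05-18", "05-18"], ["05-18", "05-19"], ["05-20"]]
store, matched = compare_sinks(events)
print(store, matched)
```

The store only ever receives the changed keys, yet after every batch it matches the full complete-mode table.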

It's also worth noting that there's a data loss issue with the complete mode if end users union the result of a streaming aggregation with a non-streaming aggregation. While we may be able to prevent queries from applying a union after a streaming aggregation, we already have lots of rules restricting problematic cases in arbitrary plans. It seems to be time to revisit this theoretically (which is what SPARK-31724 is for).

We may need to provide alternatives for the possible use cases even if we decide to drop the complete mode. But before that, it's more important to build a consensus that the complete mode is only used for a few use cases (we need to collect these use cases, of course) and that the cost of maintenance exceeds the benefit. For sure, I'm open to disagreement.

Thanks,
Jungtaek Lim (HeartSaVioR)

(In reply to Shixiong(Ryan) Zhu, Thu, May 21, 2020 at 9:45 AM)

Re: [DISCUSS] "complete" streaming output mode

Burak Yavuz
Oh wow. I never thought this would be up for debate. I use complete mode VERY frequently for all my dashboarding use cases. Here are some of my thoughts:

> 1. It destroys the purpose of watermark and forces Spark to maintain all of state rows, growing incrementally. It only works when all keys are bounded to the limited set.

Yes, this is a conscious architectural decision users need to make. There are many cases where state is finite and small.

> 2. It has to provide all state rows as outputs per batch, hence the size of outputs is also growing incrementally.

Not if you add a filter on the results of the aggregation ;) For example, I have aggregations that filter down to only the last week of data, and I use them in complete mode without any issues. I don't remember whether we drop filtered-out values from the state as well, but I haven't faced a memory issue yet (the streams have been running since the creation of Structured Streaming).
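A toy sketch of the filter trick, in plain Python rather than Spark: the aggregation state keeps a count per day, and the emitted rows are filtered to the last 7 days. The model deliberately keeps every key in the state, because whether Spark also prunes filtered-out keys from state is exactly the open question in this message; `complete_with_filter` and the dates are illustrative assumptions.

```python
import datetime
from collections import Counter


def complete_with_filter(state, today, days=7):
    """Rows a complete-mode query would emit if the aggregation output is
    filtered to the last `days` days (toy model, not Spark code).

    The filter bounds the output only; `state` keeps every key, since state
    pruning is not assumed here.
    """
    cutoff = today - datetime.timedelta(days=days - 1)
    return {day: count for day, count in state.items() if day >= cutoff}


state = Counter()
start = datetime.date(2020, 5, 1)
for i in range(30):  # 30 days of input, one event per day
    state[start + datetime.timedelta(days=i)] += 1

today = start + datetime.timedelta(days=29)
out = complete_with_filter(state, today)
print(len(state), len(out))  # keys held in state vs rows emitted
```

The emitted output stays at one week of rows, while the number of keys held in state keeps growing in this model.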

> 3. It has to truncate the target before putting rows which might not be trivial for external storage if it should be executed per batch.

This is trivial and super cheap for certain data sources like JDBC, Delta, Iceberg. Again, it becomes a question of using the right tools and the right architecture to solve your problem. IMHO it's not a problem of the execution engine or the mode.
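A minimal sketch of why truncate-then-insert can be cheap and safe on a transactional target, using Python's built-in `sqlite3` as a stand-in for a JDBC database (an assumption; Delta and Iceberg are not modeled). Both statements run in one transaction, so a failed rewrite rolls back and the previous result stays in place. The table name `daily_counts` is hypothetical.

```python
import sqlite3


def overwrite_table(conn, rows):
    """Complete-mode style write: clear the target and insert the full
    result inside one transaction (sqlite3 standing in for a JDBC target).

    If anything fails, the transaction rolls back and the previous
    snapshot stays in the table.
    """
    with conn:  # commits on success, rolls back on exception
        conn.execute("DELETE FROM daily_counts")  # SQLite has no TRUNCATE
        conn.executemany(
            "INSERT INTO daily_counts (day, cnt) VALUES (?, ?)", rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_counts (day TEXT PRIMARY KEY, cnt INTEGER)")
overwrite_table(conn, [("2020-05-20", 5), ("2020-05-21", 2)])

try:
    # A duplicate primary key makes this rewrite fail halfway through.
    overwrite_table(conn, [("2020-05-21", 3), ("2020-05-21", 4)])
except sqlite3.IntegrityError:
    pass

rows = conn.execute("SELECT day, cnt FROM daily_counts ORDER BY day").fetchall()
print(rows)
```

The failed second rewrite leaves the first batch's result intact, so readers never observe an empty or half-written table.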

> 4. It enables some operations like sort on streaming query or couple of more things. But it will not work cleanly (state won't keep up) under reasonably high input rate, and we have to consider how the operation will work for streaming output mode hence non-trivial amount of consideration has to be added to maintain the mode.

I mean, I would want all pipelines that I build to work magically without me having to put any thought into it, but then I feel most people in this email list would be out of jobs. These are typical considerations that you need to put into how you architect data pipelines. If someone doesn't put thought into the scalability of their system then ¯\_(ツ)_/¯

Let me know what you think!

Best,
Burak


(In reply to Jungtaek Lim, Wed, May 20, 2020 at 10:29 PM)

Re: [DISCUSS] "complete" streaming output mode

Jungtaek Lim
Thanks for the input, Burak!

The reason I started to think the complete mode is for niche cases is that, once we address the update mode properly, the mode is most probably only helpful for the memory sink. Kafka has compacted topics, JDBC can upsert, Delta can merge, and AFAIK Iceberg is discussing row-level updates. Most external storage systems probably don't want to require end users to drop everything they've built just to re-add the inputs.
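For the JDBC upsert case, a small sketch using SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (available since SQLite 3.24) through Python's `sqlite3`, standing in for a JDBC database's upsert. Each micro-batch writes only its update-mode rows, and nothing is dropped and re-added. The table and data are hypothetical; Kafka compaction, Delta merge, and Iceberg are not modeled.

```python
import sqlite3

UPSERT = """
INSERT INTO daily_counts (day, cnt) VALUES (?, ?)
ON CONFLICT(day) DO UPDATE SET cnt = excluded.cnt
"""


def upsert_rows(conn, rows):
    """Write update-mode output by key instead of truncating the table.
    SQLite's upsert syntax stands in for a JDBC database's upsert."""
    with conn:  # one transaction per micro-batch
        conn.executemany(UPSERT, rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_counts (day TEXT PRIMARY KEY, cnt INTEGER)")
# Two micro-batches of update-mode output: only changed keys are written.
upsert_rows(conn, [("2020-05-20", 5), ("2020-05-21", 2)])
upsert_rows(conn, [("2020-05-21", 7), ("2020-05-22", 1)])
rows = conn.execute("SELECT day, cnt FROM daily_counts ORDER BY day").fetchall()
print(rows)
```

The second batch touches two rows; the untouched "2020-05-20" row is never rewritten.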

The complete mode is not the only way to deal with these cases (the use cases seem to converge on dashboards). I totally agree the complete mode "helps" to deal with them "natively" (it makes our life easier), but that doesn't mean it's the only way. Please refer to my previous reply.

The root issue is that it's very unclear how the streaming output mode affects end users' arbitrary queries. There are some walls of text in SPARK-31706 [1] (recorded in SPARK-31724 [2]), so I'll skip reiterating them here, but the point is that the streaming output mode is only effective for streaming aggregation (not even for all stateful operations), and its semantics are easily broken by subsequent operators. (The possible data loss issue I've explained comes from there.)

Would we like to restrict the operators that can be applied after a streaming aggregation and force the semantics to be kept all the way to the sink? (For example, the result table of a streaming aggregation could remain a keyed DataFrame, and such a DataFrame would be prevented from going back to a non-keyed DataFrame.) If we'd like to keep the streaming output mode coupled with the sink, that's also a valid approach, and I'm open to it.
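A toy illustration (plain Python, not a specific Spark plan) of how output-mode semantics can break in a following operator: the upstream rows are per-key updates, but a downstream sum that treats them as appended facts double counts, while a "keyed" consumer that lets later rows replace earlier ones gets the right answer. This only shows the flavor of the problem; it is not a reproduction of the SPARK-31706 data loss, and the function names are hypothetical.

```python
from typing import Dict, List, Tuple

Batch = List[Tuple[str, int]]


def sum_as_appends(batches: List[Batch]) -> int:
    # Downstream operator that treats every emitted row as a new fact.
    return sum(count for batch in batches for _, count in batch)


def sum_as_keyed(batches: List[Batch]) -> int:
    # Downstream operator that keeps "keyed" semantics: a later row for the
    # same key replaces the earlier one before summing.
    latest: Dict[str, int] = {}
    for batch in batches:
        latest.update(batch)
    return sum(latest.values())


# Update-mode output of a running count per key (toy data):
# batch 1 saw one "a"; batch 2 saw another "a" and one "b" (3 events total).
update_output = [[("a", 1)], [("a", 2), ("b", 1)]]
print(sum_as_appends(update_output), sum_as_keyed(update_output))
```

Only the keyed consumer recovers the true total of 3 events; the append-style consumer reports 4.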

P.S. We seem to take the path of providing flexibility and restricting it case by case. We are human, and whatever we miss can bring a correctness issue. That's why I had to document a caution about the possible correctness issue with the global watermark [3]. There's no metric tracking dropped rows, so even if rows are dropped between operators, end users won't notice anything; they'll just get wrong output. (This is also what I've proposed to improve, SPARK-24634 [4].) IMHO it would be better to strive for theoretical correctness, even if it means discontinuing support for some functionality.
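A toy model of the missing metric: an event-time watermark filter for a single operator that counts the rows it drops as late instead of dropping them silently. It is plain Python, not Spark's watermark implementation or the SPARK-24634 proposal; in particular, the strict `<` comparison and deriving the watermark from earlier batches are simplifying assumptions, and `LateRowFilter` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LateRowFilter:
    """Toy event-time watermark for one operator (not Spark's implementation).

    Assumption: the watermark for a batch is the max event time seen in
    earlier batches minus `delay`, and rows strictly older than it are dropped.
    """
    delay: int
    max_event_time: Optional[int] = None
    dropped_late_rows: int = 0  # dropped-row counter, the kind of metric discussed

    def process_batch(self, event_times: List[int]) -> List[int]:
        if self.max_event_time is None:
            kept = list(event_times)  # no watermark yet: keep everything
        else:
            watermark = self.max_event_time - self.delay
            kept = [t for t in event_times if t >= watermark]
        # Count drops instead of discarding them silently.
        self.dropped_late_rows += len(event_times) - len(kept)
        if event_times:
            batch_max = max(event_times)
            if self.max_event_time is None or batch_max > self.max_event_time:
                self.max_event_time = batch_max
        return kept


f = LateRowFilter(delay=10)
first = f.process_batch([100, 105])
second = f.process_batch([90, 96, 110])  # watermark is 105 - 10 = 95
print(first, second, f.dropped_late_rows)
```

With the counter exposed, the row at event time 90 shows up as one dropped late row rather than silently disappearing from the output.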


(In reply to Burak Yavuz, Thu, May 21, 2020 at 3:32 PM)