My curation of pending structured streaming PRs to review

Jungtaek Lim
Hi devs,

As we make progress on some minor structured streaming PRs, I'd like to draw attention to the major pending PRs in the SS area so that they get more chances to be reviewed.

Please note that I only include existing PRs, so topics that haven't been discussed yet, such as queryable state, are not in this list. I've also excluded PRs on continuous processing, as I'm not fully sure about the current direction and vision for that feature. Minor PRs are mostly excluded unless they were proposed a long time ago. Lastly, my curation may be biased.

Let's get started!

----
A. File Source/Sink

1. [SPARK-20568][SS] Provide option to clean up completed files in streaming query

ISSUE: https://issues.apache.org/jira/browse/SPARK-20568
PR: https://github.com/apache/spark/pull/22952

By the nature of a "stream", input data grows indefinitely, and end users want a clear way to clean up completed files. Unlike a batch query, a structured streaming query doesn't require all input files to remain present: once a file has been committed (i.e. its processing has completed), the query will not read it again.

This patch automatically cleans up input files once they're committed, with three options: 1) keep them as they are, 2) archive (move) them to another directory, 3) delete them.
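A minimal stdlib Python sketch of the three clean-up modes described above. The function clean_up_committed and the mode names "keep", "archive" and "delete" are hypothetical, not the option names proposed in the PR, and this sketch flattens archived files by file name instead of preserving any directory layout.

```python
# Hypothetical sketch: names and modes are illustrative, not Spark options.
import shutil
import tempfile
from pathlib import Path

def clean_up_committed(committed_files, mode, archive_dir=None):
    """Apply a clean-up policy to input files whose batch has committed.

    mode: "keep" leaves files in place, "archive" moves them into
    archive_dir (flattened by file name here), "delete" removes them.
    """
    if mode not in ("keep", "archive", "delete"):
        raise ValueError(f"unknown clean-up mode: {mode}")
    if mode == "archive":
        if archive_dir is None:
            raise ValueError("archive mode requires archive_dir")
        Path(archive_dir).mkdir(parents=True, exist_ok=True)
    for path in map(Path, committed_files):
        if mode == "archive":
            shutil.move(str(path), str(Path(archive_dir) / path.name))
        elif mode == "delete":
            path.unlink()
        # mode == "keep": nothing to do, the file stays where it is

# Usage: part-0 is committed and archived, part-1 is committed and deleted,
# part-2 has not been committed yet, so it is never passed in and stays.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "input"
    src.mkdir()
    for name in ("part-0.json", "part-1.json", "part-2.json"):
        (src / name).write_text("{}")
    clean_up_committed([src / "part-0.json"], "archive", archive_dir=Path(tmp) / "archive")
    clean_up_committed([src / "part-1.json"], "delete")
    archived = sorted(p.name for p in (Path(tmp) / "archive").iterdir())
    remaining = sorted(p.name for p in src.iterdir())
```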

2. [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files

ISSUE: https://issues.apache.org/jira/browse/SPARK-27188
PR: https://github.com/apache/spark/pull/24128

The file sink writes metadata that records the list of output files, so that a file source reading the output only reads the correct files; this helps achieve end-to-end exactly-once. However, the file sink has no idea when output files will no longer be accessed by downstream queries, so the metadata grows indefinitely and output files cannot be removed safely.

This patch lets end users set a TTL on output files, so that the metadata eventually excludes expired output files and end users can safely remove those files.
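To make the retention idea concrete, here is a hypothetical sketch of compacting the sink's metadata log with a TTL. SinkLogEntry and apply_retention are illustrative names, and keying expiry on the batch commit time is an assumption of this sketch, not necessarily what the PR does.

```python
# Hypothetical sketch: not the FileStreamSink metadata log implementation.
from dataclasses import dataclass

@dataclass(frozen=True)
class SinkLogEntry:
    path: str
    commit_time_ms: int  # when the batch that wrote this file was committed

def apply_retention(entries, now_ms, ttl_ms):
    """Split metadata-log entries into (kept, expired) by a TTL.

    Expired entries are dropped from the compacted log, so readers of the
    sink's metadata stop seeing them and the files can be deleted safely.
    """
    kept = [e for e in entries if now_ms - e.commit_time_ms <= ttl_ms]
    expired = [e for e in entries if now_ms - e.commit_time_ms > ttl_ms]
    return kept, expired

log = [
    SinkLogEntry("part-0", commit_time_ms=1_000),
    SinkLogEntry("part-1", commit_time_ms=50_000),
    SinkLogEntry("part-2", commit_time_ms=90_000),
]
# With a 60 s TTL at t = 100 s, only part-0 (age 99 s) has expired.
kept, expired = apply_retention(log, now_ms=100_000, ttl_ms=60_000)
```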


B. Kafka Source/Sink

1. [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds inuse tracking.

ISSUE: https://issues.apache.org/jira/browse/SPARK-21869
PR: https://github.com/apache/spark/pull/19096

This is a long-standing bug (the JIRA issue was filed around 2 years ago): if a task uses a cached Kafka producer for longer than 10 minutes, the pool treats it as "timed out" and simply closes it. Once it is closed, the task sees undefined behavior.

This patch adds "in-use" tracking to the producer to address this. Please note that the Kafka producer is thread-safe (whereas the Kafka consumer is not) and we allow it to be used concurrently, so we can't adopt Commons Pool to pool producers. (We could still leverage Commons Pool if we were OK with not sharing producers between threads.)
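A sketch of the "in-use" tracking idea, under stated assumptions: ProducerCache, CachedProducer and the explicit now arguments (used instead of a real clock so the example is deterministic) are all hypothetical. What it shows is that the idle timeout only starts counting once the in-use count drops to zero, so a producer held by a long-running task is never closed from under it.

```python
# Hypothetical sketch: not Spark's CachedKafkaProducer implementation.
import threading

class CachedProducer:
    def __init__(self, producer):
        self.producer = producer
        self.in_use = 0          # number of tasks currently holding this producer
        self.idle_since = None   # time the in-use count last dropped to zero
        self.closed = False

class ProducerCache:
    """One shared producer per config key, expired only when nobody uses it."""

    def __init__(self, factory, timeout_s):
        self._factory = factory
        self._timeout_s = timeout_s
        self._entries = {}
        self._lock = threading.Lock()

    def acquire(self, key):
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                entry = CachedProducer(self._factory(key))
                self._entries[key] = entry
            entry.in_use += 1
            return entry

    def release(self, entry, now):
        with self._lock:
            entry.in_use -= 1
            if entry.in_use == 0:
                entry.idle_since = now

    def expire_idle(self, now):
        with self._lock:
            for key, entry in list(self._entries.items()):
                expired = (
                    entry.in_use == 0
                    and entry.idle_since is not None
                    and now - entry.idle_since > self._timeout_s
                )
                if expired:
                    entry.closed = True  # a real cache would close the producer here
                    del self._entries[key]

cache = ProducerCache(factory=lambda key: object(), timeout_s=600)  # 10 minutes
p = cache.acquire("producer-config")
cache.expire_idle(now=900)        # 15 minutes in, but still in use: kept open
open_while_in_use = not p.closed
cache.release(p, now=900)
cache.expire_idle(now=1_600)      # idle for 700 s > 600 s: closed
closed_after_idle = p.closed
```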

2. [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

ISSUE: https://issues.apache.org/jira/browse/SPARK-23539
PR: https://github.com/apache/spark/pull/22282

As there's a great doc explaining the need to support Kafka headers, I'll just let the doc explain it.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

Please note that end users have commented on the issue asking when it will be available, which also shows the demand on the end users' side.

3. [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer

ISSUE: https://issues.apache.org/jira/browse/SPARK-25151
PR: https://github.com/apache/spark/pull/22138

The Kafka source has its own pooling logic for consumers, but judging from several JIRA issues about pooling, we seem to agree that we'd like to replace it with a well-known pool implementation that provides advanced configuration, detailed metrics, etc.

This patch adopts Apache Commons Pool (which brings the advantages above) as the connection pool for consumers, preserving the current behavior whenever possible. It also pools consumers and fetched data separately, which maximizes the efficiency of consumer pooling and also fixes the bug of unnecessary re-fetching on self-join. (The experiment results are in the PR description.)
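The self-join re-fetch issue can be illustrated with a simplified model in which fetched batches are pooled by (partition, start offset) independently of the consumer that fetched them. FakeBroker, FetchedDataPool and read_range are hypothetical stand-ins, not the PR's classes and not Commons Pool itself; the sketch only shows why decoupling fetched data from consumers lets both sides of a self-join share one set of fetches.

```python
# Hypothetical sketch: a toy model, not KafkaDataConsumer or Commons Pool.
class FakeBroker:
    """Stand-in for a Kafka broker that counts the fetch requests it serves."""

    def __init__(self, records):
        self.records = records  # {partition: [record values]}
        self.fetch_calls = 0

    def fetch(self, partition, offset, max_records):
        self.fetch_calls += 1
        return self.records[partition][offset:offset + max_records]

class FetchedDataPool:
    """Fetched batches keyed by (partition, start offset), not by consumer."""

    def __init__(self):
        self._batches = {}

    def get_or_fetch(self, broker, partition, offset, max_records):
        key = (partition, offset)
        if key not in self._batches:
            self._batches[key] = broker.fetch(partition, offset, max_records)
        return self._batches[key]

def read_range(broker, pool, partition, start, end, max_records=2):
    """Read offsets [start, end) of one partition through the fetched-data pool."""
    out, offset = [], start
    while offset < end:
        batch = pool.get_or_fetch(broker, partition, offset, max_records)
        if not batch:  # nothing more on the broker
            break
        out.extend(batch[: end - offset])
        offset += len(batch)
    return out

broker = FakeBroker({0: ["a", "b", "c", "d", "e"]})
pool = FetchedDataPool()
left = read_range(broker, pool, partition=0, start=0, end=5)   # one side of a self-join
right = read_range(broker, pool, partition=0, start=0, end=5)  # the other side, same range
```

Here the second side is served entirely from the pool, so the broker sees 3 fetches instead of 6.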

4. [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending) SQL

ISSUE: https://issues.apache.org/jira/browse/SPARK-26848
PR: https://github.com/apache/spark/pull/23747

When end users want to replay records in a Kafka topic, they rarely remember the exact offset of each partition, yet Spark requires those offsets (or else starts from the earliest). As humans we are much more familiar with time: when we want to replay some records, we usually know the timestamp of the records we should start from.

This patch lets end users specify offsets by timestamp (starting, ending, or both), which are transparently passed to Kafka when requesting offsets.
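Resolving a timestamp to an offset follows the contract of Kafka's KafkaConsumer.offsetsForTimes: the earliest offset whose timestamp is greater than or equal to the target, or nothing if no such record exists. The sketch below models that lookup in plain Python over an in-memory list of timestamps; offsets_for_times is a hypothetical helper, and it assumes timestamps never decrease with offset, which topics using CreateTime do not guarantee.

```python
# Hypothetical sketch of a timestamp -> offset lookup per partition.
from bisect import bisect_left

def offsets_for_times(partition_timestamps, targets):
    """For each partition, the earliest offset whose record timestamp is
    >= the target, or None when no record is that recent.

    partition_timestamps: {partition: [timestamp of offset 0, offset 1, ...]},
    assumed non-decreasing so a binary search applies.
    """
    resolved = {}
    for partition, target in targets.items():
        timestamps = partition_timestamps[partition]
        idx = bisect_left(timestamps, target)
        resolved[partition] = idx if idx < len(timestamps) else None
    return resolved

# Record timestamps (ms) at offsets 0, 1, 2, ... for two partitions.
log = {0: [100, 200, 300, 400], 1: [150, 250, 350]}
starting = offsets_for_times(log, {0: 250, 1: 250})  # {0: 2, 1: 1}
too_late = offsets_for_times(log, {0: 500})          # {0: None}
```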


C. State

1. [SPARK-27237][SS] Introduce State schema validation among query restart

ISSUE: https://issues.apache.org/jira/browse/SPARK-27237
PR: https://github.com/apache/spark/pull/24173

Spark has no explicit mechanism to prevent end users from changing their query in a "non-compatible" way. We have documented the rules on which changes make a query incompatible, but it's not easy for users to check them on their own, and an unfriendly error message is thrown if they violate a rule. Strictly speaking, the behavior is undefined.

This patch introduces state schema validation, which verifies that the state schema remains compatible across query changes and provides end users with an informative error message showing both the previous and the current state schema.

This is also the groundwork for the new "state" data source, which can leverage the stored state schema instead of requiring end users to provide it.
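A minimal sketch of what such a validation might report. check_state_schema and its compatibility rule (same field count and same data types in order, with field renames tolerated) are assumptions for illustration; the actual rules in the PR may differ.

```python
# Hypothetical sketch: the compatibility rule below is an assumption.
def check_state_schema(stored, current):
    """Return None if compatible, else an error message showing both schemas.

    Schemas are lists of (field name, data type) pairs.
    """
    def fmt(schema):
        return "struct<" + ", ".join(f"{name}: {dtype}" for name, dtype in schema) + ">"

    if [dtype for _, dtype in stored] == [dtype for _, dtype in current]:
        return None
    return (
        "Provided state schema is not compatible with the existing state.\n"
        f"  existing: {fmt(stored)}\n"
        f"  provided: {fmt(current)}"
    )

stored = [("key", "string"), ("count", "bigint")]
ok = check_state_schema(stored, [("key", "string"), ("cnt", "bigint")])   # rename only
error = check_state_schema(stored, [("key", "string"), ("count", "int")])  # type change
```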

2. [SPARK-28191][SS] New data source - state - reader part

ISSUE: https://issues.apache.org/jira/browse/SPARK-28191
PR: https://github.com/apache/spark/pull/24990

Please read the JIRA issue below for the rationale behind the state data source; its description lists the cases where the state data source can be used (e.g. schema evolution of state, offline rescaling of state, etc.).
https://issues.apache.org/jira/browse/SPARK-28190

This patch handles the reader part: it enables a batch query to read the state of a structured streaming query.

3. [SPARK-28120][SS] Rocksdb state storage implementation

ISSUE: https://issues.apache.org/jira/browse/SPARK-28120
PR: https://github.com/apache/spark/pull/24922

Memory has been a huge limitation on state size. As structured streaming keeps two versions of state in executor memory by default, memory pressure becomes a real problem when dealing with large state. Scaling out executors may work, but it wastes resources unnecessarily, and it can't help once the number of executors exceeds the number of partitions. (The state data source will eventually help with repartitioning, but that requires an offline batch query.)

A state store that resides outside of memory is essential for structured streaming to handle large state, and this patch addresses this by introducing a RocksDB state store provider.


D. Structured Streaming

1. [SPARK-24634][SS] Add a new metric regarding number of rows later than watermark

ISSUE: https://issues.apache.org/jira/browse/SPARK-24634
PR: https://github.com/apache/spark/pull/24936

Spark doesn't provide any information about late rows, which may be dropped by stateful operators.

This patch adds a metric counting late rows so that end users can be notified about them. Please note that the issue was originally meant to report the number of rows dropped for being late, but Spark does pre-aggregation in streaming aggregation, so that number can't be reported correctly. The current approach is less informative than the original intention but still brings value, for example in determining whether a query is affected by SPARK-28074.
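The pre-aggregation caveat is easiest to see with numbers. In this hypothetical sketch, rows are partially counted per 10-unit tumbling window before reaching the stateful operator, and a row is treated as late when its window ends at or before the watermark (a simplifying assumption, not Spark's exact predicate). The operator then sees one late row even though three input rows were folded into it.

```python
# Hypothetical sketch: function names and the lateness rule are assumptions.
from collections import Counter

def pre_aggregate(event_times, window):
    """Partial (map-side) count per tumbling window, keyed by window start."""
    return Counter((t // window) * window for t in event_times)

def late_row_metric(partials, window, watermark):
    """What a metric at the stateful operator can count: pre-aggregated rows
    whose window already ended at or before the watermark."""
    return sum(1 for start in partials if start + window <= watermark)

def dropped_input_rows(partials, window, watermark):
    """What was originally wanted: input rows folded into those late rows."""
    return sum(n for start, n in partials.items() if start + window <= watermark)

partials = pre_aggregate([5, 7, 8, 21, 25], window=10)           # {0: 3, 20: 2}
late = late_row_metric(partials, window=10, watermark=20)        # 1
dropped = dropped_input_rows(partials, window=10, watermark=20)  # 3
```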

2. [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

ISSUE: https://issues.apache.org/jira/browse/SPARK-26154
PR: https://github.com/apache/spark/pull/23634

This is a long-standing correctness issue, and multiple end users (including me) have reported the behavior. It occurs in an edge case, but the edge case is not hard to reproduce; it is even close to the example query we provide for streaming outer joins.

This patch addresses the correctness issue by changing the join state: it introduces a "matched" flag.
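A toy model of why the flag helps, assuming a left outer join in which each side's buffered rows are evicted by their own watermark. LeftOuterJoinState and BufferedRow are hypothetical, and the sketch ignores the join's time-range condition. Without the flag, the outer-null decision would have to look for a matching right row in state, which may already have been evicted; with the flag stored next to each left row, that decision no longer depends on the other side's buffer.

```python
# Hypothetical sketch: not Spark's SymmetricHashJoinStateManager.
from dataclasses import dataclass

@dataclass
class BufferedRow:
    key: str
    event_time: int
    matched: bool = False  # the per-row flag this sketch keeps in state

class LeftOuterJoinState:
    """Toy stream-stream left outer join buffering rows from both sides."""

    def __init__(self):
        self.left, self.right, self.output = [], [], []

    def add_left(self, key, t):
        row = BufferedRow(key, t)
        for rrow in self.right:
            if rrow.key == key:
                self.output.append((key, t, rrow.event_time))
                row.matched = rrow.matched = True
        self.left.append(row)

    def add_right(self, key, t):
        row = BufferedRow(key, t)
        for lrow in self.left:
            if lrow.key == key:
                self.output.append((key, lrow.event_time, t))
                lrow.matched = row.matched = True
        self.right.append(row)

    def evict(self, left_watermark, right_watermark):
        """Drop rows behind each side's watermark; an evicted left row yields
        an outer-null result only if it never matched."""
        for lrow in self.left:
            if lrow.event_time < left_watermark and not lrow.matched:
                self.output.append((lrow.key, lrow.event_time, None))
        self.left = [r for r in self.left if r.event_time >= left_watermark]
        self.right = [r for r in self.right if r.event_time >= right_watermark]

state = LeftOuterJoinState()
state.add_left("k1", 10)
state.add_right("k1", 12)                          # emits ("k1", 10, 12)
state.evict(left_watermark=5, right_watermark=15)  # the right row is evicted first
state.add_left("k2", 11)                           # never matched
state.evict(left_watermark=20, right_watermark=20)
```

Only k2 gets an outer-null row; k1 is not reported again as unmatched even though its right-side partner is gone from state.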

3. [SPARK-26655][SS] Support multiple aggregates in append mode

ISSUE: https://issues.apache.org/jira/browse/SPARK-26655
PR: https://github.com/apache/spark/pull/23576

Multiple streaming aggregations have been a concern for end users: from their perspective it sounds like an essential thing to support, but Spark doesn't support it. There are many Stack Overflow questions and mail threads asking for this feature, but we still haven't dealt with it.

If we only consider append mode, the feature technically depends on a proper definition of the watermark. We haven't considered watermark calculation (and/or propagation) across multiple stages of stateful operations, but since there's a widely used concept of watermarks across multiple stages, we can leverage it and focus on how to apply it to Spark.
(Update mode would need retraction, which would take a huge effort to adopt, so let's ignore it for now.)

Please keep in mind that the lack of a watermark definition across multiple stateful stages is a problem not only for multiple streaming aggregations but also for other combinations of stateful operations (streaming join, flatMapGroupsWithState, deduplication, etc.), which Spark does not technically restrict. SPARK-28074 points out this problem.

This patch tries to address multiple aggregations. The patch itself may not be valid as-is, but there's a design doc we can use to move forward and update the implementation.

4. [SPARK-27330][SS] support task abort in foreach writer

ISSUE: https://issues.apache.org/jira/browse/SPARK-27330
PR: https://github.com/apache/spark/pull/24382

The foreach writer can leak resources when a task is aborted, as Spark doesn't call writer.close() in that case. If the task throws an exception in the foreach writer's process() or commits successfully, close() is called properly, but in other cases the call to close() is missing because abort isn't handled properly.

This patch fixes the bug.
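The fix amounts to making close() run on every exit path of the task. The sketch below follows the open(partition_id, epoch_id), process(row), close(error) shape of a foreach writer, but run_foreach_task, RecordingWriter and TaskAborted are hypothetical and do not reflect how Spark actually signals task aborts.

```python
# Hypothetical sketch: a task loop that always calls close(error).
class TaskAborted(Exception):
    """Hypothetical signal for a task killed from outside the writer."""

class RecordingWriter:
    """Writer with the open/process/close(error) shape of a foreach writer."""

    def __init__(self):
        self.events = []

    def open(self, partition_id, epoch_id):
        self.events.append("open")
        return True

    def process(self, row):
        self.events.append(f"process:{row}")

    def close(self, error):
        name = type(error).__name__ if error is not None else None
        self.events.append(f"close:{name}")

def run_foreach_task(writer, partition_id, epoch_id, rows):
    """close(error) runs on success, on a process() failure and on abort
    alike, because it sits in a finally block."""
    error = None
    try:
        if writer.open(partition_id, epoch_id):
            for row in rows:
                writer.process(row)
    except BaseException as e:
        error = e
        raise
    finally:
        writer.close(error)

def rows_then_abort():
    yield 1
    raise TaskAborted()

succeeded = RecordingWriter()
run_foreach_task(succeeded, 0, 0, [1, 2])

aborted = RecordingWriter()
try:
    run_foreach_task(aborted, 0, 0, rows_then_abort())
except TaskAborted:
    pass
```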

5. [SPARK-28074][DOC][SS] Document caveats on using multiple stateful operations in single query

ISSUE: https://issues.apache.org/jira/browse/SPARK-28074
PR: https://github.com/apache/spark/pull/24890

As I mentioned for SPARK-26655, Spark doesn't restrict the use of multiple stateful operations in a single query (except multiple streaming aggregations), even though the concept of watermark isn't properly defined across multiple stateful stages.

I explained this issue with an example on the dev mailing list earlier; please refer to the link below for the rationale.
https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E

We haven't decided how to help end users avoid the issue (resolving SPARK-26655 would be best, but in the meantime...), and this patch tries to establish (or start a discussion on) how to guide end users.

SPARK-24634 would help end users determine whether their query is affected by this issue, since in append mode intermediate output should never be late with respect to the watermark.
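The failure mode can be reproduced with a simplified two-stage model: one watermark shared by all stateful stages, a first stage counting tumbling windows in append mode, and a second stage treating rows whose event time is below the watermark as late. The function names and the lateness rule are assumptions of this sketch. Every result the first stage emits is already late for the second stage, which is the kind of signal a late-row metric would surface.

```python
# Hypothetical sketch: a single global watermark applied to two stateful stages.
from collections import Counter

def windowed_count_append(event_times, window, watermark):
    """Stage 1: tumbling-window count in append mode. A window is emitted only
    after the watermark passes its end; its output row carries the window
    start as its event time."""
    counts = Counter((t // window) * window for t in event_times)
    return {start: n for start, n in counts.items() if start + window <= watermark}

def late_at_next_stage(output_event_times, watermark):
    """Rows a downstream stateful stage would treat as late."""
    return sorted(t for t in output_event_times if t < watermark)

global_watermark = 20  # one watermark shared by every stateful stage
stage1 = windowed_count_append([5, 7, 8, 12], window=10, watermark=global_watermark)
late = late_at_next_stage(stage1, global_watermark)  # every stage-1 result is late
```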
----

Please chime in and share your own curation if I've missed something.

Thanks,
Jungtaek Lim (HeartSaVioR)

Re: My curation of pending structured streaming PRs to review

vikram.agrawal

Thanks, Jungtaek, for curating this list. It covers a lot of important fixes and performance improvements in structured streaming.

Hi Devs

From a process perspective, what is missing to get these PRs merged? Apart from this list, is there any other forum where we can request attention for such important PRs? Is the lack of reviews limited to Structured Streaming, or are other areas of Spark suffering from similar neglect? Does the community feel that we need a better turnaround on PRs, to make sure we don't miss out on important contributions and to encourage newbies like me?

Thanks
Vikram

On Tue, Jul 16, 2019 at 10:41 AM Jungtaek Lim <[hidden email]> wrote:

Re: My curation of pending structured streaming PRs to review

Sean Owen-2
General tips:

- dev@ is not usually the right place to discuss _specific_ changes
except once in a while to call attention
- Ping the authors of the code being changed directly
- Tighten the change if possible
- Tests, reproductions, docs, etc. help prove the change
- Bugs are more important than new marginal features

If there has been feedback that's simply skeptical about the
approach or value, that may be the answer: it won't be merged.
If there is no feedback and it seems important (correctness bugs) it's
OK to raise that here once in a while.

One common theme here is 'structured streaming' -- who amongst the
committers feels they are able to review these changes? I sense we
have a shortage there.

Re: My curation of pending structured streaming PRs to review

Jungtaek Lim
As a reminder, the list contains two correctness bugs: stream-stream outer join, and multiple stateful operations with watermark.

Regarding the common theme: yes, that's something I'd rather have avoided saying, but honestly I feel there's a shortage of active committers on 'structured streaming'.

Many of the committers I know to be involved in the SS area haven't shown up in the Spark community for around half a year (maybe even longer), and unfortunately even the active committers seem to struggle with a shortage of time for their own work (which is natural) and haven't found time to focus on reviewing other PRs (they provide valuable comments but don't shepherd PRs through to being merged). I hoped this was a temporary issue due to important events like Spark+AI Summit, but it turned out not to be.

Spark has no replacement for SS, and DStream now gets even less attention than SS. Does the Spark community not consider the streaming area important? I'd disagree, given the reports from end users and the patches proposed by contributors so far. I'm not the right person to say how to solve the issue, but I hope we can handle it nicely and in a less painful way.


On Tue, Aug 13, 2019 at 10:42 PM Sean Owen <[hidden email]> wrote: