correctness issue on chained streaming-streaming join

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

correctness issue on chained streaming-streaming join

Jungtaek Lim
Hi devs,

While helping user in user mailing list, I start to suspect that chained streaming-streaming joins works incorrectly but Structured Streaming doesn't prevent it. The reason is actually similar to why chained streaming aggregations is not supported in Structured Streaming, global watermark.

Suppose we're running below query with append mode,

A.join(B, "A_ID = B_ID AND A_EVT_TIME = B_EVT_TIME")
 .join(C, "B_ID = C_ID AND A_EVT_TIME = C_EVT_TIME")

with below dataframes:

- Dataframe A

A_ID | A_EVT_TIME
-----|-----------
A1   | 1
A2   | 2
A3   | 3

(Let's name as A1, A2, A3)

- Dataframe B

B_ID | B_EVT_TIME
-----|-----------
B2   | 2
B3   | 3

(Let's name as B2, B3)

- Dataframe C

C_ID | C_EVT_TIME
-----|-----------
C4   | 4
C5   | 5

(Let's name as C4, C5)

Please note that I'm intentionally making two conditions (in query and input data):

- take equivality of event times as join condition, to not expand time bound of eviction (it might hide the issue)
- no match will be occurred (having some matched pairs is still OK to see the behavior, but to simplify the issue...)

Let's say batch 0 read all the rows and processed them.

> output

- none

> watermark for batch 1 (next batch)

- watermark(A) = 3
- watermark(B) = 3
- watermark(C) = 5
- watermark(min) = 3
- watermark(max) = 5

> states for version 2 (output of batch 0)

- Left State for first join: A1, A2, A3
- Right State for first join: B2, B3
- Left State for second join: (none)
- Right State for second join: C4, C5

After batch 0, empty-batch will be called out to deal with eviction due to watermark forward - which would be batch 1.

Here global watermark is 3 in batch 1, so first join evicts all rows from state and outputs (A1, null), (A2, null), (A3, null) as outputs.
And second join will discard all these outputs from first join because watermark is already passed by - correctness is broken.

Below is the commit which adds test code replicating above case:
https://github.com/HeartSaVioR/spark/commit/33c8b3fbcf23d9eabaa0d4f548787cacd60bd791

I'm yet to investigate on the case for inner join, but due to the characteristic of the issue, the possibility might be open for all stateful operators as well.

If we agree that my investigation is correct, I would propose to prevent multiple stream-stream joins (as we prevent multiple streaming aggregations) for interim mitigation. For long-term solution, we may want to visit SPARK-26655 [1] which addresses operator-wise watermarks.

Thanks,
Jungtaek Lim (HeartSaVioR)


Reply | Threaded
Open this post in threaded view
|

Re: correctness issue on chained streaming-streaming join

Jungtaek Lim
General representation of this issue would be:

- stateful operator would evict rows in state when watermark passes by
- for append mode, evicted rows are used as output rows, in other words, input rows for next stateful operator
- next stateful operator would discard late input rows using same watermark used for evicting input rows

The last sentence makes contradiction here.

I wouldn't point out update and complete mode as they just don't make sense for chained stateful operators without retraction.

Any pair of chained stateful operators (streaming aggregation, stream-stream join, map/flatMapGroupsWithState) could fall into this situation according to end user's query. (Spark prohibits some of pairs but not all the pairs.) End users might be able to fully understand how watermark works for their query under the hood and craft their query with avoiding correctness issue with global watermark, but that should be pretty hard and I haven't found any docs explaining such complicated cases.

I might not be able to propose restricting everything which are open to bring correctness issue according to end user's query but not 100% of the cases, so let me take back proposal on preventing chained stream-stream join.

I would like to say instead, we may need to explain full of details and caveats if end user does chained stateful operations. And more important, as global watermark is really hard to reason about the impacts for all stateful operators, we would need to put efforts to consider per-operator watermark to make things pretty easier for end users to understand.

Would like to hear voices on this.

-Jungtaek Lim (HeartSaVioR)

On Wed, Jun 12, 2019 at 4:41 PM Jungtaek Lim <[hidden email]> wrote:
Hi devs,

While helping user in user mailing list, I start to suspect that chained streaming-streaming joins works incorrectly but Structured Streaming doesn't prevent it. The reason is actually similar to why chained streaming aggregations is not supported in Structured Streaming, global watermark.

Suppose we're running below query with append mode,

A.join(B, "A_ID = B_ID AND A_EVT_TIME = B_EVT_TIME")
 .join(C, "B_ID = C_ID AND A_EVT_TIME = C_EVT_TIME")

with below dataframes:

- Dataframe A

A_ID | A_EVT_TIME
-----|-----------
A1   | 1
A2   | 2
A3   | 3

(Let's name as A1, A2, A3)

- Dataframe B

B_ID | B_EVT_TIME
-----|-----------
B2   | 2
B3   | 3

(Let's name as B2, B3)

- Dataframe C

C_ID | C_EVT_TIME
-----|-----------
C4   | 4
C5   | 5

(Let's name as C4, C5)

Please note that I'm intentionally making two conditions (in query and input data):

- take equivality of event times as join condition, to not expand time bound of eviction (it might hide the issue)
- no match will be occurred (having some matched pairs is still OK to see the behavior, but to simplify the issue...)

Let's say batch 0 read all the rows and processed them.

> output

- none

> watermark for batch 1 (next batch)

- watermark(A) = 3
- watermark(B) = 3
- watermark(C) = 5
- watermark(min) = 3
- watermark(max) = 5

> states for version 2 (output of batch 0)

- Left State for first join: A1, A2, A3
- Right State for first join: B2, B3
- Left State for second join: (none)
- Right State for second join: C4, C5

After batch 0, empty-batch will be called out to deal with eviction due to watermark forward - which would be batch 1.

Here global watermark is 3 in batch 1, so first join evicts all rows from state and outputs (A1, null), (A2, null), (A3, null) as outputs.
And second join will discard all these outputs from first join because watermark is already passed by - correctness is broken.

Below is the commit which adds test code replicating above case:
https://github.com/HeartSaVioR/spark/commit/33c8b3fbcf23d9eabaa0d4f548787cacd60bd791

I'm yet to investigate on the case for inner join, but due to the characteristic of the issue, the possibility might be open for all stateful operators as well.

If we agree that my investigation is correct, I would propose to prevent multiple stream-stream joins (as we prevent multiple streaming aggregations) for interim mitigation. For long-term solution, we may want to visit SPARK-26655 [1] which addresses operator-wise watermarks.

Thanks,
Jungtaek Lim (HeartSaVioR)




--