Questions about Stateful Operations in SS



Zhang, Lubo

Hi all,

I have a question about the stateful operations [map/flatMap]GroupsWithState in Structured Streaming. The issue is as follows:

Take the StructuredSessionization example. First I input two words, apache and spark, in batch 0; then, after waiting for the timeout to elapse, I input another word, Hadoop, in batch 1 (the timeout type here is ProcessingTimeTimeout). I can see both apache and spark in the output, since each group's state has timed out. But if in batch 1 I instead input apache again, which already appeared in batch 0, the result shows only spark as expired. I dug into this and found that the code at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L131, which handles the group state update, first processes the groups that have new data and sets the hasTimedOut flag to false in the update function, so a key that has already timed out is simply updated. I know the timeout function call will not occur until there is new data to trigger a batch, but I am wondering why we don't process the timed-out keys first, so that the expired data from batch 0 can be retrieved in the user-given function:

 

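import org.apache.spark.sql.streaming.GroupState

def stateFunc[K, V, S, U](key: K, values: Iterator[V], state: GroupState[S]): U = {
  if (state.hasTimedOut) {
    // Called because this key timed out: build the final output, then remove the state
    val result: U = ???  // TODO: derive the output from state.get
    state.remove()
    result
  } else if (state.exists) {
    // Called with new data for a key that already has state: update it
    ???  // TODO
  } else {
    // First data for this key: initialize the state
    ???  // TODO
  }
}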
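A minimal pure-Scala model of the batch ordering described above may help. It is a simplified sketch, not Spark's implementation: it assumes, as the linked FlatMapGroupsWithStateExec code suggests, that keys with new data in a batch are called first with hasTimedOut = false, and that only keys with no new data whose processing-time deadline has passed are then called as timed out. The names KeyState, Call, and runBatch are hypothetical, and the 10-second timeout is an arbitrary choice.

```scala
// Simplified model of one micro-batch of [map/flatMap]GroupsWithState with a
// processing-time timeout. Hypothetical names; not Spark's actual code.
final case class KeyState(lastEventMs: Long, numEvents: Int)

// One invocation of the user function: which key, and whether it was a timeout call.
final case class Call(key: String, hasTimedOut: Boolean)

def runBatch(
    state: Map[String, KeyState],
    newData: Seq[String],
    nowMs: Long,
    timeoutMs: Long
): (Map[String, KeyState], Seq[Call]) = {
  // Step 1: keys with new data. hasTimedOut is false even if the key's
  // deadline has already passed; its state is simply updated.
  val dataKeys = newData.distinct
  val dataCalls = dataKeys.map(k => Call(k, hasTimedOut = false))
  val updated = dataKeys.foldLeft(state) { (s, k) =>
    val prev = s.getOrElse(k, KeyState(nowMs, 0))
    s.updated(k, KeyState(nowMs, prev.numEvents + newData.count(_ == k)))
  }
  // Step 2: keys WITHOUT new data whose timeout deadline has passed.
  val expired = state.keys.toList.sorted.filter { k =>
    !dataKeys.contains(k) && nowMs >= state(k).lastEventMs + timeoutMs
  }
  val timeoutCalls = expired.map(k => Call(k, hasTimedOut = true))
  // Timed-out keys are dropped, as the user function would do with state.remove().
  (updated -- expired, dataCalls ++ timeoutCalls)
}

// Batch 0 at t = 0: "apache" and "spark" arrive.
val (s0, _) = runBatch(Map.empty, Seq("apache", "spark"), nowMs = 0L, timeoutMs = 10000L)
// Batch 1 at t = 20 s: only "apache" arrives again.
val (s1, calls1) = runBatch(s0, Seq("apache"), nowMs = 20000L, timeoutMs = 10000L)
println(calls1) // List(Call(apache,false), Call(spark,true))
```

Under these assumptions, batch 1 produces a timeout call only for spark; apache is treated as a normal update because it received new data, which matches the output described above.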

 

 

Thanks

Lubo

 


Re: Questions about Stateful Operations in SS

Tathagata Das
Hello Lubo,

The idea of timeouts is to make a best-effort, last-resort attempt to process a key when it has not received data for a while. With a processing-time timeout of 1 minute, the system guarantees that a key will not time out unless at least 1 minute has passed. Defining precisely when the timeout is triggered is really hard for many reasons (distributed system, lack of precise clock synchronization, need for deterministic re-executions for fault tolerance, etc.). We made a design decision to process the timed-out keys after processing the new data in a batch, but that choice should not affect your business logic if your logic is constructed the right way. So your business logic should use loosely defined timeout durations, and not depend on the exact timing of when the timeouts are hit.
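One way to follow this advice is to make the user function close a stale session on either path: in the timeout call, or when new data arrives after the session gap has already elapsed. The output then no longer depends on whether the timeout or the new data is processed first. The sketch below is a hypothetical pure-Scala illustration: SimpleState is a stand-in for Spark's GroupState (mirroring its hasTimedOut, exists, get, getOption, update, and remove methods) so that it runs without Spark, and Session, SessionUpdate, gapMs, and updateSession are invented names loosely modeled on the StructuredSessionization example.

```scala
// Hypothetical stand-in for Spark's GroupState[S], so the sketch runs without Spark.
final class SimpleState[S](private var value: Option[S], val hasTimedOut: Boolean) {
  def exists: Boolean = value.isDefined
  def get: S = value.get
  def getOption: Option[S] = value
  def update(s: S): Unit = { value = Some(s) }
  def remove(): Unit = { value = None }
}

final case class Session(startMs: Long, lastMs: Long, numEvents: Int)
final case class SessionUpdate(id: String, durationMs: Long, numEvents: Int, expired: Boolean)

// Assumed session gap; in Spark this would also be passed to state.setTimeoutDuration.
val gapMs = 10000L

def updateSession(id: String, eventTimesMs: Seq[Long], state: SimpleState[Session]): Seq[SessionUpdate] = {
  def closed(s: Session): SessionUpdate =
    SessionUpdate(id, s.lastMs - s.startMs, s.numEvents, expired = true)

  if (state.hasTimedOut) {
    // Timeout path: emit the final session and drop the state.
    val out = Seq(closed(state.get))
    state.remove()
    out
  } else {
    // Data path (assumes a non-empty batch of events for this key).
    val events = eventTimesMs.sorted
    // If the stored session went stale before this data arrived, close it here
    // instead of relying on a timeout call that may never come.
    val stale = state.getOption.filter(s => events.head - s.lastMs >= gapMs)
    val carried = if (stale.isDefined) None else state.getOption
    val merged = Session(
      startMs = carried.map(_.startMs).getOrElse(events.head),
      lastMs = events.last,
      numEvents = carried.map(_.numEvents).getOrElse(0) + events.size)
    state.update(merged)
    stale.map(closed).toList :+
      SessionUpdate(id, merged.lastMs - merged.startMs, merged.numEvents, expired = false)
  }
}

// "apache" was last seen at t = 0 and arrives again at t = 20 s, after the gap.
val st = new SimpleState[Session](Some(Session(0L, 0L, 1)), hasTimedOut = false)
val out = updateSession("apache", Seq(20000L), st)
println(out.map(_.expired)) // List(true, false)
```

In this model, "new data after the gap" is treated as an implicit timeout, so the old apache session is reported as expired whichever order the engine uses.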

TD

On Wed, Jul 26, 2017 at 1:54 AM, Zhang, Lubo <[hidden email]> wrote:


 



RE: Questions about Stateful Operations in SS

Zhang, Lubo

 

 

From: Tathagata Das [mailto:[hidden email]]
Sent: Thursday, July 27, 2017 3:08 AM
To: Zhang, Lubo <[hidden email]>
Cc: [hidden email]
Subject: Re: Questions about Stateful Operations in SS

 


 

 


RE: Questions about Stateful Operations in SS

Zhang, Lubo
In reply to this post by Tathagata Das

Got it, thanks for your reply.

 

Best regards

Lubo

 


 

 


Recall: Questions about Stateful Operations in SS

Zhang, Lubo
In reply to this post by Zhang, Lubo
Zhang, Lubo would like to recall the message, "Questions about Stateful Operations in SS".