[SS] watermark, eventTime and "StreamExecution: Streaming query made progress"

Jacek Laskowski
Hi,

I'm curious why the watermark is updated in the streaming batch after
the one in which it was observed [1]. The report (from
ProgressReporter/StreamExecution) does not look right to me, as
avg/max/min are already calculated according to the watermark [2].

My recommendation would be to do the update [2] in the same streaming
batch in which it was observed. Why not? Please enlighten.

17/08/11 09:04:20 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:20.004Z",
  "batchId" : 1,
  "numInputRows" : 2,
  "inputRowsPerSecond" : 0.7601672367920943,
  "processedRowsPerSecond" : 25.31645569620253,
  "durationMs" : {
    "addBatch" : 48,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 79,
    "walCommit" : 23
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:17.782Z",
    "max" : "2017-08-11T07:04:18.282Z",
    "min" : "2017-08-11T07:04:17.282Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },

...

17/08/11 09:04:30 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:30.003Z",
  "batchId" : 2,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 1.000100010001,
  "processedRowsPerSecond" : 56.17977528089888,
  "durationMs" : {
    "addBatch" : 147,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 178,
    "walCommit" : 22
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:23.782Z",
    "max" : "2017-08-11T07:04:28.282Z",
    "min" : "2017-08-11T07:04:19.282Z",
    "watermark" : "2017-08-11T07:04:08.282Z"
  },

[1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L538
[2] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L257

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

Re: [SS] watermark, eventTime and "StreamExecution: Streaming query made progress"

Michael Armbrust
The point here is to tell you what watermark value was used when executing this batch. You don't know the new watermark until the batch is over, and we don't want to do two passes over the data. In general, the semantics of the watermark are designed to be conservative: data older than the watermark is not necessarily dropped, but data is never dropped until after it is below the watermark.
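
The one-batch lag described above can be illustrated with a minimal sketch in plain Python. This is not Spark code: `run_batches` and `ts` are hypothetical helpers, only the min and max event times from the two progress reports in the question are used as input, and the 10-second delay is an assumption inferred from the logs (batch 1 max of 07:04:18.282 versus batch 2 watermark of 07:04:08.282).

```python
from datetime import datetime, timedelta, timezone

# Watermark value reported before any event time has been observed.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ts(second, millis):
    """Build a UTC timestamp on 2017-08-11 at 07:04:<second>.<millis>."""
    return datetime(2017, 8, 11, 7, 4, second, millis * 1000, tzinfo=timezone.utc)


def run_batches(batches, delay):
    """Simulate single-pass batches and return one report per batch.

    Each report carries the watermark that was *used* to execute the batch,
    not the one derived from the batch's own data.
    """
    watermark = EPOCH
    reports = []
    for batch_id, event_times in enumerate(batches, start=1):
        used = watermark                 # fixed before the batch starts
        batch_max = max(event_times)     # only known once the batch is over
        reports.append({"batchId": batch_id, "max": batch_max, "watermark": used})
        # Advance the watermark for the NEXT batch; it never moves backwards.
        watermark = max(watermark, batch_max - delay)
    return reports


# Min and max event times taken from the two progress reports in the question.
batches = [
    [ts(17, 282), ts(18, 282)],   # batchId 1
    [ts(19, 282), ts(28, 282)],   # batchId 2
]

# Assumption: a 10-second watermark delay, inferred from the logged values.
reports = run_batches(batches, delay=timedelta(seconds=10))
for report in reports:
    print(report["batchId"], report["max"].isoformat(), report["watermark"].isoformat())
```

Under these assumptions the first batch reports the epoch as its watermark and the second reports 07:04:08.282, which matches the two log entries. Updating the watermark within the same batch would require knowing the batch maximum before processing its rows, that is, a second pass over the data.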

On Fri, Aug 11, 2017 at 12:23 AM, Jacek Laskowski <[hidden email]> wrote:
...