spark-dev mailing list archives

From Michael Armbrust <mich...@databricks.com>
Subject Re: [SS] watermark, eventTime and "StreamExecution: Streaming query made progress"
Date Fri, 11 Aug 2017 17:42:17 GMT
The point of the report is to tell you which watermark value was used while
executing this batch.  The new watermark is not known until the batch is
over, and we don't want to make two passes over the data.  In general, the
semantics of the watermark are designed to be conservative (i.e., data that
is older than the watermark will not necessarily be dropped, but data will
never be dropped until after it is below the watermark).
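
To make the ordering concrete, here is a minimal sketch in plain Python (not
Spark code) of the behaviour described above: each progress report carries
the watermark that was in effect while the batch ran, and the watermark
advances only once the batch is over.  The names run_batches and
may_be_dropped are hypothetical, and the 10-second delay is an assumption
inferred from the quoted logs (max 07:04:18.282 in batch 1, watermark
07:04:08.282 in batch 2); the actual query is not shown in this thread.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ts(hour, minute, second, millis):
    """Build a UTC timestamp on 2017-08-11, the date used in the quoted logs."""
    return datetime(2017, 8, 11, hour, minute, second, millis * 1000,
                    tzinfo=timezone.utc)


def run_batches(batches, delay, first_batch_id=1):
    """Simulate progress reports (hypothetical helper, not a Spark API).

    Each report records the watermark used while the batch was executing.
    The watermark is advanced only after the batch finishes, in a single
    pass, and it never moves backwards.
    """
    watermark = EPOCH
    reports = []
    for batch_id, event_times in enumerate(batches, start=first_batch_id):
        reports.append({
            "batchId": batch_id,
            "min": min(event_times),
            "max": max(event_times),
            "watermark": watermark,  # value used for THIS batch
        })
        # Only now is the max event time known, so update for the NEXT batch.
        watermark = max(watermark, max(event_times) - delay)
    return reports


def may_be_dropped(event_time, watermark):
    """Conservative rule: only data below the watermark is ever eligible
    to be dropped; being eligible does not mean it will be dropped."""
    return event_time < watermark


# Min/max event times taken from the two quoted progress reports.
batches = [
    [ts(7, 4, 17, 282), ts(7, 4, 18, 282)],   # batchId 1
    [ts(7, 4, 19, 282), ts(7, 4, 28, 282)],   # batchId 2
]
reports = run_batches(batches, delay=timedelta(seconds=10))  # assumed delay

for report in reports:
    print(report["batchId"], report["watermark"].isoformat())
```

Under these assumptions batch 1 reports the epoch watermark and batch 2
reports 2017-08-11T07:04:08.282Z, which matches the two quoted reports: the
watermark derived from batch 1's max event time only shows up in batch 2.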

On Fri, Aug 11, 2017 at 12:23 AM, Jacek Laskowski <jacek@japila.pl> wrote:

> Hi,
>
> I'm curious why the watermark is updated in the streaming batch after the
> one in which it was observed [1]. The report (from
> ProgressReporter/StreamExecution) does not look right to me, as
> avg/max/min are already calculated according to the watermark [2].
>
> My recommendation would be to do the update [2] in the same streaming
> batch it was observed. Why not? Please enlighten.
>
> 17/08/11 09:04:20 INFO StreamExecution: Streaming query made progress: {
>   "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
>   "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
>   "name" : "rates-to-console",
>   "timestamp" : "2017-08-11T07:04:20.004Z",
>   "batchId" : 1,
>   "numInputRows" : 2,
>   "inputRowsPerSecond" : 0.7601672367920943,
>   "processedRowsPerSecond" : 25.31645569620253,
>   "durationMs" : {
>     "addBatch" : 48,
>     "getBatch" : 6,
>     "getOffset" : 0,
>     "queryPlanning" : 1,
>     "triggerExecution" : 79,
>     "walCommit" : 23
>   },
>   "eventTime" : {
>     "avg" : "2017-08-11T07:04:17.782Z",
>     "max" : "2017-08-11T07:04:18.282Z",
>     "min" : "2017-08-11T07:04:17.282Z",
>     "watermark" : "1970-01-01T00:00:00.000Z"
>   },
>
> ...
>
> 17/08/11 09:04:30 INFO StreamExecution: Streaming query made progress: {
>   "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
>   "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
>   "name" : "rates-to-console",
>   "timestamp" : "2017-08-11T07:04:30.003Z",
>   "batchId" : 2,
>   "numInputRows" : 10,
>   "inputRowsPerSecond" : 1.000100010001,
>   "processedRowsPerSecond" : 56.17977528089888,
>   "durationMs" : {
>     "addBatch" : 147,
>     "getBatch" : 6,
>     "getOffset" : 0,
>     "queryPlanning" : 1,
>     "triggerExecution" : 178,
>     "walCommit" : 22
>   },
>   "eventTime" : {
>     "avg" : "2017-08-11T07:04:23.782Z",
>     "max" : "2017-08-11T07:04:28.282Z",
>     "min" : "2017-08-11T07:04:19.282Z",
>     "watermark" : "2017-08-11T07:04:08.282Z"
>   },
>
> [1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L538
> [2] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L257
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>
>
