The input dataset has multiple days' worth of data, so I thought the watermark should have been crossed. To debug, I changed the query to the code below. My expectation was that since I am doing 1-day windows with late arrivals permitted for 1 second, when the engine sees records for the next day it would output a row for the previous day. When I run the code below in 'complete' output mode, I see the table and then the lastProgress output about 10 seconds later. When I run it in 'append' mode, the table has no rows, but the lastProgress output is the same.

Is withWatermark ignored in 'complete' and 'update' modes?

For append mode, I'm not able to understand why the watermark entry is still at 1970. The docs say: "For a specific window starting at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T)". The table shows that we are seeing events with timestamp 2017-05-18. At that point the 2017-05-17 window can be closed and its row output, right?
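To check my understanding of that rule, here is a small plain-Python sketch (not Spark code; `is_final` and the variable names are mine) of when a window should become final:

```python
from datetime import datetime, timedelta

# Toy illustration of the rule quoted above: a window is final once
#   watermark = (max event time seen) - (late threshold)
# has passed the end of the window. In append mode, a row is only
# emitted for a finalized window.

def watermark(max_event_time, late_threshold):
    return max_event_time - late_threshold

def is_final(window_end, max_event_time, late_threshold):
    """True once the watermark has passed the end of the window."""
    return watermark(max_event_time, late_threshold) > window_end

threshold = timedelta(seconds=1)
window_end = datetime(2017, 5, 18)  # the [2017-05-17, 2017-05-18) window

# Seeing the first event of 2017-05-18 puts the watermark at
# 2017-05-17 23:59:59, which has NOT yet passed the window end.
print(is_final(window_end, datetime(2017, 5, 18), threshold))           # False

# A couple of seconds more event time and the window becomes final.
print(is_final(window_end, datetime(2017, 5, 18, 0, 0, 2), threshold))  # True
```

Interestingly, by this arithmetic an event at exactly 2017-05-18 00:00:00 is not quite enough with a 1-second threshold, though the max timestamps in the table below go well past that.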

grouped_df = frame \
    .withWatermark("timestamp", "1 second") \
    .groupby(F.window("timestamp", "1 day")) \
    .agg(F.max("timestamp"))

query = grouped_df.writeStream \
    .format("console") \
    .option("truncate", False) \
    .option("checkpointLocation", CKPT_LOC) \
    .outputMode("complete") \
    .start()

import time
for _ in range(10):
    time.sleep(10)
    print(query.lastProgress)
query.awaitTermination()
The output:
+---------------------------------------------+-------------------+
|window                                       |max(timestamp)     |
+---------------------------------------------+-------------------+
|[2017-05-17 00:00:00.0,2017-05-18 00:00:00.0]|2017-05-17 23:59:59|
|[2017-05-15 00:00:00.0,2017-05-16 00:00:00.0]|2017-05-15 23:59:59|
|[2017-05-14 00:00:00.0,2017-05-15 00:00:00.0]|2017-05-14 23:59:59|
|[2017-05-16 00:00:00.0,2017-05-17 00:00:00.0]|2017-05-16 23:59:59|
|[2017-05-07 00:00:00.0,2017-05-08 00:00:00.0]|2017-05-07 23:59:59|
|[2017-05-19 00:00:00.0,2017-05-20 00:00:00.0]|2017-05-19 23:59:59|
|[2017-05-18 00:00:00.0,2017-05-19 00:00:00.0]|2017-05-18 23:59:59|
|[2017-05-20 00:00:00.0,2017-05-21 00:00:00.0]|2017-05-20 23:59:59|
|[2017-05-08 00:00:00.0,2017-05-09 00:00:00.0]|2017-05-08 23:59:59|
|[2017-05-10 00:00:00.0,2017-05-11 00:00:00.0]|2017-05-10 23:59:59|
|[2017-05-13 00:00:00.0,2017-05-14 00:00:00.0]|2017-05-13 23:59:57|
|[2017-05-21 00:00:00.0,2017-05-22 00:00:00.0]|2017-05-21 23:40:08|
|[2017-05-09 00:00:00.0,2017-05-10 00:00:00.0]|2017-05-09 23:59:59|
|[2017-05-12 00:00:00.0,2017-05-13 00:00:00.0]|2017-05-12 23:40:11|
|[2017-05-11 00:00:00.0,2017-05-12 00:00:00.0]|2017-05-11 23:59:59|
+---------------------------------------------+-------------------+
{u'stateOperators': [{u'numRowsTotal': 15, u'numRowsUpdated': 0}], 
u'eventTime': {u'watermark': u'1970-01-01T00:00:00.000Z'}, 
u'name': None, u'timestamp': u'2017-08-15T18:13:54.381Z', 
u'processedRowsPerSecond': 0.0, u'inputRowsPerSecond': 0.0, 
u'numInputRows': 0, u'sources': 
[{u'description': u'FileStreamSource[hdfs://some_ip/some_path]', 
u'endOffset': {u'logOffset': 0}, u'processedRowsPerSecond': 0.0, u'inputRowsPerSecond': 0.0, 
u'numInputRows': 0, u'startOffset': {u'logOffset': 0}}], u'durationMs': 
{u'getOffset': 57, u'triggerExecution': 57}, u'runId': u'a5b75404-c774-49db-aac5-2592211417ca', 
u'id': u'35ad86ec-f608-40b5-a48b-9507c82a87c8', u'sink': 
{u'description': u'org.apache.spark.sql.execution.streaming.ConsoleSink@7e4050cd'}}

On Mon, Aug 14, 2017 at 4:55 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
In append mode, the aggregation outputs a row only when the watermark has been crossed and the corresponding aggregate is *final*, that is, will not be updated any more. 
See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
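For anyone following along, the difference between the output modes can be illustrated with a plain-Python toy (not Spark code; the mode semantics are paraphrased from the programming guide, and the function and variable names are made up):

```python
# Toy model of what a single trigger writes to the sink in each output
# mode. `state` is the full set of windowed aggregates, `changed` the
# windows updated in this trigger, `finalized` the windows the
# watermark has already closed.

def emit(mode, state, changed, finalized):
    """Return the rows (window labels) a trigger would write to the sink."""
    if mode == "complete":
        # The entire result table, every trigger.
        return sorted(state)
    if mode == "update":
        # Only the rows whose aggregate changed in this trigger.
        return sorted(changed)
    if mode == "append":
        # Only rows whose window is final; nothing appears before that.
        return sorted(finalized)
    raise ValueError("unknown mode: %s" % mode)

state = {"2017-05-17", "2017-05-18"}
changed = {"2017-05-18"}
finalized = set()  # watermark still at 1970: no window closed yet

print(emit("complete", state, changed, finalized))  # both windows
print(emit("append", state, changed, finalized))    # [] -- empty, as observed
```

This matches the symptom in the thread: with the watermark stuck at 1970, no window is ever finalized, so append mode writes nothing while complete mode shows the whole table.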

On Mon, Aug 14, 2017 at 4:09 PM, Ashwin Raju <theraju@gmail.com> wrote:
Hi,

I am running Spark 2.2 and trying out structured streaming. I have the following code:

from pyspark.sql import functions as F

df = frame \
    .withWatermark("timestamp", "1 minute") \
    .groupby(F.window("timestamp", "1 day"), *groupby_cols) \
    .agg(F.sum('bytes'))

query = df.writeStream \
    .format("console") \
    .option("checkpointLocation", '\some\chkpoint') \
    .outputMode("complete") \
    .start()

query.awaitTermination()



It prints out a bunch of aggregated rows to console. When I run the same query with outputMode("append") however, the output only has the column names, no rows. I was originally trying to output to parquet, which only supports append mode. I was seeing no data in my parquet files, so I switched to console output to debug, then noticed this issue. Am I misunderstanding something about how append mode works?


Thanks,

Ashwin