spark-user mailing list archives

From Jungtaek Lim <kabh...@gmail.com>
Subject Re: Re: About the question of Spark Structured Streaming window output
Date Mon, 27 Aug 2018 03:01:33 GMT
You may want to add a streaming query listener to your query and see when and
how the watermark is updated. In short, the watermark is calculated from the
previous batch, and the calculated value is applied to the current batch. So
the result may appear later than you expect, possibly by one batch.
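
To make the listener suggestion concrete, here is a minimal sketch, assuming Spark's `StreamingQueryListener` API. The class name `WatermarkLogger` and the `register` helper are hypothetical, chosen only for illustration. It prints the event-time statistics reported with each completed batch so the watermark can be compared against the window boundaries.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener: logs the event-time stats reported for each batch.
class WatermarkLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // eventTime is a java.util.Map[String, String]; an entry may be missing
    // (for example when a batch carried no event-time data), hence the Option.
    val watermark = Option(p.eventTime.get("watermark")).getOrElse("n/a")
    val maxEventTime = Option(p.eventTime.get("max")).getOrElse("n/a")
    println(s"batch=${p.batchId} rows=${p.numInputRows} " +
      s"maxEventTime=$maxEventTime watermark=$watermark")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

object WatermarkLoggerExample {
  // Call this before query.start() so the first batches are logged too.
  def register(spark: SparkSession): Unit =
    spark.streams.addListener(new WatermarkLogger)
}
```

With the listener registered, the watermark printed for a batch can be checked against the largest event time seen in the batch before it, which is where the one-batch delay should become visible.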

On Mon, Aug 27, 2018 at 11:56 AM, zrc@zjdex.com <zrc@zjdex.com> wrote:

> Hi Gerard Maas:
> Thanks a lot for your reply.
> When I use "append" mode, I send the following data:
> 2018-08-27 09:53:00,1
> 2018-08-27 09:53:01,1
> The result (which has only the schema, like the following) is received
> after the batch ends. But when the window + watermark time is up, no
> result is output. Is there something I missed? Thanks in advance.
>
>
>
> ------------------------------
> zrc@zjdex.com
>
>
> *From:* Gerard Maas <gerard.maas@gmail.com>
> *Date:* 2018-08-27 05:00
> *To:* zrc <zrc@zjdex.com>
> *CC:* spark users <user@spark.apache.org>; wch <wch@zjdex.com>
> *Subject:* Re: About the question of Spark Structured Streaming window
> output
>
> Hi,
>
> When you use a window in Append mode, you need to wait for the end of the
> window + watermark to see the final record from the "append" mode.
> This is your query over time. Note the timestamp at the right side of the
> cell and the data present in it.
>
> val windowedCounts = dataSrc
>       .withWatermark("timestamp", "1 minutes")
>       .groupBy(window($"timestamp", "5 minutes"))
>       .agg(sum("value") as "sumvalue")
>       .select("window.start", "window.end","sumvalue")
>
>
> [image: image.png]
>
> Going back to your questions:
> 1. When I set the append output mode, I send input data, but there is no
> result output. How do I use append mode in the window aggregate case?
> Wait for the window + watermark to expire and you'll see the append record
> output.
>
> 2. When I set the update output mode, I send input data, and the result is
> output every batch. But I want to output the result only once, when the
> window ends. How can I do that?
> Use `append` mode.
>
> kr, Gerard.
>
> On Thu, Aug 23, 2018 at 4:31 AM zrc@zjdex.com <zrc@zjdex.com> wrote:
>
>> Hi :
>>    I have some questions about Spark Structured Streaming window output
>>  in Spark 2.3.1. I wrote the application code as follows:
>>
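>> import java.sql.Timestamp
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{sum, window}
>> import org.apache.spark.sql.streaming.Trigger
>>
>> // One input record: event time plus a numeric value.
>> case class DataType(time: Timestamp, value: Long)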
>>
>> val spark = SparkSession
>>       .builder
>>       .appName("StructuredNetworkWordCount")
>>       .master("local[1]")
>>       .getOrCreate()
>>
>> import spark.implicits._
>> // Create DataFrame representing the stream of input lines from
>> // connection to localhost:9999
>> val lines = spark.readStream
>>   .format("socket")
>>   .option("host", "localhost")
>>   .option("port", 9999)
>>   .load()
>>
>> val words = lines.as[String].map(l => {
>>   val tmp = l.split(",")
>>   DataType(Timestamp.valueOf(tmp(0)), tmp(1).toLong)
>> }).as[DataType]
>>
>> val windowedCounts = words
>>       .withWatermark("time", "1 minutes")
>>       .groupBy(window($"time", "5 minutes"))
>>       .agg(sum("value") as "sumvalue")
>>       .select("window.start", "window.end","sumvalue")
>>
>> val query = windowedCounts.writeStream
>>   .outputMode("update")
>>   .format("console")
>>   .trigger(Trigger.ProcessingTime("10 seconds"))
>>   .start()
>>
>> query.awaitTermination()
>>
>> The input data format is:
>> 2018-08-20 12:01:00,1
>> 2018-08-20 12:02:01,1
>>
>> My questions are:
>> 1. When I set the append output mode, I send input data, but there is no
>> result output. How do I use append mode in the window aggregate case?
>> 2. When I set the update output mode, I send input data, and the result is
>> output every batch. But I want to output the result only once, when the
>> window ends. How can I do that?
>>
>> Thanks in advance!
>>
>>
>> ------------------------------
>> zrc@zjdex.com
>>
>

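As a rough worked example of the "window + watermark" rule described in the thread, the sketch below takes the sample row 2018-08-27 09:53:00 and computes where its 5-minute window ends and the smallest event time that would move a 1-minute watermark up to that boundary. `WindowCloseSketch` and its helpers are hypothetical names. The sketch assumes windows aligned to the hour and that a window is finalized once the watermark reaches its end. It does not call Spark.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.temporal.ChronoUnit

object WindowCloseSketch {
  // Values taken from the query in the thread.
  val windowSize: Duration = Duration.ofMinutes(5)
  val watermarkDelay: Duration = Duration.ofMinutes(1)

  // End of the tumbling window that contains eventTime
  // (assumption: windows are aligned to the start of the hour).
  def windowEnd(eventTime: LocalDateTime): LocalDateTime = {
    val hourStart = eventTime.truncatedTo(ChronoUnit.HOURS)
    val minutesIntoHour = Duration.between(hourStart, eventTime).toMinutes
    val windowIndex = minutesIntoHour / windowSize.toMinutes
    hourStart.plus(windowSize.multipliedBy(windowIndex + 1))
  }

  // Smallest event time for which (event time - delay) reaches the window end.
  def closingEventTime(eventTime: LocalDateTime): LocalDateTime =
    windowEnd(eventTime).plus(watermarkDelay)

  def main(args: Array[String]): Unit = {
    val sample = LocalDateTime.parse("2018-08-27T09:53:00")
    println(s"window end:         ${windowEnd(sample)}")
    println(s"closing event time: ${closingEventTime(sample)}")
  }
}
```

For the 09:53:00 row this gives a window end of 09:55 and a closing event time of 09:56, so rows stamped 09:53:00 and 09:53:01 alone would not advance the watermark far enough. Since the watermark is applied one batch late, the append output may only show up in the batch after a row at or beyond 09:56 has been processed.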