spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: About the question of Spark Structured Streaming window output
Date Sun, 26 Aug 2018 21:00:19 GMT
Hi,

When you use a window in Append mode, a window's result is emitted only
once it is final, i.e. after the watermark has passed the end of the window
(window end + watermark delay, in event time). Until then you see no output.
This is your query over time. Note the timestamp on the right side of each
cell and the data present in it.

val windowedCounts = dataSrc
      .withWatermark("timestamp", "1 minutes")
      .groupBy(window($"timestamp", "5 minutes"))
      .agg(sum("value") as "sumvalue")
      .select("window.start", "window.end","sumvalue")


[image: image.png]

Going back to your questions:
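1. When I set the append output mode and send input data, there is no
result output. How do I use append mode in the window aggregate case?
Wait for the window + watermark to expire and you'll see the record
appended to the output. The watermark is derived from the event times seen
so far, so it only advances when newer events arrive.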

2. When I set the update output mode and send input data, the result is
output every batch. But I want to output the result only once, when the
window ends. How can I do that?
Use `append` mode.
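
To make the timing concrete, here is a small plain-Scala sketch (no Spark
needed) that works out how far event time must advance before the 5-minute
window from the query above is emitted in append mode. The object and method
names are made up for illustration. It assumes epoch-aligned tumbling windows
and that a window is finalized once the watermark (max event time seen minus
the delay) reaches the window end; the exact boundary, and the batch in which
the row actually appears, may vary with the Spark version and trigger timing.

```scala
import java.time.{Duration, LocalDateTime, ZoneOffset}

// Hypothetical helper, not part of Spark: reasons about append-mode timing.
object AppendTiming {
  // Start of the epoch-aligned tumbling window that contains eventTime.
  def windowStart(eventTime: LocalDateTime, size: Duration): LocalDateTime = {
    val epoch = eventTime.toEpochSecond(ZoneOffset.UTC)
    val start = epoch - Math.floorMod(epoch, size.getSeconds)
    LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC)
  }

  // Event time that must be observed so that (max event time - delay)
  // reaches the end of the window containing eventTime.
  def emitThreshold(eventTime: LocalDateTime,
                    size: Duration,
                    delay: Duration): LocalDateTime =
    windowStart(eventTime, size).plus(size).plus(delay)

  def main(args: Array[String]): Unit = {
    val first = LocalDateTime.parse("2018-08-20T12:01:00") // first sample row
    val threshold = emitThreshold(first, Duration.ofMinutes(5), Duration.ofMinutes(1))
    println(s"Window containing $first is emitted after an event at or past $threshold")
  }
}
```

For the sample input (12:01:00 and 12:02:01), both rows fall in the window
12:00 to 12:05, so under these assumptions nothing is appended until a row
with a timestamp of about 12:06:00 or later is sent.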

kr, Gerard.

On Thu, Aug 23, 2018 at 4:31 AM zrc@zjdex.com <zrc@zjdex.com> wrote:

> Hi,
>    I have some questions about Spark Structured Streaming window output
>  in Spark 2.3.1. I wrote the application code as follows:
>
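> import java.sql.Timestamp
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{sum, window}
> import org.apache.spark.sql.streaming.Trigger
>
> case class DataType(time: Timestamp, value: Long)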
>
> val spark = SparkSession
>       .builder
>       .appName("StructuredNetworkWordCount")
>       .master("local[1]")
>       .getOrCreate()
>
> import spark.implicits._
> // Create DataFrame representing the stream of input lines from the
> // connection to localhost:9999
> val lines = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", 9999)
>   .load()
>
> // Parse each "timestamp,value" line into a DataType record
> val words = lines.as[String].map { l =>
>   val tmp = l.split(",")
>   DataType(Timestamp.valueOf(tmp(0)), tmp(1).toLong)
> }
>
> val windowedCounts = words
>       .withWatermark("time", "1 minutes")
>       .groupBy(window($"time", "5 minutes"))
>       .agg(sum("value") as "sumvalue")
>       .select("window.start", "window.end","sumvalue")
>
> val query = windowedCounts.writeStream
>   .outputMode("update")
>   .format("console")
>   .trigger(Trigger.ProcessingTime("10 seconds"))
>   .start()
>
> query.awaitTermination()
>
> The input data format is:
> 2018-08-20 12:01:00,1
> 2018-08-20 12:02:01,1
>
> My questions are:
> 1. When I set the append output mode and send input data, there is no
> result output. How do I use append mode in the window aggregate case?
> 2. When I set the update output mode and send input data, the result is
> output every batch. But I want to output the result only once, when the
> window ends. How can I do that?
>
> Thanks in advance!
>
>
> ------------------------------
> zrc@zjdex.com
>
