spark-user mailing list archives

From Li Zuwei <li.zuwei.to...@gmail.com>
Subject [Spark Structured Streaming] How to select events by latest timestamp and aggregate count
Date Mon, 09 Oct 2017 04:44:57 GMT
I would like to perform a structured streaming aggregation over a windowing period, given the following data. The objective is to filter down to the latest event per user, and then aggregate the count of each event type for each location.

time    location   user   type
 1        A         1      one
 2        A         1      two
 1        B         2      one
 2        B         2      one
 1        A         3      two
 1        A         4      one
Sample output:

location   countOne   countTwo
    A          1         2
    B          1         0
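For reference, on a static (non-streaming) DataFrame I can produce the desired output with a ranking window. This is just a sketch, assuming df holds the sample data above with the listed column names:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named spark

// Rank each user's events by time, newest first, and keep only the top row.
val byUser = Window.partitionBy($"user").orderBy($"time".desc)
val latest = df
  .withColumn("rn", row_number().over(byUser))
  .filter($"rn" === 1)
  .drop("rn")

// Count each event type per location.
val result = latest
  .groupBy($"location")
  .agg(
    count(when($"type" === "one", $"type")) as "countOne",
    count(when($"type" === "two", $"type")) as "countTwo")

The problem is that row_number over a user-partitioned window is exactly the kind of non-time-based window that streaming DataFrames reject.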
For the streaming version I have tried something like the following:

val aggTypes = df
  .select($"location", $"time", $"user", $"type")
  // the watermark has to be defined on the event-time column before aggregating
  .withWatermark("time", conf.kafka.watermark.toString + " seconds")
  // first aggregation: keep the whole latest row per user, not just the max time
  .groupBy($"user")
  .agg(max(struct($"time", $"location", $"type")) as "latest")
  .select($"latest.time" as "time", $"latest.location" as "location",
    $"latest.type" as "type")
  // second aggregation: sliding-window counts of each type per location
  .groupBy(
    functions.window($"time",
      DataConstant.t15min.toString + " seconds",
      DataConstant.t1min.toString + " seconds"),
    $"location")
  .agg(
    count(when($"type" === "one", $"type")) as "countOne",
    count(when($"type" === "two", $"type")) as "countTwo")
  .drop($"window")
Since structured streaming supports neither multiple aggregations nor non-time-based windows on streaming DataFrames/Datasets, I am not sure whether it is possible to achieve the desired output in one streaming query.
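One workaround I am considering is to split this into two streaming queries chained through an intermediate Kafka topic. A rough sketch (the broker address, topic name, watermark, and checkpoint path are all placeholders, and it assumes time is a proper TimestampType column):

// First streaming query: keep the latest row per user and publish it
// to an intermediate topic as JSON.
val latestPerUser = df
  .withWatermark("time", "60 seconds")
  .groupBy($"user")
  .agg(max(struct($"time", $"location", $"type")) as "latest")
  .select(to_json(struct($"user", $"latest.time",
    $"latest.location", $"latest.type")) as "value")

latestPerUser.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("topic", "latest-events")
  .option("checkpointLocation", "/tmp/latest-events-checkpoint")
  .outputMode("update")
  .start()

A second streaming query would then subscribe to latest-events and run the windowed count as its single aggregation. The caveat is that update mode re-emits a user every time their latest event changes, so the second query sees a changelog rather than one final row per user and would have to account for that.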

Any help is appreciated.