spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From subramgr <>
Subject [Structured Streaming] Understanding waterMark, flatMapGroupWithState and possibly windowing
Date Thu, 09 Aug 2018 02:35:13 GMT

We have a use case where we need to *sessionize* our data and for each
*session* emit some *metrics* we need to handle *repeated sessions* and
*sessions timeout*. We have come up with the following code structure and
would like to understand if we understand all the concept of *watermark*,

Can some one help me understand the following:

1. Will my memory consumption keep increasing ? 
2. Is my understanding correct that the *aggMetrics* data frame is a bounded
data frame and will always contain the last 30 minutes worth data?
3. When I do the aggregation in Step 2, will Spark only use the last 30
minute of data for aggregation ?

Here is the spark streaming code:
Step 1:
// 1. I am getting the data from Kafka around 50k events per second
// 2. I am using a 30 minutes watermark to filter out events that are
//    late.
// 3. I am using EventTimeTimeout
// 4. My `updateSessionState` func returns Itertor[Metric] (at minute
val metrics: Dataset[Metric] = kafkaEvents
  .withWatermark("timestamp", "30 minutes")
  .groupByKey(e => e._2.sessionId)

Step 2:
// 1. I am aggregating the data by the metric name and the minute
// 2. I am using the watermark here again of 30 mins assuming the 
//    results in the *metrics* which are 30 mins older will be removed from
the memory
//    Is my assumption correct???
// 3. Is *aggMetrics* a bounded data frame which will only hold last 30
minutes of data ??
val aggMetrics: Dataset[(String, Double)] = metrics
  .map(metric => (long2timestamp(metric.timestamp), metric))
  .toDF("timestamp", "metric")
  .as[(Timestamp, Metric)]
  .withWatermark("timestamp", "30 minutes")
  .map {
    case (_, m) => (s"${}.${m.timestamp}",m.count)

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

Step 3:
// 1. All those metric that got updated I am emitting to KairosDB
  .option("checkpointLocation", checkpointLocation)

Sent from:

To unsubscribe e-mail:

View raw message