From: jithinpt <>
Subject: Spark structured streaming - join static dataset with streaming dataset
Date: Fri, 06 Oct 2017 05:03:40 GMT
I'm using Spark structured streaming to process records read from Kafka.
Here's what I'm trying to achieve:

(a) Each record is a Tuple2 of type (Timestamp, DeviceId).

(b) I've created a static Dataset[DeviceId] which contains the set of all
valid device IDs (of type DeviceId) that are expected to be seen in the
Kafka stream.

(c) I need to write a Spark structured streaming query that:

 (i) groups records by their timestamp into 5-minute windows, and
 (ii) for each window, gets the list of valid device IDs that were **not**
 seen in that window.

For example, say the list of all valid device IDs is [A,B,C,D,E] and the
Kafka records in a certain 5-minute window contain the device IDs [A,B,E].
Then, for that window, the list of unseen device IDs I'm looking for is
[C,D].
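A minimal plain-Scala sketch of the intended semantics (no Spark involved), assuming tumbling windows aligned to 1970-01-01 00:00:00 UTC, which is how Spark's window() function aligns them when no startTime is given. DeviceId is taken to be a String here, and the names `UnseenDevices`, `Record`, `windowStart` and `unseenPerWindow` are hypothetical:

```scala
import java.sql.Timestamp

object UnseenDevices {
  // Hypothetical stand-ins for the types in the question.
  type DeviceId = String
  final case class Record(timestamp: Timestamp, deviceId: DeviceId)

  val WindowMillis: Long = 5 * 60 * 1000L

  // Start (epoch millis) of the 5-minute tumbling window containing ts,
  // with windows aligned to 1970-01-01 00:00:00 UTC.
  def windowStart(ts: Timestamp): Long = {
    val t = ts.getTime
    t - Math.floorMod(t, WindowMillis)
  }

  // (i) group records into windows, (ii) subtract each window's seen IDs
  // from the full set of valid IDs.
  def unseenPerWindow(records: Seq[Record], valid: Set[DeviceId]): Map[Long, Set[DeviceId]] =
    records
      .groupBy(r => windowStart(r.timestamp))
      .map { case (start, rs) => start -> (valid -- rs.map(_.deviceId)) }
}
```

For example, records A, B and E at 0, 1 and 2 minutes past a window boundary, plus C six minutes past it, give {C, D} for the first window and {A, B, D, E} for the second. Note that a window containing no records at all does not appear in this sketch's result, even though every valid ID would be unseen there; reporting such windows would need a separate source of window boundaries.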


How can this query be written in Spark structured streaming? I tried using
the except() and join() methods that Dataset exposes, but both threw an
exception at runtime stating that the operation is not supported on a
streaming Dataset.

Here's a snippet of my code:

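import org.apache.spark.sql.Dataset
import spark.implicits._   // encoders and the $"col" syntax

type DeviceId = Long       // assumption: inferred from the getLong calls below

// allDeviceIds is a hypothetical placeholder for the collection of valid IDs
// (that argument is missing from the original snippet)
val validDeviceIds: Dataset[(DeviceId, Long)] =
     spark.createDataset[DeviceId](allDeviceIds).map(id => (id, 0L))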

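// java.sql.Timestamp, not TimestampType: TimestampType is Spark's SQL data
// type, not a JVM class that can be used as a case-class field
case class KafkaRecord(timestamp: java.sql.Timestamp, deviceId: DeviceId)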

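// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
import org.apache.spark.sql.functions.window

val deviceIdsSeen = kafkaRecs
     .withWatermark("timestamp", "5 minutes")
     .groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
     .count()                            // groupBy returns a RelationalGroupedDataset; aggregate before map
     .map(row => (row.getLong(1), 1L))   // index 0 is the window struct, index 1 is deviceId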

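// Right outer join keeps every valid ID; IDs never seen have a null left-side _2
val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
     .filter(row => row.isNullAt(1))   // column 1 is deviceIdsSeen's _2 marker
     .map(row => row.getLong(0))       // column 0 is the shared _1 (device ID)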

The last statement throws the following exception:

Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;
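For reference, the right outer join followed by the null filter in the snippet is an anti-join: it keeps exactly the valid IDs that have no match among the seen IDs, which on distinct IDs is the same result except() would give. The plain-Scala sketch below (no Spark; the object, type and method names are hypothetical) models each joined row as a (_1, left _2, right _2) tuple, with a missing left side represented as None, to make that equivalence concrete:

```scala
object AntiJoinSketch {
  // (deviceId, marker) pairs, like the (Long, Long) tuples in the snippet.
  type Pair = (Long, Long)

  // Right outer join on the device ID: every row of `valid` is kept; the
  // left-side marker is None when no row of `seen` has the same ID.
  def rightOuterJoin(seen: Seq[Pair], valid: Seq[Pair]): Seq[(Long, Option[Long], Long)] = {
    val seenById: Map[Long, Seq[Pair]] = seen.groupBy(_._1)
    valid.flatMap { case (id, validMarker) =>
      seenById.get(id) match {
        case Some(matches) => matches.map { case (_, m) => (id, Option(m), validMarker) }
        case None          => Seq((id, Option.empty[Long], validMarker))
      }
    }
  }

  // Mirrors .filter(row => row.isNullAt(1)).map(row => row.getLong(0)).
  def unseen(seen: Seq[Pair], valid: Seq[Pair]): Seq[Long] =
    rightOuterJoin(seen, valid).collect { case (id, None, _) => id }
}
```

For example, with seen IDs 1, 2 and 5 and valid IDs 1 to 5, `unseen` returns the IDs 3 and 4, i.e. the valid set minus the seen set.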

Thanks in advance.

