spark-user mailing list archives

From Alexander Chermenin <a.cherme...@gmail.com>
Subject State size on joining two streams
Date Tue, 18 Dec 2018 08:58:13 GMT
Hi all,

I'd like to ask about the following: we have two streams (one with a lot of
data and another with much less) and we try to join them with each other. I
modeled this situation with the following code:

    val moreData = spark.readStream.format("rate")
      .option("rowsPerSecond", 1000).load()
      .withWatermark("timestamp", "15 seconds")

    val lessData = spark.readStream.format("rate")
      .option("rowsPerSecond", 1).load()
      .filter(_ => Random.nextInt(100) < 2)
      .withWatermark("timestamp", "15 seconds")

    val joinData = moreData.alias("md")
      .join(lessData.alias("ld"),
        expr(
          s"""
             | md.value = ld.value AND
             | md.timestamp BETWEEN ld.timestamp AND ld.timestamp + interval 15 seconds
           """.stripMargin
        )
      )
      .select(moreData("timestamp"), lessData("value"))

    joinData
      .withWatermark("timestamp", "15 seconds")
      .writeStream.format("console").start().awaitTermination()

After several minutes I checked the state and there were about a million
entries in it. My question is: why aren't the entries deleted from the
state? Both streams have watermarks, and the join expression constrains the
timestamps.
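For context, here is a minimal sketch (plain Scala, not Spark internals; all
names are illustrative) of how I understand watermark-based state eviction is
supposed to work: the watermark trails the maximum observed event time by the
configured delay, and buffered rows older than the watermark should become
eligible for cleanup.

```scala
// Illustrative sketch only -- not Spark's actual state store code.
object WatermarkSketch {
  case class Row(timestamp: Long, value: Long)

  // Given buffered state, the max event time seen so far, and the
  // watermark delay, drop rows that fall behind the watermark.
  def evict(state: List[Row], maxEventTime: Long, delayMs: Long): List[Row] = {
    val watermark = maxEventTime - delayMs
    state.filter(_.timestamp >= watermark) // older rows are evicted
  }

  def main(args: Array[String]): Unit = {
    val state = List(Row(1000L, 1L), Row(20000L, 2L), Row(30000L, 3L))
    // maxEventTime = 30000, delay = 15000 => watermark = 15000
    val kept = evict(state, maxEventTime = 30000L, delayMs = 15000L)
    println(kept.map(_.value).mkString(",")) // prints "2,3"
  }
}
```

Given a 15-second watermark, I would expect the joined state above to be
trimmed in roughly this fashion, which is why the unbounded growth surprises me.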

As a result, after the job runs for a few hours we hit an OutOfMemoryError.

Here is the pipeline (and some info about the state):

[image: image.png]


Also, there is a condition to clean up the state, but it looks somewhat
strange to me, for example `timestamp - T15000ms <= -1000`:

[image: image.png]


Could anybody explain to me what this means?
