Yes, my code is shown below (I also posted my code in another mail):
/**
    * input
    */
  val logs = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BROKER_SERVER)
    .option("subscribe", TOPIC)
    .option("startingOffset", "latest")
    .load()

  /**
    * process
    */
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]

  val events = logValues
    .map(parseFunction)
    .select(
      $"_1".alias("date").cast("timestamp"),   // epoch seconds -> timestamp
      $"_2".alias("uuid").cast("string")
    )

  val results = events
    .withWatermark("date", "1 day")      // state older than 1 day behind the max event time can be dropped
    .dropDuplicates("uuid", "date")      // keep one event per (uuid, date) pair
    .groupBy($"date")
    .count()

  /**
    * output
    */
  val query = results
    .writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .start()

  query.awaitTermination()
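
The imports and SparkSession setup are not shown above; roughly they look like this (the app name is just a placeholder):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.Trigger
  import play.api.libs.json.Json

  val spark = SparkSession
    .builder()
    .appName("uv-count")       // placeholder name
    .getOrCreate()

  import spark.implicits._     // needed for $"..." and .as[String]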

I use play-json to parse the input logs from Kafka; the parse function looks like this:

  def parseFunction(str: String): (Long, String) = {
    val json = Json.parse(str)
    val timestamp = (json \ "time").get.toString().toLong
    // truncate the epoch-second timestamp to the start of its day, shifted by 8 hours for the local timezone (UTC+8)
    val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
    val uuid = (json \ "uuid").get.toString()
    (date, uuid)
  }
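
One detail I'm not fully sure about: `.get.toString()` keeps the surrounding quotes when the field is a JSON string, so the uuid values may contain quote characters (and `.toLong` would fail if "time" were a string). A variant using play-json's typed readers, assuming "time" is a JSON number and "uuid" a JSON string, would look like:

  def parseFunctionTyped(str: String): (Long, String) = {
    val json = Json.parse(str)
    val timestamp = (json \ "time").as[Long]    // read the number directly
    val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
    val uuid = (json \ "uuid").as[String]       // unwrap the JsString without the quotes
    (date, uuid)
  }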

and the Java heap space looks like the attached image.png (I've increased the executor memory to 15g).
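
For reference, the executor sizing is applied via the Spark conf, roughly like this (the core count is a guess; only the 15g memory value is the one I actually set):

  import org.apache.spark.SparkConf

  // sketch of the executor sizing, equivalent to the corresponding spark-submit flags
  val conf = new SparkConf()
    .set("spark.executor.instances", "4")   // 4 executors, as in the original mail
    .set("spark.executor.cores", "4")       // assumed; 2 to 4 cores were tried
    .set("spark.executor.memory", "15g")    // raised from 5g/10g to 15g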
Michael Armbrust <michael@databricks.com> wrote on Wed, Sep 13, 2017 at 2:23 AM:
Can you show the full query you are running?

On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1992@gmail.com> wrote:
Hi,

I'm using Structured Streaming to count unique visits to our website. I run Spark in yarn mode with 4 executor instances and have tried from 2 cores * 5g memory to 4 cores * 10g memory per executor, but there are frequent full GCs, and once the count rises to more than about 4.5 million the application gets blocked and finally crashes with an OOM. That seems unreasonable. Is there any suggestion for optimizing the memory consumption of Structured Streaming? Thanks.