spark-dev mailing list archives
From Burak Yavuz <brk...@gmail.com>
Subject Re: Structured Streaming with Watermark
Date Thu, 18 Oct 2018 08:23:07 GMT
Hi Sandeep,

Watermarks are used in aggregation queries to ensure correctness and to clean
up state. They do not cause records to be dropped in map-only queries, which
is what you have in your example. If you run the same test with an
aggregation such as `groupBy().count()`, you will see that the count does not
increase for the last (late) event.
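The dropping behavior can be sketched outside Spark. Below is a plain-Scala simulation of the semantics (not Spark's implementation; `WatermarkSketch` and all names are hypothetical): the watermark is the maximum event time seen so far minus the allowed delay, and an event older than the watermark is excluded from the aggregate. With a 10-second delay, the late event 1539844852 falls behind the watermark of 1539844882 - 10 and is dropped:

```scala
object WatermarkSketch {
  val delaySeconds = 10L

  // Replay event times in arrival order, keeping counts per 10-second
  // tumbling window and dropping any event that arrives behind the
  // current watermark (max event time seen so far minus the delay).
  def aggregate(eventTimes: Seq[Long]): (Map[Long, Long], Seq[Long]) = {
    var maxEventTime = 0L
    var counts = Map.empty[Long, Long]
    var dropped = Vector.empty[Long]
    for (t <- eventTimes) {
      val watermark = maxEventTime - delaySeconds
      if (t < watermark) {
        dropped :+= t                 // too late: excluded from the aggregate
      } else {
        val window = t / 10 * 10      // start of the 10-second tumbling window
        counts += window -> (counts.getOrElse(window, 0L) + 1L)
      }
      maxEventTime = math.max(maxEventTime, t)
    }
    (counts, dropped)
  }

  def main(args: Array[String]): Unit = {
    // The exact values sent from the producer in the example below.
    val events = Seq(1539844822L, 1539844842L, 1539844862L, 1539844882L, 1539844852L)
    val (counts, dropped) = aggregate(events)
    println(s"counts=$counts dropped=$dropped")
  }
}
```

Note that a map-only query, like the one in your program, has no such aggregation state, so every event is simply passed through.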

On Thu, Oct 18, 2018 at 8:48 AM sandeep_katta <
sandeep0102.opensource@gmail.com> wrote:

>
> I am trying to test the watermark concept in Structured Streaming using the
> program below:
>
>  import java.sql.Timestamp
>     import org.apache.spark.sql.functions.{col, expr}
>     import org.apache.spark.sql.streaming.Trigger
>
>     val lines_stream = spark.readStream.
>       format("kafka").
>       option("kafka.bootstrap.servers", "vm1:21005,vm2:21005").
>       option("subscribe", "s1").
>       load().
>       select('value.cast("String") as "key",
>         'value.cast("String").cast("long").cast("timestamp") as "timeStampValue").
>       select("key", "timeStampValue").
>       withWatermark("timeStampValue", "10 seconds")
>
>     val query = lines_stream.
>       writeStream.
>       option("truncate", "false").
>       outputMode("append").
>       format("console").
>       trigger(Trigger.ProcessingTime(3000)).
>       start()
>     query.awaitTermination()
>
>
> //Corresponding output
>
> scala>     query.awaitTermination()
> -------------------------------------------
>
> Batch: 0
> -------------------------------------------
> +---+--------------+
> |key|timeStampValue|
> +---+--------------+
> +---+--------------+
>
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844822|2018-10-18 14:40:22|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844842|2018-10-18 14:40:42|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844862|2018-10-18 14:41:02|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844882|2018-10-18 14:41:22|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844852|2018-10-18 14:40:52|* // As per watermark this event should be
> discarded but it didnt*
> +----------+-------------------+
>
> Note: below are the values I sent from the Kafka producer
> 1539844822
> 1539844842
> 1539844862
> 1539844882
> 1539844852
>
> Is this the correct way to test the watermark scenarios?
>
> Regards
> Sandeep Katta
>
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>
>
