Hi Akshay,

Thanks very much for the reply!

1) The topics have 12 partitions (both input and output)
2-3) I read that "trigger" is used for microbatching, but it you would like the stream to truly process as a "stream" as quickly as possible, then to leave this opted out? In any case, I am using a minimum maxOffsetsPerTrigger (varying from 20-200 just to test this argument)

Could you please link me to documentation or an example of the progress status display you mention? I see a horizontal status bar on my Spark UI for the running job but it doesn't appear to give me any specific metrics other than a ui display. Where exactly would I see the start/end offset values per batch, is that in the spark logs? Any example (documentation, sample log) would be very helpful to point me in the right direction. I think debugging the offsets per batch is exactly what I need right now.

Thanks,
Austin


On Wed, May 8, 2019 at 9:59 AM Akshay Bhardwaj <akshay.bhardwaj1988@gmail.com> wrote:
Hi Austin,

A few questions:
  1. What is the partition of the kafka topic that used for input and output data?
  2. In the write stream, I will recommend to use "trigger" with a defined interval, if you prefer micro-batching strategy,
  3. along with defining "maxOffsetsPerTrigger" in kafka readStream options, which lets you choose the amount of messages you want per trigger. (Helps in maintaining the expected threshold of executors/memory for the cluster)
For repeated log messages, notice in your logs the streaming query progress published. This progress status displays a lot of metrics that shall be your first diagnosis to identify issues.
The progress status with kafka stream displays the "startOffset" and "endOffset" values per batch. This is listed topic-partition wise the start to end offsets per trigger batch of streaming query.


Akshay Bhardwaj
+91-97111-33849


On Tue, May 7, 2019 at 8:02 PM Austin Weaver <austin@flyrlabs.com> wrote:
Hey Spark Experts,

After listening to some of you, and the presentations at Spark Summit in SF, I am transitioning from d-streams to structured streaming however I am seeing some weird results.

My use case is as follows: I am reading in a stream from a kafka topic, transforming a message, and writing the transformed message to another kafka topic.

While running my stream, I can see the transformed messages on the output topic so I know the basic structure of my stream seems to be running as intended.

Inside my transformation, I am logging the total transform time as well as the raw message being transformed. (Java by the way)

The 2 weird things I am seeing:
1) I am seeing that the consumer lag for this particular consumer group on the input topic is increasing. This does not make sense to me - looking at the transform time from the logs, it should easily be able to handle the incoming feed. To give an example the transform times are < 10 ms per record and the sample of data does not contain > 100 messages per second. The stream should be reducing consumer lag as it runs (especially considering multiple workers and partitions)

2) I am seeing the same log transformation messages over and over on the dataproc spark cluster logs. For example, I am currently looking at my logs and the last 20+ log messages are the exact same

I thought 2 may be due to offsets not being handled correctly, but I am seeing a reasonable range of transformed messages on the target topic, and I'm using the built in checkpointing for spark to handle the offsets for me.

In terms of 1, why would I be seeing the same log messages over and over? It doesnt make sense to me - wouldnt the message only be transformed once and it's offset committed?

If anything stands out as incorrect, or something you've seen please let me know - this is rather new to me and my code seems to be following the same as other examples I see across the net

Here's a redacted snippet of my stream:

spark.readStream().format("kafka").option("kafka.bootstrap.servers", "XXXXX")
        .option("kafka.partition.assignment.strategy", RoundRobinAssignor.class.getName())
        .option("subscribe", ""XXXX"")
        .option("startingOffsets", "earliest")
        .load()
        .select("value")
        .as(Encoders.STRING())
        .map((MapFunction<String, String>) value -> transform(value), Encoders.STRING())
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "XXXXX")
        .option("topic", ""XXXXX"")
        .outputMode("append")
        .option("checkpointLocation", "/checkpoints/testCheckpoint")
        .start()
        .awaitTermination();

Thanks!
Austin


--
Austin Weaver
Software Engineer
FLYR, Inc.   www.flyrlabs.com