spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Austin Weaver <>
Subject Re: Structured Streaming Kafka - Weird behavior with performance and logs
Date Mon, 13 May 2019 08:53:31 GMT
Hi Akshay,

Thanks very much for the reply!

1) The topics have 12 partitions (both input and output)
2-3) I read that "trigger" is used for microbatching, but it you would like
the stream to truly process as a "stream" as quickly as possible, then to
leave this opted out? In any case, I am using a minimum
maxOffsetsPerTrigger (varying from 20-200 just to test this argument)

Could you please link me to documentation or an example of the progress
status display you mention? I see a horizontal status bar on my Spark UI
for the running job but it doesn't appear to give me any specific metrics
other than a ui display. Where exactly would I see the start/end offset
values per batch, is that in the spark logs? Any example (documentation,
sample log) would be very helpful to point me in the right direction. I
think debugging the offsets per batch is exactly what I need right now.


On Wed, May 8, 2019 at 9:59 AM Akshay Bhardwaj <> wrote:

> Hi Austin,
> A few questions:
>    1. What is the partition of the kafka topic that used for input and
>    output data?
>    2. In the write stream, I will recommend to use "trigger" with a
>    defined interval, if you prefer micro-batching strategy,
>    3. along with defining "maxOffsetsPerTrigger" in kafka readStream
>    options, which lets you choose the amount of messages you want per trigger.
>    (Helps in maintaining the expected threshold of executors/memory for the
>    cluster)
> For repeated log messages, notice in your logs the streaming query
> progress published. This progress status displays a lot of metrics that
> shall be your first diagnosis to identify issues.
> The progress status with kafka stream displays the "startOffset" and
> "endOffset" values per batch. This is listed topic-partition wise the start
> to end offsets per trigger batch of streaming query.
> Akshay Bhardwaj
> +91-97111-33849
> On Tue, May 7, 2019 at 8:02 PM Austin Weaver <> wrote:
>> Hey Spark Experts,
>> After listening to some of you, and the presentations at Spark Summit in
>> SF, I am transitioning from d-streams to structured streaming however I am
>> seeing some weird results.
>> My use case is as follows: I am reading in a stream from a kafka topic,
>> transforming a message, and writing the transformed message to another
>> kafka topic.
>> While running my stream, I can see the transformed messages on the output
>> topic so I know the basic structure of my stream seems to be running as
>> intended.
>> Inside my transformation, I am logging the total transform time as well
>> as the raw message being transformed. (Java by the way)
>> The 2 weird things I am seeing:
>> 1) I am seeing that the consumer lag for this particular consumer group
>> on the input topic is increasing. This does not make sense to me - looking
>> at the transform time from the logs, it should easily be able to handle the
>> incoming feed. To give an example the transform times are < 10 ms per
>> record and the sample of data does not contain > 100 messages per second.
>> The stream should be reducing consumer lag as it runs (especially
>> considering multiple workers and partitions)
>> 2) I am seeing the same log transformation messages over and over on the
>> dataproc spark cluster logs. For example, I am currently looking at my logs
>> and the last 20+ log messages are the exact same
>> I thought 2 may be due to offsets not being handled correctly, but I am
>> seeing a reasonable range of transformed messages on the target topic, and
>> I'm using the built in checkpointing for spark to handle the offsets for me.
>> In terms of 1, why would I be seeing the same log messages over and over?
>> It doesnt make sense to me - wouldnt the message only be transformed once
>> and it's offset committed?
>> If anything stands out as incorrect, or something you've seen please let
>> me know - this is rather new to me and my code seems to be following the
>> same as other examples I see across the net
>> Here's a redacted snippet of my stream:
>> spark.readStream().format("kafka").option("kafka.bootstrap.servers",
>> "XXXXX")
>>         .option("kafka.partition.assignment.strategy",
>> RoundRobinAssignor.class.getName())
>>         .option("subscribe", ""XXXX"")
>>         .option("startingOffsets", "earliest")
>>         .load()
>>         .select("value")
>>         .as(Encoders.STRING())
>>         .map((MapFunction<String, String>) value -> transform(value),
>> Encoders.STRING())
>>         .writeStream()
>>         .format("kafka")
>>         .option("kafka.bootstrap.servers", "XXXXX")
>>         .option("topic", ""XXXXX"")
>>         .outputMode("append")
>>         .option("checkpointLocation", "/checkpoints/testCheckpoint")
>>         .start()
>>         .awaitTermination();
>> Thanks!
>> Austin

Austin Weaver
Software Engineer
FLYR, Inc.

View raw message