spark-user mailing list archives

From Suket Arora <suket...@gmail.com>
Subject Re: Structured Streaming Kafka - Weird behavior with performance and logs
Date Tue, 14 May 2019 17:10:40 GMT
// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
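
In Java (which you mentioned you're using), the equivalent is roughly the
following sketch - here "df" is assumed to be the streaming Dataset you read
from Kafka:

// requires: import org.apache.spark.sql.streaming.Trigger;
// Continuous trigger with a one-second checkpoint interval (Java API);
// assumes "df" is a streaming Dataset<Row> built via spark.readStream()
df.writeStream()
    .format("console")
    .trigger(Trigger.Continuous("1 second"))
    .start();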



On Tue, 14 May 2019 at 22:10, suket arora <suket224@gmail.com> wrote:

> Hey Austin,
>
> If you truly want to process as a stream, use continuous processing in
> Spark by setting a continuous trigger.
> By default, Spark uses micro-batches if you don't specify a trigger
> (each micro-batch is triggered as soon as the previous one finishes).
>
> Regards,
> Suket
>
> On Mon, 13 May 2019 at 14:23, Austin Weaver <austin@flyrlabs.com> wrote:
>
>> Hi Akshay,
>>
>> Thanks very much for the reply!
>>
>> 1) The topics have 12 partitions (both input and output)
>> 2-3) I read that "trigger" is used for micro-batching, but if you would
>> like the stream to truly process as a "stream" as quickly as possible, should
>> you leave it out? In any case, I am using a small
>> maxOffsetsPerTrigger (varying from 20-200 just to test this option)
>>
>> Could you please link me to documentation or an example of the progress
>> status display you mention? I see a horizontal status bar on the Spark UI
>> for the running job, but it doesn't appear to give me any specific metrics
>> beyond the UI display itself. Where exactly would I see the start/end offset
>> values per batch - is that in the Spark logs? Any example (documentation,
>> sample log) would be very helpful to point me in the right direction. I
>> think debugging the offsets per batch is exactly what I need right now.
>>
>> Thanks,
>> Austin
>>
>>
>> On Wed, May 8, 2019 at 9:59 AM Akshay Bhardwaj <
>> akshay.bhardwaj1988@gmail.com> wrote:
>>
>>> Hi Austin,
>>>
>>> A few questions:
>>>
>>>    1. How many partitions do the Kafka topics used for the input and
>>>    output data have?
>>>    2. In the write stream, I would recommend using "trigger" with a
>>>    defined interval if you prefer the micro-batching strategy,
>>>    3. along with setting "maxOffsetsPerTrigger" in the Kafka readStream
>>>    options, which lets you choose the number of messages you want per trigger.
>>>    (This helps keep the executor/memory usage of the cluster within the
>>>    expected threshold.) See the sketch below.
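>>>
>>> As a rough sketch of 2 and 3 with the Java API (the interval, the offset
>>> cap, and the broker/topic/checkpoint values below are just illustrative
>>> placeholders):
>>>
>>> import org.apache.spark.sql.Dataset;
>>> import org.apache.spark.sql.Row;
>>> import org.apache.spark.sql.SparkSession;
>>> import org.apache.spark.sql.streaming.Trigger;
>>>
>>> SparkSession spark = SparkSession.builder().appName("throttled").getOrCreate();
>>>
>>> Dataset<Row> input = spark.readStream()
>>>         .format("kafka")
>>>         .option("kafka.bootstrap.servers", "XXXXX")
>>>         .option("subscribe", "XXXXX")
>>>         .option("startingOffsets", "earliest")
>>>         // 3. cap the number of offsets consumed per trigger
>>>         .option("maxOffsetsPerTrigger", 200)
>>>         .load();
>>>
>>> input.selectExpr("CAST(value AS STRING) AS value")
>>>         .writeStream()
>>>         .format("kafka")
>>>         .option("kafka.bootstrap.servers", "XXXXX")
>>>         .option("topic", "XXXXX")
>>>         .option("checkpointLocation", "/checkpoints/exampleCheckpoint")
>>>         // 2. run one micro-batch every 10 seconds instead of back-to-back
>>>         .trigger(Trigger.ProcessingTime("10 seconds"))
>>>         .outputMode("append")
>>>         .start();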
>>>
>>> For the repeated log messages, look in your logs for the streaming query
>>> progress that gets published. This progress status displays a lot of metrics
>>> and should be your first diagnostic for identifying issues.
>>> With a Kafka stream, the progress status displays the "startOffset" and
>>> "endOffset" values per batch, listed per topic-partition, i.e. the start to
>>> end offsets consumed in each trigger batch of the streaming query.
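>>>
>>> If you also want to capture that progress programmatically, a minimal
>>> sketch with the Java API (assuming "spark" is your SparkSession) would be
>>> something like:
>>>
>>> import org.apache.spark.sql.streaming.StreamingQueryListener;
>>>
>>> // Prints every progress update as JSON; the "sources" section contains
>>> // the per topic-partition "startOffset"/"endOffset" for each trigger.
>>> spark.streams().addListener(new StreamingQueryListener() {
>>>     @Override
>>>     public void onQueryStarted(QueryStartedEvent event) { }
>>>
>>>     @Override
>>>     public void onQueryProgress(QueryProgressEvent event) {
>>>         System.out.println(event.progress().prettyJson());
>>>     }
>>>
>>>     @Override
>>>     public void onQueryTerminated(QueryTerminatedEvent event) { }
>>> });
>>>
>>> Calling lastProgress() on the StreamingQuery returned by start() gives you
>>> the most recent of these progress objects as well.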
>>>
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Tue, May 7, 2019 at 8:02 PM Austin Weaver <austin@flyrlabs.com>
>>> wrote:
>>>
>>>> Hey Spark Experts,
>>>>
>>>> After listening to some of you, and the presentations at Spark Summit
>>>> in SF, I am transitioning from DStreams to Structured Streaming; however,
>>>> I am seeing some weird results.
>>>>
>>>> My use case is as follows: I am reading in a stream from a kafka topic,
>>>> transforming a message, and writing the transformed message to another
>>>> kafka topic.
>>>>
>>>> While running my stream, I can see the transformed messages on the
>>>> output topic so I know the basic structure of my stream seems to be running
>>>> as intended.
>>>>
>>>> Inside my transformation, I am logging the total transform time as well
>>>> as the raw message being transformed. (Java by the way)
>>>>
>>>> The 2 weird things I am seeing:
>>>> 1) I am seeing that the consumer lag for this particular consumer group
>>>> on the input topic is increasing. This does not make sense to me - looking
>>>> at the transform time from the logs, it should easily be able to handle the
>>>> incoming feed. To give an example, the transform times are < 10 ms per
>>>> record and the sample of data does not contain > 100 messages per second.
>>>> The stream should be reducing consumer lag as it runs (especially
>>>> considering multiple workers and partitions)
>>>>
>>>> 2) I am seeing the same log transformation messages over and over on
>>>> the dataproc spark cluster logs. For example, I am currently looking at my
>>>> logs and the last 20+ log messages are exactly the same.
>>>>
>>>> I thought 2 may be due to offsets not being handled correctly, but I am
>>>> seeing a reasonable range of transformed messages on the target topic, and
>>>> I'm using Spark's built-in checkpointing to handle the offsets for me.
>>>>
>>>> In terms of 2, why would I be seeing the same log messages over and
>>>> over? It doesn't make sense to me - wouldn't the message only be transformed
>>>> once and its offset committed?
>>>>
>>>> If anything stands out as incorrect, or as something you've seen before,
>>>> please let me know - this is rather new to me and my code seems to follow
>>>> the same pattern as other examples I see across the net.
>>>>
>>>> Here's a redacted snippet of my stream:
>>>>
>>>> spark.readStream()
>>>>         .format("kafka")
>>>>         .option("kafka.bootstrap.servers", "XXXXX")
>>>>         .option("kafka.partition.assignment.strategy",
>>>>                 RoundRobinAssignor.class.getName())
>>>>         .option("subscribe", "XXXX")
>>>>         .option("startingOffsets", "earliest")
>>>>         .load()
>>>>         .select("value")
>>>>         .as(Encoders.STRING())
>>>>         .map((MapFunction<String, String>) value -> transform(value),
>>>>                 Encoders.STRING())
>>>>         .writeStream()
>>>>         .format("kafka")
>>>>         .option("kafka.bootstrap.servers", "XXXXX")
>>>>         .option("topic", "XXXXX")
>>>>         .outputMode("append")
>>>>         .option("checkpointLocation", "/checkpoints/testCheckpoint")
>>>>         .start()
>>>>         .awaitTermination();
>>>>
>>>> Thanks!
>>>> Austin
>>>>
>>>
>>
>> --
>> Austin Weaver
>> Software Engineer
>> FLYR, Inc.   www.flyrlabs.com
>>
>
