spark-user mailing list archives

From Austin Weaver <aus...@flyrlabs.com>
Subject Re: Issue with offset management using Spark on Dataproc
Date Tue, 30 Apr 2019 17:40:41 GMT
@deng - There was a short period where, by mistake, 2 streams with the same
group id were reading from the same topic at the same time. We saw errors
from this and stopped the extra stream. That being said, I would think that
auto.offset.reset would kick in regardless, since the documentation says it
applies when there is no current offset or when the current offset no longer
exists on the kafka topic? Moreover, that doesn't explain why Spark logs that
it is on one offset for that partition (5553330) and then immediately errors
out trying to read the old offset (4544296) that no longer exists?
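
To make the contradiction concrete, here is a rough sketch of how I would
expect the reset policy to behave. It is a toy model only, not Kafka's or
Spark's actual code: the class OffsetResetSketch, the resolve helper and the
"latest" value 5600000 are made up for illustration, and I am assuming that
5553330 is the earliest offset still available on that partition.

```java
public class OffsetResetSketch {

    /** Toy stand-ins for the three auto.offset.reset values. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Returns the offset a consumer would start from in this toy model.
     * earliest and latest bound the offsets the partition still holds.
     */
    static long resolve(long requested, long earliest, long latest, ResetPolicy policy) {
        boolean inRange = requested >= earliest && requested <= latest;
        if (inRange) {
            return requested;
        }
        switch (policy) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            default:
                // Mirrors the wording of the error in the logs; no policy to fall back on.
                throw new IllegalStateException(
                        "Offsets out of range with no configured reset policy: " + requested);
        }
    }

    public static void main(String[] args) {
        long staleOffset = 4544296L;   // offset from the exception
        long earliest = 5553330L;      // assumed earliest offset (from the "Resetting offset" log line)
        long latest = 5600000L;        // hypothetical value, not taken from the logs

        // What I expect with auto.offset.reset=earliest.
        System.out.println(resolve(staleOffset, earliest, latest, ResetPolicy.EARLIEST));

        // What the error message suggests is actually happening.
        try {
            resolve(staleOffset, earliest, latest, ResetPolicy.NONE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the stale offset 4544296 is moved to 5553330 under "earliest",
and the error only appears when no reset policy is in effect, which is why
the exception text surprises me.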

@Akshay - I am using Spark Streaming (DStreams). Here is a snippet of the
kafka consumer configuration I am using (some fields redacted):

kakaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
kakaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "");
kakaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kakaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
RoundRobinAssignor.class.getName());
kakaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
kakaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
kakaConsumerProperties.put("auto.offset.reset", "earliest");
kakaConsumerProperties.put("sasl.mechanism", "PLAIN");
kakaConsumerProperties.put("sasl.jaas.config", "security.protocol");
kakaConsumerProperties.put("security.protocol", "");

and I'm using LocationStrategies.PreferConsistent()

Thanks

On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <
akshay.bhardwaj1988@gmail.com> wrote:

> Hi Austin,
>
> Are you using Spark Streaming or Structured Streaming?
>
> For better understanding, could you also provide sample code/config params
> for your spark-kafka connector for the said streaming job?
>
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver <austin@flyrlabs.com>
> wrote:
>
>> Hey guys, relatively new Spark dev here. I'm seeing some kafka offset
>> issues and was wondering if you guys could help me out.
>>
>> I am currently running a spark job on Dataproc and am getting errors
>> trying to re-join a group and read data from a kafka topic. I have done
>> some digging and am not sure what the issue is. I have auto.offset.reset set
>> to earliest, so it should be reading from the earliest available
>> non-committed offset, and initially my spark logs look like this:
>>
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-11 to offset 5553330.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-2 to offset 5555553.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-3 to offset 5555484.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-4 to offset 5555586.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-5 to offset 5555502.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-6 to offset 5555561.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-7 to offset 5555542.
>>
>> But then on the very next line I get an error trying to read from a
>> nonexistent offset on the server. You can see that the offset for the
>> partition differs from the one listed above, so I have no idea why it would
>> be attempting to read from that offset. Here is the error on the next line:
>>
>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
>> out of range with no configured reset policy for partitions:
>> {demo.topic-11=4544296}
>>
>> Any ideas as to why my spark job is constantly going back to this offset
>> (4544296), and not the one it outputs originally (5553330)?
>>
>> It seems to be contradicting itself with a) the actual offset it says it's
>> on versus the one it attempts to read, and b) saying there is no configured
>> reset policy.
>> --
>> Austin Weaver
>> Software Engineer
>> FLYR, Inc.   www.flyrlabs.com
>>
>

-- 
Austin Weaver
Software Engineer
FLYR, Inc.   www.flyrlabs.com
