spark-user mailing list archives

From "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Subject Re: Issue with offset management using Spark on Dataproc
Date Tue, 30 Apr 2019 22:36:27 GMT
I recommend using Structured Streaming, as it includes a patch that works
around this issue: https://issues.apache.org/jira/browse/SPARK-26267

Best Regards,
Ryan
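As a rough illustration of the kind of workaround being discussed, assuming the failure mode is an offset lookup that occasionally returns a stale, lower offset, the sketch below compares freshly fetched offsets with the last known ones and retries while any partition appears to have gone backwards. The class, the `fetcher` supplier and the offset 5553400 are hypothetical; this is not Spark's or Kafka's code, only a guard of the same general shape.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class OffsetRegressionGuard {

    // True if any partition reports an offset lower than the last one seen for it.
    static boolean regressed(Map<String, Long> fetched, Map<String, Long> known) {
        for (Map.Entry<String, Long> e : fetched.entrySet()) {
            Long previous = known.get(e.getKey());
            if (previous != null && e.getValue() < previous) {
                return true;
            }
        }
        return false;
    }

    // Calls the (hypothetical) fetcher until no partition has gone backwards,
    // giving up after maxAttempts calls in total.
    static Map<String, Long> fetchWithoutRegression(Supplier<Map<String, Long>> fetcher,
                                                    Map<String, Long> known,
                                                    int maxAttempts) {
        Map<String, Long> fetched = fetcher.get();
        for (int attempt = 1; attempt < maxAttempts && regressed(fetched, known); attempt++) {
            fetched = fetcher.get(); // a real implementation would likely back off first
        }
        if (regressed(fetched, known)) {
            throw new IllegalStateException(
                    "Offsets still behind known offsets after " + maxAttempts + " attempts: " + fetched);
        }
        return fetched;
    }

    public static void main(String[] args) {
        Map<String, Long> known = Map.of("demo.topic-11", 5553330L);
        // Scripted responses: one stale lookup (4544296, from the thread),
        // then a hypothetical healthy one.
        Deque<Map<String, Long>> responses = new ArrayDeque<>(List.of(
                Map.of("demo.topic-11", 4544296L),
                Map.of("demo.topic-11", 5553400L)));
        System.out.println(fetchWithoutRegression(responses::poll, known, 3));
        // prints {demo.topic-11=5553400}
    }
}
```

Failing loudly after a bounded number of retries, rather than silently accepting the lower offset, avoids quietly reprocessing data when the lookup keeps returning a stale value.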


On Tue, Apr 30, 2019 at 3:34 PM Shixiong(Ryan) Zhu <shixiong@databricks.com>
wrote:

> There is a known issue where Kafka may return the wrong offset even when
> no reset is happening: https://issues.apache.org/jira/browse/KAFKA-7703
>
> Best Regards,
> Ryan
>
>
> On Tue, Apr 30, 2019 at 10:41 AM Austin Weaver <austin@flyrlabs.com>
> wrote:
>
>> @deng - There was a short, erroneous period where two streams reading
>> from the same topic with the same group id were running at the same time.
>> We saw errors from this and stopped the extra stream. That said, I would
>> still expect auto.offset.reset to kick in, since the documentation says it
>> applies when there is no initial offset or when the current offset no
>> longer exists on the Kafka topic. Moreover, that doesn't explain why Spark
>> logs that it is at one offset for that partition (5553330) and then
>> immediately errors out trying to read an old offset (4544296) that no
>> longer exists.
>>
>> @Akshay - I am using Spark Streaming (DStreams). Here is a snippet of the
>> Kafka consumer configuration I am using (some fields redacted):
>>
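>> import java.util.HashMap;
>> import java.util.Map;
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import org.apache.kafka.clients.consumer.RoundRobinAssignor;
>> import org.apache.kafka.common.serialization.StringDeserializer;
>>
>> // Consumer config passed to the DStream's consumer strategy (values redacted)
>> Map<String, Object> kafkaConsumerProperties = new HashMap<>();
>> kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ""); // redacted
>> kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, ""); // redacted
>> kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>> kafkaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
>> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>> kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> kafkaConsumerProperties.put("sasl.mechanism", "PLAIN");
>> kafkaConsumerProperties.put("sasl.jaas.config", ""); // redacted
>> kafkaConsumerProperties.put("security.protocol", ""); // redacted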
>>
>> I'm also using LocationStrategies.PreferConsistent().
>>
>> Thanks
>>
>> On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <
>> akshay.bhardwaj1988@gmail.com> wrote:
>>
>>> Hi Austin,
>>>
>>> Are you using Spark Streaming or Structured Streaming?
>>>
>>> To help us understand, could you also share sample code or config
>>> parameters for the spark-kafka connector used by this streaming job?
>>>
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver <austin@flyrlabs.com>
>>> wrote:
>>>
>>>> Hey guys, relatively new Spark dev here. I'm seeing some Kafka offset
>>>> issues and was wondering if you could help me out.
>>>>
>>>> I am currently running a Spark job on Dataproc and am getting errors
>>>> trying to re-join a group and read data from a Kafka topic. I have done
>>>> some digging and am not sure what the issue is. I have auto.offset.reset
>>>> set to earliest, so it should be reading from the earliest available
>>>> non-committed offset, and initially my Spark logs look like this:
>>>>
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-11 to offset 5553330.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-2 to offset 5555553.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-3 to offset 5555484.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-4 to offset 5555586.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-5 to offset 5555502.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-6 to offset 5555561.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-7 to offset 5555542.
>>>>
>>>> But then, on the very next line, I get an error trying to read from a
>>>> nonexistent offset on the server. The offset in the error differs from
>>>> the one listed above for that partition, so I have no idea why it would
>>>> be attempting to read from that offset. Here is the error:
>>>>
>>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {demo.topic-11=4544296}
>>>>
>>>> Any ideas why my Spark job keeps going back to this offset (4544296)
>>>> rather than the one it logs initially (5553330)?
>>>>
>>>> It seems to contradict itself in two ways: (a) the offset it says it is
>>>> on differs from the one it attempts to read, and (b) it reports no
>>>> configured reset policy even though auto.offset.reset is set.
>>>> --
>>>> Austin Weaver
>>>> Software Engineer
>>>> FLYR, Inc.   www.flyrlabs.com
>>>>
>>>
>>
>> --
>> Austin Weaver
>> Software Engineer
>> FLYR, Inc.   www.flyrlabs.com
>>
>
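Austin's reading of auto.offset.reset matches the Kafka consumer documentation: the policy applies only when there is no initial (committed) offset for a partition or the current offset no longer exists on the broker. The model below sketches that decision for a single partition, assuming the broker retains offsets from logStart up to logEnd; it is a simplification, not the Kafka client's code, and the log end 6000000 is hypothetical. One detail worth checking against the Spark version in use: the spark-streaming-kafka-0-10 integration overrides auto.offset.reset to none for the consumers it creates on executors, which would fit an error reporting "no configured reset policy" even though earlier log lines show resets being applied.

```java
import java.util.OptionalLong;

public class OffsetResetModel {

    // The three documented auto.offset.reset values.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Simplified model (assumption): the broker retains offsets logStart..logEnd,
    // where logEnd is the next offset to be written.
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // a valid committed offset wins; no reset happens
            }
        }
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException(
                        "Offsets out of range with no configured reset policy");
        }
    }

    public static void main(String[] args) {
        // 4544296 (from the thread) sits below 5553330, presumably the earliest retained offset.
        OptionalLong stale = OptionalLong.of(4544296L);
        System.out.println(startingOffset(stale, 5553330L, 6000000L, ResetPolicy.EARLIEST)); // 5553330
        try {
            startingOffset(stale, 5553330L, 6000000L, ResetPolicy.NONE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With EARLIEST the stale offset is replaced by the start of the retained log; with NONE the same situation surfaces as an exception, which is the shape of the error quoted above.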
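On the brief period when two streams shared a group id: with the RoundRobinAssignor set in the configuration above, a second member joining the group takes over part of the partitions. The sketch below is a simplified model of round-robin assignment for one topic, assuming 12 partitions (the logs mention partitions up to demo.topic-11). It is not Kafka's RoundRobinAssignor class, and the member names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinModel {

    // Simplified round-robin model (assumption: every member subscribes to the
    // same single topic). Members are sorted, then partitions dealt out in turn.
    static Map<String, List<Integer>> assign(List<String> members, int partitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String m : sorted) {
            result.put(m, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // One stream in the group: it owns every partition.
        System.out.println(assign(List.of("stream-a"), 12));
        // A second stream joins with the same group id: each owns half.
        System.out.println(assign(List.of("stream-a", "stream-b"), 12));
    }
}
```

Once stream-b joins, each stream owns only every other partition, so offsets committed by one stream can cover partitions the other was reading; that is one plausible way the group's committed offsets and a stream's expectations could drift apart.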
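To compare the failing offset with what the consumer logged, the Fetcher lines and the exception message can be cross-checked mechanically. This sketch assumes each log entry sits on one line in the format quoted in the thread; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OffsetLogCheck {

    // Matches the Fetcher INFO lines quoted in the thread.
    static final Pattern RESET =
            Pattern.compile("Resetting offset for partition (\\S+) to offset (\\d+)");
    // Matches "partition=offset" pairs inside the OffsetOutOfRangeException message.
    static final Pattern OUT_OF_RANGE = Pattern.compile("([\\w.\\-]+)=(\\d+)");

    // Collects partition -> offset for every match of the pattern in the text.
    static Map<String, Long> parse(Pattern pattern, String text) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        Matcher m = pattern.matcher(text);
        while (m.find()) {
            offsets.put(m.group(1), Long.parseLong(m.group(2)));
        }
        return offsets;
    }

    // For each failing partition that also appears in the reset log, how far
    // below the logged reset position the failing fetch was.
    static Map<String, Long> gapBelowReset(Map<String, Long> reset, Map<String, Long> failing) {
        Map<String, Long> gaps = new LinkedHashMap<>();
        failing.forEach((partition, offset) -> {
            Long start = reset.get(partition);
            if (start != null) {
                gaps.put(partition, start - offset);
            }
        });
        return gaps;
    }

    public static void main(String[] args) {
        String log = "... Resetting offset for partition demo.topic-11 to offset 5553330.\n"
                + "... Resetting offset for partition demo.topic-2 to offset 5555553.";
        String error = "Offsets out of range with no configured reset policy for partitions: "
                + "{demo.topic-11=4544296}";
        System.out.println(gapBelowReset(parse(RESET, log), parse(OUT_OF_RANGE, error)));
        // prints {demo.topic-11=1009034}
    }
}
```

For the offsets in this thread, the failing fetch asked for an offset 1,009,034 positions below where the consumer had just reset partition demo.topic-11.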
