spark-user mailing list archives

From Fangyuan Liu <>
Subject [Ask for help] How to manually submit offsetRanges
Date Fri, 20 Sep 2019 12:23:36 GMT
Hello Sir/Madam,

    I am using Spark Streaming with the Kafka Java API, and I would like to know whether there is any method
to commit OffsetRanges other than `commitAsync`. The problem is that I modified the OffsetRanges
and committed them with `commitAsync`, but the modification does not seem to have been
committed.
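To make the commit step above concrete, here is a minimal plain-Java model of what `commitAsync` hands to Kafka. It is a sketch under stated assumptions, not Spark code. In spark-streaming-kafka-0-10, `OffsetRange`'s fields are read-only, so a "modified" range has to be a new object (Spark provides `OffsetRange.create(topic, partition, fromOffset, untilOffset)` for that). The hypothetical `Range` class below only mirrors those fields. The model assumes, based on the Spark 2.x `DirectKafkaInputDStream` source, that the offset committed for each partition is the range's `untilOffset`, and that several ranges queued for the same partition collapse to the largest one.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of spark-streaming-kafka-0-10 commit behaviour.
// This is NOT Spark code.
class CommitModel {
    // Builds what commitAsync eventually sends to Kafka under this model:
    // one entry per "topic-partition", holding untilOffset (the next offset to read).
    // Ranges for the same partition keep the largest untilOffset (assumption).
    static Map<String, Long> toCommitMap(List<Range> ranges) {
        Map<String, Long> commits = new LinkedHashMap<>();
        for (Range r : ranges) {
            commits.merge(r.topic + "-" + r.partition, r.untilOffset, Long::max);
        }
        return commits;
    }

    public static void main(String[] args) {
        // The modified range from the question's example: [112233, 112234)
        Range modified = new Range("a", 0, 112233L, 112234L);
        System.out.println(toCommitMap(Arrays.asList(modified))); // prints {a-0=112234}
    }
}

// Hypothetical stand-in for OffsetRange: same read-only fields, not the Spark class.
final class Range {
    final String topic;
    final int partition;
    final long fromOffset;  // inclusive
    final long untilOffset; // exclusive

    Range(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}
```

With the modified range [112233, 112234), the model commits 112234 for partition `a-0`, which is the offset the question expects batch 11 to start from.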


    For example:

    In this batch (batch 10):

    OffsetRange o = offsetRanges[0]

    o.fromOffset = 112233, o.untilOffset = 112244

    I made a modification so that

    o.fromOffset = 112233, o.untilOffset = 112234

    and I commit the offset ranges using commitAsync,

    but in the next batch (batch 11)

    I expect o.fromOffset = 112234, but it is actually 112244 (so the modification
does not seem to have been committed).
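The batch-11 result in the example can be reproduced with a second hypothetical model. It assumes, consistent with the Spark 2.x direct stream, that the stream keeps its own per-partition offsets on the driver and starts each batch at the previous batch's `untilOffset`; offsets committed to Kafka are only consulted when a consumer starts without a checkpoint. `BatchPlanner` and the latest offset 112250 are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how a direct stream plans its batches.
// This is NOT Spark code; the class and method names are invented.
class BatchPlanner {
    // Offsets the stream tracks for itself (Spark keeps these on the driver).
    private final Map<String, Long> currentOffsets = new HashMap<>();
    // Offsets "committed to Kafka"; in this model they are only stored.
    final Map<String, Long> committed = new HashMap<>();

    BatchPlanner(String topicPartition, long startOffset) {
        currentOffsets.put(topicPartition, startOffset);
    }

    // Plans the next batch [from, latestOffset) for one partition.
    // 'from' is read from currentOffsets, never from 'committed'.
    long[] nextBatch(String topicPartition, long latestOffset) {
        long from = currentOffsets.get(topicPartition);
        currentOffsets.put(topicPartition, latestOffset);
        return new long[] {from, latestOffset};
    }

    void commitAsync(String topicPartition, long untilOffset) {
        committed.put(topicPartition, untilOffset);
    }

    public static void main(String[] args) {
        BatchPlanner stream = new BatchPlanner("a-0", 112233L);
        long[] batch10 = stream.nextBatch("a-0", 112244L); // [112233, 112244)
        stream.commitAsync("a-0", 112234L);                 // the modified untilOffset
        long[] batch11 = stream.nextBatch("a-0", 112250L); // 112250 is made up
        System.out.println(batch10[0] + " " + batch11[0]);  // prints 112233 112244
    }
}
```

Under this model, committing a smaller `untilOffset` changes only what is stored in Kafka, not where the running stream resumes, which matches the 112244 seen in batch 11.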


    Here is the related code (I am using org.apache.spark version 2.3.0 and org.apache.kafka 0.10):


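import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpoint, () -> {

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Durations.milliseconds(1000));
    List<String> topics = Arrays.asList("a");
    String consumerGroupId = "a_comsumer_group";

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("bootstrap.servers", String.join(",", "broker_name"));
    kafkaParams.put("", consumerGroupId);
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("", "false");

    // The Java API returns a JavaInputDStream and also takes a LocationStrategy
    JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    kafkaStream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // I made some changes to the offsetRanges

        // The Java wrapper does not implement CanCommitOffsets, so cast the underlying DStream
        ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
    });

    return streamingContext;
});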

Thanks a lot. :)

Best Regards,

