spark-user mailing list archives

From Fangyuan Liu <Fangyuan....@microsoft.com.INVALID>
Subject [Ask for help] How to manually submit offsetRanges
Date Fri, 20 Sep 2019 12:23:36 GMT
Hello Sir/Madam,



    I am using Spark Streaming with the Kafka Java API, and I would like to know whether there
is any way to commit OffsetRanges other than `commitAsync`. The problem is this: I modified
the OffsetRanges and committed them with `commitAsync`, but the modification does not seem
to have been committed.



-----------------------------------------------------------------------------------------------------------------------------------------------------------

    For example:

    In this batch (batch 10):

    OffsetRange o = offsetRanges[0]

    o.fromOffset = 112233, o.untilOffset = 112244

    I modified the range so that

    o.fromOffset = 112233, o.untilOffset = 112234

    and committed the offset ranges using commitAsync.



    But in the next batch (batch 11):

    I expect o.fromOffset = 112234, but it is actually o.fromOffset = 112244 (so the modification
does not seem to have been committed).

-----------------------------------------------------------------------------------------------------------------------------------------------------------
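    To make the numbers above concrete, here is a minimal sketch in plain Java. It does not use
Spark or Kafka at all: the `OffsetExample` class, its nested `Range` class, and the `withUntil`
helper are hypothetical names made up for illustration only. It just contrasts the start offset
I expected for batch 11 with the one I actually observed.

```java
public class OffsetExample {

    /** Hypothetical stand-in for an offset range covering [fromOffset, untilOffset). */
    static final class Range {
        final long fromOffset;
        final long untilOffset;

        Range(long fromOffset, long untilOffset) {
            if (untilOffset < fromOffset) {
                throw new IllegalArgumentException("untilOffset must be >= fromOffset");
            }
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        /** Returns a copy of this range with a different end offset. */
        Range withUntil(long newUntilOffset) {
            return new Range(fromOffset, newUntilOffset);
        }

        /** Number of records covered by the range. */
        long count() {
            return untilOffset - fromOffset;
        }
    }

    public static void main(String[] args) {
        // Batch 10 as delivered by the stream.
        Range delivered = new Range(112233L, 112244L);

        // The modified range that I passed to commitAsync.
        Range committed = delivered.withUntil(112234L);

        // What I expected batch 11 to start from: the end of the committed range.
        long expectedNextFrom = committed.untilOffset;

        // What batch 11 actually started from: the end of the delivered range.
        long observedNextFrom = delivered.untilOffset;

        System.out.println("committed records:  " + committed.count());
        System.out.println("expected next from: " + expectedNextFrom);
        System.out.println("observed next from: " + observedNextFrom);
    }
}
```

    In other words, batch 11 continues from where batch 10 was delivered, not from where my
modified commit ended.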



    Here is the related code (I am using org.apache.spark version 2.3.0 and org.apache.kafka 0.10):



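```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

// sparkContext (a JavaSparkContext) and checkpoint (a String path) are defined elsewhere.
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpoint, () -> {
    JavaStreamingContext streamingContext =
            new JavaStreamingContext(sparkContext, Durations.milliseconds(1000));
    streamingContext.checkpoint(checkpoint);

    List<String> topics = Collections.singletonList("a");
    String consumerGroupId = "a_comsumer_group";

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("bootstrap.servers", String.join(",", "broker_name"));
    kafkaParams.put("group.id", consumerGroupId);
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    // createDirectStream returns a JavaInputDStream; that type is needed for inputDStream() below.
    JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    kafkaStream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // I made some changes to the offsetRanges

        // The underlying input DStream, not the Java wrapper, implements CanCommitOffsets.
        ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
    });

    // The creating function must return the newly built context.
    return streamingContext;
});
```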

Thanks a lot. :)





Best Regards,

Fangyuan


