spark-user mailing list archives

From Susan Zhang <suchenz...@gmail.com>
Subject Streaming: BatchTime OffsetRange Mapping?
Date Fri, 21 Aug 2015 23:01:57 GMT
I've been reading the documentation on accessing offsetRanges and updating
ZooKeeper yourself when using DirectKafkaInputDStream (from
createDirectStream), along with the code changes in this PR:
https://github.com/apache/spark/pull/4805.

I'm planning on adding a listener to update ZooKeeper (for monitoring
purposes) when a batch completes. What would be a consistent way to index
the offsets for a given batch? In the PR above, it seems like batchTime was
used, but is there a way to build this batchTime -> offsets mapping in the
streaming app itself?

Something like:

var currOffsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  ... /*DO STUFF*/
}

offsetMap += ((validTime, currOffsetRanges))

Then in the listener (onBatchComplete), retrieve corresponding offsetRanges
associated with the completed batchTime and update ZK accordingly.

I'm unsure how to define validTime above. Any help/advice would be
appreciated.
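One possible way to get the batch time without defining validTime yourself: the two-argument foreachRDD overload in Spark 1.x passes the batch Time along with the RDD, so the offsets can be indexed by that Time and looked up later in onBatchCompleted via batchCompleted.batchInfo.batchTime. A sketch (offsetMap and updateZookeeper are hypothetical; ssc is the StreamingContext, and directKafkaStream comes from createDirectStream as above):

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import scala.collection.concurrent.TrieMap

// Hypothetical thread-safe map from batch time to that batch's offset ranges.
val offsetMap = TrieMap[Time, Array[OffsetRange]]()

// The (rdd, time) overload of foreachRDD supplies the batch time directly,
// so no separate transform step is needed just to capture the offsets.
directKafkaStream.foreachRDD { (rdd, time) =>
  offsetMap.put(time, rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
  // ... /*DO STUFF*/ ...
}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // batchInfo.batchTime matches the Time seen in foreachRDD for this batch.
    offsetMap.remove(batchCompleted.batchInfo.batchTime).foreach { ranges =>
      ranges.foreach { range =>
        // updateZookeeper is a hypothetical helper that would commit
        // range.untilOffset for (range.topic, range.partition) to ZK.
        updateZookeeper(range.topic, range.partition, range.untilOffset)
      }
    }
  }
})
```

Removing the entry in onBatchCompleted keeps the map from growing without bound; note the asInstanceOf cast only works on the RDD produced directly by the direct stream, before any other transformation.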


Thanks!
