Nice question :) 
Ideally you should use a queuestream interface to push RDD into a queue & then spark streaming can handle the rest. 
Though why are you looking to convert RDD to DStream, another workaround folks use is to source DStream from folders & move files that they need reprocessed back into the folder, its a hack but much less headache . 

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi



On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <aniket.bhatnagar@gmail.com> wrote:
Hi everyone

I haven't been receiving replies to my queries in the distribution list. Not pissed but I am actually curious to know if my messages are actually going through or not. Can someone please confirm that my msgs are getting delivered via this distribution list?

Thanks,
Aniket


On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatnagar@gmail.com> wrote:
Sometimes it is useful to convert a RDD into a DStream for testing purposes (generating DStreams from historical data, etc). Is there an easy way to do this?

I could come up with the following inefficient way but no sure if there is a better way to achieve this. Thoughts?

class RDDExtension[T](rdd: RDD[T]) {

  def chunked(chunkSize: Int): RDD[Seq[T]] = {
    rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
  }

  def skipFirst(): RDD[T] = {
    rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
  }

  def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val chunk = currentRDD.take(1)
        currentRDD = currentRDD.skipFirst()
        Some(rdd.sparkContext.parallelize(chunk))
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration)).
          getOrElse(super.slideDuration)
      }
    }

}