spark-user mailing list archives

From Deenar Toraskar <deenar.toras...@gmail.com>
Subject Re: converting DStream[String] into RDD[String] in spark streaming
Date Sun, 29 Mar 2015 08:08:18 GMT
Sean

Thank you very much for your response. I have a requirement to run a function
only over the new inputs in a Spark Streaming sliding window, i.e. the latest
batch of events only. Do I just get a new DStream by setting the slide
duration equal to the window duration? Such as:


    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    // write all new tweets received in the last 10 minutes
    stream.window(Seconds(600), Seconds(600))
      .saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")


Alternatively, since I can find the time of the new batch, I could do
something like this:

  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      // save only the RDD belonging to the newest batch;
      // currentBatchTime would need to be tracked elsewhere
      if (time == currentBatchTime) {
        val file = rddToFileName(prefix, suffix, time)
        rdd.saveAsTextFile(file)
      }
    }
    this.foreachRDD(saveFunc)
  }
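
Or, since foreachRDD is invoked once per batch with that batch's time anyway,
I could skip the currentBatchTime check altogether. A rough sketch only
(saveNewBatches is just an illustrative name, and because rddToFileName is
private to Spark the output path is built by hand):

    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.dstream.DStream

    // Sketch: write each new batch of a DStream[String] to its own
    // directory, named after the batch time in milliseconds.
    def saveNewBatches(stream: DStream[String], prefix: String): Unit = {
      stream.foreachRDD { (rdd, time: Time) =>
        rdd.saveAsTextFile(s"$prefix-${time.milliseconds}")
      }
    }

    // e.g. saveNewBatches(stream, "hdfs://localhost:9000/twitterRawJSON")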

Regards
Deenar

P.S. The mail archive on Nabble does not seem to show all responses.
-----Original Message-----
From: Sean Owen [mailto:sowen@cloudera.com]
Sent: 22 March 2015 11:49
To: Deenar Toraskar
Cc: user@spark.apache.org
Subject: Re: converting DStream[String] into RDD[String] in spark streaming

On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar <deenar.toraskar@db.com>
wrote:
> 1) If there are no sliding window calls in this streaming context,
> will there be just one file written per interval?

As many files as there are partitions will be written in each interval.
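
If a single part-file per interval is wanted, the number of partitions can be
reduced before saving; a minimal sketch, assuming the DStream[String] called
stream from your earlier snippet:

    // Illustrative only: collapse each windowed batch to one partition so
    // each interval produces a single part-file rather than one per partition.
    stream.window(Seconds(600), Seconds(600))
      .transform(_.coalesce(1))
      .saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")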

> 2) If there is a sliding window call in the same context, such as
>
>     val hashTags = stream.flatMap(json =>
>       DataObjectFactory.createStatus(json).getText.split(" ")
>         .filter(_.startsWith("#")))
>
>     val topCounts60 = hashTags.map((_, 1))
>       .reduceByKeyAndWindow(_ + _, Seconds(600))
>       .map { case (topic, count) => (count, topic) }
>       .transform(_.sortByKey(false))
>
> will some files get written multiple times (as long as the interval is
> in the batch)?

I don't think it's right to say files will be written many times, but yes
it is my understanding that data will be written many times since a datum
lies in many windows.
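
(For instance, with the 30-second batch interval above and a 600-second window
sliding every batch, each datum falls into 600 / 30 = 20 overlapping windows,
so it contributes to 20 windowed results.)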


