spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: FW: converting DStream[String] into RDD[String] in spark streaming [I]
Date Sun, 29 Mar 2015 08:49:18 GMT
You're just describing the operation of Spark Streaming at its
simplest, without windowing. You get non-overlapping RDDs of the most
recent data each time.
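
For illustration, a minimal sketch of that default behaviour (the socket source,
host and port are placeholders, not from your code): without window(), each batch
interval produces one RDD containing only that interval's new records.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("NoWindowExample")
    val ssc = new StreamingContext(conf, Seconds(30))

    // assumed source; any receiver behaves the same way
    val lines = ssc.socketTextStream("localhost", 9999)

    // each invocation sees only the current 30-second batch; batches never overlap
    lines.foreachRDD { (rdd, time) =>
      println(s"Batch at $time contains ${rdd.count()} new records")
    }

    ssc.start()
    ssc.awaitTermination()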

On Sun, Mar 29, 2015 at 8:44 AM, Deenar Toraskar
<deenar.toraskar@gmail.com> wrote:
> Sean
>
> Thank you very much for your response. I have a requirement to run a function
> only over the new inputs in a Spark Streaming sliding window, i.e. the
> latest batch of events only. Do I just get a new DStream by setting the slide
> duration equal to the window duration? Such as:
>
>
>     val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
>     val ssc = new StreamingContext(sparkConf, Seconds(30))
>     // write all new tweets from the last 10 minutes
>     stream.window(Seconds(600), Seconds(600))
>       .saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")
>
>
> Alternatively, I could find the time of the new batch and do something
> like this:
>
>   def saveAsTextFiles(prefix: String, suffix: String = "") {
>     val saveFunc = (rdd: RDD[T], time: Time) => {
>       // only write the RDD for the most recent batch
>       // (currentBatchTime would need to be tracked separately)
>       if (time == currentBatchTime) {
>         val file = rddToFileName(prefix, suffix, time)
>         rdd.saveAsTextFile(file)
>       }
>     }
>     this.foreachRDD(saveFunc)
>   }
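>
> A rough sketch of the same idea using only the public API (the HDFS path is just
> an example): foreachRDD already passes the batch time to the function, so each
> invocation sees exactly the newest batch.
>
>     stream.foreachRDD { (rdd, time) =>
>       // write only non-empty batches, one output directory per batch time
>       if (!rdd.isEmpty()) {
>         rdd.saveAsTextFile(s"hdfs://localhost:9000/twitterRawJSON-${time.milliseconds}")
>       }
>     }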
>
> Regards
> Deenar
>
> P.S. The mail archive on nabble does not seem to show all responses.
>
> -----Original Message-----
> From: Sean Owen [mailto:sowen@cloudera.com]
> Sent: 22 March 2015 11:49
> To: Deenar Toraskar
> Cc: user@spark.apache.org
> Subject: Re: converting DStream[String] into RDD[String] in spark streaming
>
> On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar <deenar.toraskar@db.com>
> wrote:
>> 1) if there are no sliding window calls in this streaming context,
>> will there be just one file written per interval?
>
> As many files as there are partitions will be written in each interval.
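>
> For example (a sketch; the prefix is only illustrative), each partition becomes
> one part-NNNNN file, so repartitioning the stream to a single partition before
> saving yields one file per interval:
>
>     stream.repartition(1).saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")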
>
>> 2) if there is a sliding window call in the same context, such as
>>
>>     val hashTags = stream.flatMap(json =>
>>       DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))
>>
>>     val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
>>                      .map{case (topic, count) => (count, topic)}
>>                      .transform(_.sortByKey(false))
>>
>> will some files get written multiple times (as long as the batch
>> interval falls within the window)?
>
> I don't think it's right to say files will be written many times, but yes it
> is my understanding that data will be written many times since a datum lies
> in many windows.
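>
> For instance, with a 30-second batch interval, a 600-second window that slides
> every batch re-aggregates each record in 600 / 30 = 20 successive windows. One
> way to avoid duplicated output (a sketch; the path is illustrative and stream /
> topCounts60 are as defined above) is to keep the sliding aggregation but persist
> the raw records from the un-windowed stream, so each datum is written once:
>
>     stream.saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")  // one copy per record
>     topCounts60.print()  // sliding windowed hashtag counts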
>
>

