spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Saving Data only if Dstream is not empty
Date Tue, 09 Dec 2014 15:17:21 GMT
We have a similar case in which we don't want to save data to Cassandra if
the data is empty.
In our case, we filter the initial DStream to process messages that go to a
given table.

To do so, we're using something like this:

dstream.foreachRDD{ (rdd,time) =>
   tables.foreach{ table =>
     val filteredRdd = rdd.filter(record =>  predicate to assign records to
tables)
     filteredRdd.cache
     if (filteredRdd.count>0) {
        filteredRdd.saveAsFoo(...) // we do here saveToCassandra, you could
do saveAsTextFile(s"$path/$time")
     }
     filteredRdd.unpersist
}

Using the 'time' parameter you can implement an unique name based on the
timestamp for the  saveAsTextfile(filename) call which is what the
Dstream.saveAsTextFile(...) gives you.  (so it boils down to what Sean
said... you implement the saveAs yourself)

-kr, Gerard.
@maasg

On Tue, Dec 9, 2014 at 1:56 PM, Sean Owen <sowen@cloudera.com> wrote:

> I don't believe you can do this unless you implement the save to HDFS
> logic yourself. To keep the semantics consistent, these saveAs*
> methods will always output a file per partition.
>
> On Mon, Dec 8, 2014 at 11:53 PM, Hafiz Mujadid <hafizmujadid00@gmail.com>
> wrote:
> > Hi Experts!
> >
> > I want to save DStream to HDFS only if it is not empty such that it
> contains
> > some kafka messages to be stored. What is an efficient way to do this.
> >
> >            var data = KafkaUtils.createStream[Array[Byte], Array[Byte],
> > DefaultDecoder, DefaultDecoder]    (ssc, params, topicMap,
> > StorageLevel.MEMORY_ONLY).map(_._2)
> >
> >
> >     val streams = data.window(Seconds(interval*4),
> > Seconds(interval*2)).map(x => new String(x))
> >     //streams.foreachRDD(rdd=>rdd.foreach(println))
> >
> > //what condition can be applied here to store only non empty DStream
> >     streams.saveAsTextFiles(sink, "msg")
> > Thanks
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Data-only-if-Dstream-is-not-empty-tp20587.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> > For additional commands, e-mail: user-help@spark.apache.org
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Mime
View raw message