spark-user mailing list archives

From Shashikant Kulkarni (शशिकांत कुलकर्णी) <shashikant.kulka...@gmail.com>
Subject Re: Apache Spark JavaRDD pipe() need help
Date Fri, 23 Sep 2016 10:35:33 GMT
Thank you Jakob. I will try as suggested.

Regards,
Shashi

On Fri, Sep 23, 2016 at 12:14 AM, Jakob Odersky <jakob@odersky.com> wrote:

> Hi Shashikant,
>
> I think you are trying to do too much at once in your helper class.
> Spark's RDD API is functional; it is meant to be used by writing many
> little transformations that will be distributed across a cluster.
>
> Apart from that, `rdd.pipe` seems like a good approach. Here is the
> relevant doc comment (in RDD.scala) on how to use it:
>
>     Return an RDD created by piping elements to a forked external process.
>     The resulting RDD is computed by executing the given process once per
>     partition. All elements of each input partition are written to a
>     process's stdin as lines of input separated by a newline. The resulting
>     partition consists of the process's stdout output, with each line of
>     stdout resulting in one element of the output partition. A process is
>     invoked even for empty partitions.
>
>     [...]
>
> Check the full docs here:
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@pipe(command:String):org.apache.spark.rdd.RDD[String]
>
> This is how you could use it:
>
>     productsRDD = // get from Cassandra
>     processedRDD = productsRDD.map(STEP1).map(STEP2).pipe(<C binary of step 3>)
>     STEP4 // store processedRDD
>
> Hope this gives you some pointers.
>
> Best,
> --Jakob
>
>
>
>
> On Thu, Sep 22, 2016 at 2:10 AM, Shashikant Kulkarni (शशिकांत कुलकर्णी) <shashikant.kulkarni@gmail.com> wrote:
> > Hello Jakob,
> >
> > Thanks for replying. Here is a short example of what I am trying to do,
> > using a Product column family in Cassandra to explain my requirement.
> >
> > In Driver.java
> > {
> >     JavaRDD<Product> productsRdd = Get Products from Cassandra;
> >     productsRdd.map(ProductHelper.processProduct());
> > }
> >
> > In ProductHelper.java
> > {
> >     public static Function<Product, Boolean> processProduct() {
> >         return new Function<Product, Boolean>() {
> >             private static final long serialVersionUID = 1L;
> >
> >             @Override
> >             public Boolean call(Product product) throws Exception {
> >                 // STEP 1: Do some processing on the product object.
> >                 // STEP 2: Using a few values of the product, create a string
> >                 //         like "name id sku datetime".
> >                 // STEP 3: Pass this string to my C binary to perform some
> >                 //         complex calculations and return some data.
> >                 // STEP 4: Get the returned data and store it back in the
> >                 //         Cassandra DB.
> >             }
> >         };
> >     }
> > }
> >
> > In this ProductHelper, I cannot pass (and don't want to pass) the
> > SparkContext object, as the app will throw a "task not serializable"
> > error. If there is a way, let me know.
> >
> > I am not able to achieve STEP 3 above. How can I pass a String to the C
> > binary and get the output back in my program? The C binary reads data from
> > STDIN and writes data to STDOUT. It already works when called from PHP in
> > another part of the application. I want to reuse the same C binary in my
> > Apache Spark application for some background processing and analysis,
> > using the JavaRDD.pipe() API. If there is any other way, let me know. This
> > code will be executed on all the nodes in a cluster.
> >
> > I hope my requirement is now clear. How can I do this?
> >
> > Regards,
> > Shash
> >
> > On Thu, Sep 22, 2016 at 4:13 AM, Jakob Odersky <jakob@odersky.com> wrote:
> >>
> >> Can you provide more details? It's unclear what you're asking.
> >>
> >> On Wed, Sep 21, 2016 at 10:14 AM, shashikant.kulkarni@gmail.com
> >> <shashikant.kulkarni@gmail.com> wrote:
> >> > Hi All,
> >> >
> >> > I am trying to use the JavaRDD.pipe() API.
> >> >
> >> > I have one object with me from the JavaRDD
> >
> >
>
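
For readers following the thread, the per-partition behaviour described in the
quoted doc comment can be tried out without a cluster. The following is only a
rough sketch in plain Java, not Spark's implementation: the class name
PipeSketch and the method pipePartition are hypothetical, the sample strings
are made up in the "name id sku datetime" shape from the question, and the
Unix `tr` command stands in for the C binary. It starts one external process
for a whole list of elements, writes each element to the process's stdin as a
line, and turns each stdout line into one output element.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that imitates, for ONE partition, what the doc comment
// for pipe() describes. It is an illustration, not Spark code.
final class PipeSketch {

    // Starts the command once, writes every element as one stdin line, and
    // returns every stdout line as one output element.
    static List<String> pipePartition(List<String> partition, String... command) {
        try {
            Process process = new ProcessBuilder(command)
                    .redirectError(ProcessBuilder.Redirect.INHERIT)
                    .start();

            // Write on a separate thread so that a full stdout buffer in the
            // child process cannot block us while we are still writing.
            Thread writer = new Thread(() -> {
                try (BufferedWriter stdin = new BufferedWriter(new OutputStreamWriter(
                        process.getOutputStream(), StandardCharsets.UTF_8))) {
                    for (String element : partition) {
                        stdin.write(element);
                        stdin.write('\n');
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Closing stdin above signals end of input; read until stdout ends.
            List<String> output = new ArrayList<>();
            try (BufferedReader stdout = new BufferedReader(new InputStreamReader(
                    process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = stdout.readLine()) != null) {
                    output.add(line);
                }
            }

            writer.join();
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new IllegalStateException("process exited with status " + exitCode);
            }
            return output;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while piping", e);
        }
    }

    public static void main(String[] args) {
        // Made-up sample elements in the "name id sku datetime" format.
        List<String> partition = Arrays.asList(
                "widget 1 sku-001 2016-09-22T10:00:00",
                "gadget 2 sku-002 2016-09-22T10:05:00");
        // `tr` is only a stand-in for the C binary that reads stdin and
        // writes stdout.
        for (String result : pipePartition(partition, "tr", "a-z", "A-Z")) {
            System.out.println(result);
        }
    }
}
```

In a Spark job none of this plumbing is needed: as suggested above, the string
built in STEP 2 is produced by a map, and pipe() is then called on the
resulting RDD of strings with the command for the C binary. Because the
process is started once per partition rather than once per element, the binary
has to keep reading lines until stdin is closed.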
