spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sujit Pal <sujitatgt...@gmail.com>
Subject Re: Efficiency of leftOuterJoin a cassandra rdd
Date Wed, 15 Jul 2015 16:00:14 GMT
Hi Wush,

One option may be to try a replicated join. Since your rdd1 is small, read
it into a collection and broadcast it to the workers, then filter your
larger rdd2 against the collection on the workers.

-sujit


On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain <deepujain@gmail.com> wrote:

> Leftouterjoin and join apis are super slow in spark. 100x slower than
> hadoop
>
> Sent from my iPhone
>
> > On 14-Jul-2015, at 10:59 PM, Wush Wu <wush978@gmail.com> wrote:
> >
> > I don't understand.
> >
> > By the way, the `joinWithCassandraTable` does improve my query time
> > from 40 mins to 3 mins.
> >
> >
> > 2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>:
> >> I have explored spark joins for last few months (you can search my
> posts)
> >> and its frustrating useless.
> >>
> >>> On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu <wush978@gmail.com> wrote:
> >>>
> >>> Dear all,
> >>>
> >>> I have found a post discussing the same thing:
> >>>
> >>>
> https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J
> >>>
> >>> The solution is using "joinWithCassandraTable" and the documentation
> >>> is here:
> >>>
> https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md
> >>>
> >>> Wush
> >>>
> >>> 2015-07-15 12:15 GMT+08:00 Wush Wu <wush978@gmail.com>:
> >>>> Dear all,
> >>>>
> >>>> I am trying to join two RDDs, named rdd1 and rdd2.
> >>>>
> >>>> rdd1 is loaded from a textfile with about 33000 records.
> >>>>
> >>>> rdd2 is loaded from a table in cassandra which has about 3 billions
> >>>> records.
> >>>>
> >>>> I tried the following code:
> >>>>
> >>>> ```scala
> >>>>
> >>>> val rdd1 : (String, XXX) = sc.textFile(...).map(...)
> >>>> import org.apache.spark.sql.cassandra.CassandraSQLContext
> >>>> cc.setKeyspace("xxx")
> >>>> val rdd2 : (String, String) = cc.sql("SELECT x, y FROM xxx").map(r =>
> >>>> ...)
> >>>>
> >>>> val result = rdd1.leftOuterJoin(rdd2)
> >>>> result.take(20)
> >>>>
> >>>> ```
> >>>>
> >>>> However, the log shows that the spark loaded 3 billions records from
> >>>> cassandra and only 33000 records left at the end.
> >>>>
> >>>> Is there a way to query the cassandra based on the key in rdd1?
> >>>>
> >>>> Here is some information of our system:
> >>>>
> >>>> - The spark version is 1.3.1
> >>>> - The cassandra version is 2.0.14
> >>>> - The key of joining is the primary key of the cassandra table.
> >>>>
> >>>> Best,
> >>>> Wush
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >>> For additional commands, e-mail: user-help@spark.apache.org
> >>
> >>
> >>
> >> --
> >> Deepak
> >>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Mime
View raw message