spark-user mailing list archives

From Lucas Fernandes Brunialti <lbrunia...@igcorp.com.br>
Subject Re: Using Cassandra as an input stream from Java
Date Thu, 05 Dec 2013 05:10:49 GMT
Hi all,

This should work:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
context.newAPIHadoopRDD(job.getConfiguration(),
ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
ByteBuffer.class, SortedMap.class);
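
For anyone wondering why the asSubclass() call matters, here is a minimal sketch that uses only the JDK. Format, MapFormat and load() are hypothetical stand-ins (not Spark, Hadoop or Cassandra classes) that only mimic the generic shape <K,V,F> shown in the compiler error quoted further down. The assumption is that the real newAPIHadoopRDD behaves the same way with respect to type inference; I have not verified that against every Spark or javac version.

```java
import java.nio.ByteBuffer;
import java.util.SortedMap;

final class RawClassTokenSketch {
    // Hypothetical stand-in for org.apache.hadoop.mapreduce.InputFormat<K, V>.
    interface Format<K, V> {}

    // Hypothetical stand-in for ColumnFamilyInputFormat: its value type is a
    // parameterized SortedMap, for which Java has no class literal.
    static final class MapFormat
            implements Format<ByteBuffer, SortedMap<ByteBuffer, String>> {}

    // Hypothetical method with the same generic shape as the one in the error:
    // <K,V,F>(Class<F>, Class<K>, Class<V>) with F bounded by Format<K, V>.
    static <K, V, F extends Format<K, V>> String load(
            Class<F> formatClass, Class<K> keyClass, Class<V> valueClass) {
        return formatClass.getSimpleName() + "<" + keyClass.getSimpleName()
                + "," + valueClass.getSimpleName() + ">";
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static String describe() {
        // A direct call such as
        //   load(MapFormat.class, ByteBuffer.class, SortedMap.class)
        // is expected to be rejected, because V would be the raw SortedMap
        // while MapFormat declares SortedMap<ByteBuffer, String>.
        // Viewing the class token as a subclass of the raw Format turns the
        // call into an unchecked invocation, which the compiler accepts.
        Class<? extends Format> rawFormat = MapFormat.class.asSubclass(Format.class);
        return load(rawFormat, ByteBuffer.class, SortedMap.class);
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

The trade-off is that the compiler no longer checks the key and value types, so a mismatch would only show up at runtime.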

I have translated the word count example from Scala to Java; I just can't
send it right now...

Best Regards.

Lucas.
On Dec 5, 2013 1:51 AM, "Pulasthi Supun Wickramasinghe" <pulasthi911@gmail.com> wrote:

> Hi Tal,
>
> Just checking whether you have added your code to GitHub :). If you have,
> could you point me to it?
>
> Best Regards,
> Pulasthi
>
>
> On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>
>> Tal - that would be great to have open sourced if you can do it!
>>
>> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
>> <pulasthi911@gmail.com> wrote:
>> > Hi Tal,
>> >
>> > Thanks for the info. I will try it out and see how it goes.
>> >
>> >
>> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <tal.s@taboola.com> wrote:
>> >>
>> >> Hi Pulasthi,
>> >>
>> >> I couldn't make it work, so I ended up implementing three Java classes:
>> >> one that extends org.apache.hadoop.mapreduce.InputFormat, another that
>> >> extends org.apache.hadoop.mapreduce.InputSplit, and a third that extends
>> >> org.apache.hadoop.mapreduce.RecordReader. I used them to load data from
>> >> Cassandra into an RDD (using the newAPIHadoopRDD() method). It works great!
>> >> I'm cleaning up the code a bit and will upload it to GitHub as open source
>> >> (after the summit).
>> >>
>> > That's great. Looking forward to checking it out after you publish it on
>> > GitHub :).
>> >
>> >
>> > Thanks,
>> > Pulasthi
>> >>
>> >> I hope this helps for now,
>> >>
>> >> Tal
>> >>
>> >>
>> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>> >> <pulasthi911@gmail.com> wrote:
>> >>>
>> >>> Hi Tal,
>> >>>
>> >>> I also tried doing this by converting the Scala sample into Java, but I
>> >>> am getting a compile-time error. The code is below:
>> >>>
>> >>>         JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>> >>>
>> >>>         //Build the job configuration with ConfigHelper provided by Cassandra
>> >>>         Job job = null;
>> >>>         try {
>> >>>             job = new Job();
>> >>>         } catch (IOException e) {
>> >>>             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
>> >>>         }
>> >>>         job.setInputFormatClass(ColumnFamilyInputFormat.class);
>> >>>
>> >>>         String host = args[1];
>> >>>         String port = args[2];
>> >>>
>> >>>         ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>> >>>         ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> >>>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>> >>>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> >>>         ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
>> >>>         ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
>> >>>
>> >>>         SlicePredicate predicate = new SlicePredicate();
>> >>>         SliceRange sliceRange = new SliceRange();
>> >>>         sliceRange.setStart(new byte[0]);
>> >>>         sliceRange.setFinish(new byte[0]);
>> >>>         predicate.setSlice_range(sliceRange);
>> >>>         ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>> >>>
>> >>>         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>>         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>>
>> >>>         // Make a new Hadoop RDD
>> >>>         final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>> >>>                 new TreeMap<ByteBuffer, IColumn>();
>> >>>         JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>> >>>                 sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>> >>>                         ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>> >>>
>> >>>
>> >>> I also tried the code segment that you provided, but I keep getting
>> >>> the following error.
>> >>>
>> >>> java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>> >>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>> >>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92 of ? extends java.util.SortedMap>)
>> >>>
>> >>> Did you encounter this? If so, any help would be appreciated.
>> >>>
>> >>> Best Regards,
>> >>> Pulasthi
>> >>>
>> >>>
>> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <tal.s@taboola.com> wrote:
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>>
>> >>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
>> >>>> I've translated the Scala example, CassandraTest.scala, to Java, but I
>> >>>> keep getting the following exception:
>> >>>>
>> >>>> Exception in thread "main" java.io.IOException: Could not get input splits
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>> >>>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>> >>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>> >>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>> >>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>> >>>>     at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>> >>>>     at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>> >>>>     at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>> >>>>     at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>> >>>>     at com.taboola.test.spark_cassandra.App.main(App.java:65)
>> >>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>> >>>>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>> >>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>> >>>>     ... 9 more
>> >>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>> >>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>>     at java.lang.Thread.run(Thread.java:695)
>> >>>> Caused by: org.apache.thrift.transport.TTransportException
>> >>>>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> >>>>     at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>> >>>>     at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>> >>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>> >>>>     ... 8 more
>> >>>>
>> >>>>
>> >>>> This is the relevant code portion:
>> >>>>
>> >>>> Job job = new Job();
>> >>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>> >>>> String host = "<server>";
>> >>>> String port = "9160";
>> >>>>
>> >>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>> >>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> >>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>> >>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> >>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>> >>>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>>>
>> >>>> SlicePredicate predicate = new SlicePredicate();
>> >>>> SliceRange sliceRange = new SliceRange();
>> >>>> sliceRange.setStart(new byte[0]);
>> >>>> sliceRange.setFinish(new byte[0]);
>> >>>> predicate.setSlice_range(sliceRange);
>> >>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>> >>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>> >>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>> >>>>         ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>> >>>>                 ByteBuffer.class, b.getClass());
>> >>>>
>> >>>>
>> >>>> I would appreciate any input you may have.
>> >>>>
>> >>>> Thanks!
>> >>>>
>> >>>> Tal
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Pulasthi Supun
>> >>> Undergraduate
>> >>> Dpt of Computer Science & Engineering
>> >>> University of Moratuwa
>> >>> Blog : http://pulasthisupun.blogspot.com/
>> >>> Git hub profile: https://github.com/pulasthi
>> >>
>> >>
>> >
>> >
>> >
>> > --
>> > Pulasthi Supun
>> > Undergraduate
>> > Dpt of Computer Science & Engineering
>> > University of Moratuwa
>> > Blog : http://pulasthisupun.blogspot.com/
>> > Git hub profile: https://github.com/pulasthi
>>
>
>
>
> --
> Pulasthi Supun
> Undergraduate
> Dpt of Computer Science & Engineering
> University of Moratuwa
> Blog : http://pulasthisupun.blogspot.com/
> Git hub profile: https://github.com/pulasthi
>
