spark-user mailing list archives

From Pulasthi Supun Wickramasinghe <pulasthi...@gmail.com>
Subject Re: Using Cassandra as an input stream from Java
Date Thu, 05 Dec 2013 03:50:52 GMT
Hi Tal,

Just checking if you have added your code to github :). If you have, could
you point me to it?

Best Regards,
Pulasthi


On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pwendell@gmail.com> wrote:

> Tal - that would be great to have open-sourced if you can do it!
>
> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
> <pulasthi911@gmail.com> wrote:
> > Hi Tal,
> >
> > Thanks for the info. I will try it out and see how it goes.
> >
> >
> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <tal.s@taboola.com> wrote:
> >>
> >> Hi Pulasthi,
> >>
> >> I couldn't make it work, so what I ended up doing was implementing three
> >> Java classes: one that extends org.apache.hadoop.mapreduce.InputFormat,
> >> another that extends org.apache.hadoop.mapreduce.InputSplit, and a third
> >> that extends org.apache.hadoop.mapreduce.RecordReader. I used them to
> >> load data from Cassandra into an RDD (using the newAPIHadoopRDD()
> >> method). It works great! I'm cleaning up the code a bit and will upload
> >> it to github as open source (after the summit).
> >>
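
[A minimal, untested skeleton of the three-class approach Tal describes
above. This is not Tal's unpublished code; the class names are hypothetical,
and a real implementation would read Cassandra rows rather than emit a single
dummy record.]

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class MyCassandraInputFormat extends InputFormat<Text, Text> {
        @Override
        public List<InputSplit> getSplits(JobContext context) {
            // A real implementation would create one split per token range.
            return Collections.<InputSplit>singletonList(new MySplit());
        }

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) {
            return new MyRecordReader();
        }

        // Splits must be Writable so Hadoop/Spark can ship them to workers.
        public static class MySplit extends InputSplit implements Writable {
            @Override public long getLength() { return 1; }
            @Override public String[] getLocations() { return new String[0]; }
            @Override public void write(DataOutput out) throws IOException { }
            @Override public void readFields(DataInput in) throws IOException { }
        }

        public static class MyRecordReader extends RecordReader<Text, Text> {
            private boolean consumed = false;

            @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { }

            @Override public boolean nextKeyValue() {
                // A real reader would page through the rows of its split here.
                if (consumed) return false;
                consumed = true;
                return true;
            }

            @Override public Text getCurrentKey() { return new Text("key"); }
            @Override public Text getCurrentValue() { return new Text("value"); }
            @Override public float getProgress() { return consumed ? 1.0f : 0.0f; }
            @Override public void close() { }
        }
    }

[Such a format plugs into Spark the same way as any other Hadoop input
format:]

    JavaPairRDD<Text, Text> rdd = sc.newAPIHadoopRDD(job.getConfiguration(),
            MyCassandraInputFormat.class, Text.class, Text.class);
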
> > That's great. Looking forward to checking it out after you publish it on github :).
> >
> >
> > Thanks,
> > Pulasthi
> >>
> >> I hope this helps for now,
> >>
> >> Tal
> >>
> >>
> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
> >> <pulasthi911@gmail.com> wrote:
> >>>
> >>> Hi Tal,
> >>>
> >>> I also tried doing this by converting the Scala sample into Java, but I
> >>> am getting a compile-time error. Below is the code:
> >>>
> >>>         JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
> >>>
> >>>         // Build the job configuration with ConfigHelper provided by Cassandra
> >>>         Job job = null;
> >>>         try {
> >>>             job = new Job();
> >>>         } catch (IOException e) {
> >>>             e.printStackTrace();
> >>>         }
> >>>         job.setInputFormatClass(ColumnFamilyInputFormat.class);
> >>>
> >>>         String host = args[1];
> >>>         String port = args[2];
> >>>
> >>>         ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
> >>>         ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> >>>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
> >>>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> >>>         ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
> >>>         ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
> >>>
> >>>         SlicePredicate predicate = new SlicePredicate();
> >>>         SliceRange sliceRange = new SliceRange();
> >>>         sliceRange.setStart(new byte[0]);
> >>>         sliceRange.setFinish(new byte[0]);
> >>>         predicate.setSlice_range(sliceRange);
> >>>         ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
> >>>
> >>>         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>>         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>>
> >>>         // Make a new Hadoop RDD
> >>>         final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
> >>>                 new TreeMap<ByteBuffer, IColumn>();
> >>>         JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
> >>>                 sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
> >>>                         ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
> >>>
> >>>
> >>> I also tried the code segment that you provided, but I keep getting the
> >>> following error:
> >>>
> >>> java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
> >>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
> >>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92 of ? extends java.util.SortedMap>)
> >>>
> >>> Did you encounter this? If so, any help would be appreciated.
> >>>
> >>> Best Regards,
> >>> Pulasthi
> >>>
> >>>
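
[For anyone hitting the same error: the compiler rejects
byteBufferIColumnSortedMap.getClass() because its static type is
Class<? extends TreeMap>, and that wildcard cannot be matched against the V
type parameter of newAPIHadoopRDD(). A common workaround, untested here, is
to manufacture a Class token for the exact value type that
ColumnFamilyInputFormat produces:]

    // ColumnFamilyInputFormat extends
    // InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, so hand the
    // compiler a Class token for exactly that value type. The double cast is
    // the usual trick for building a Class with generic type arguments.
    @SuppressWarnings("unchecked")
    Class<SortedMap<ByteBuffer, IColumn>> valueClass =
            (Class<SortedMap<ByteBuffer, IColumn>>) (Class<?>) SortedMap.class;

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
            sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
                    ByteBuffer.class, valueClass);
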
> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <tal.s@taboola.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>>
> >>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
> >>>> I've translated the Scala example - CassandraTest.scala - to Java, but
> >>>> I keep getting the following exception:
> >>>>
> >>>> Exception in thread "main" java.io.IOException: Could not get input splits
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
> >>>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
> >>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
> >>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
> >>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
> >>>>     at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
> >>>>     at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
> >>>>     at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
> >>>>     at com.taboola.test.spark_cassandra.App.calc(App.java:119)
> >>>>     at com.taboola.test.spark_cassandra.App.main(App.java:65)
> >>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
> >>>>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> >>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
> >>>>     ... 9 more
> >>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
> >>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >>>>     at java.lang.Thread.run(Thread.java:695)
> >>>> Caused by: org.apache.thrift.transport.TTransportException
> >>>>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> >>>>     at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
> >>>>     at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
> >>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
> >>>>     ... 8 more
> >>>>
> >>>>
> >>>> This is the relevant code portion:
> >>>>
> >>>>         Job job = new Job();
> >>>>         job.setInputFormatClass(ColumnFamilyInputFormat.class);
> >>>>         String host = "<server>";
> >>>>         String port = "9160";
> >>>>
> >>>>         ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
> >>>>         ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> >>>>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
> >>>>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> >>>>         ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
> >>>>         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>>>         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>>>
> >>>>         SlicePredicate predicate = new SlicePredicate();
> >>>>         SliceRange sliceRange = new SliceRange();
> >>>>         sliceRange.setStart(new byte[0]);
> >>>>         sliceRange.setFinish(new byte[0]);
> >>>>         predicate.setSlice_range(sliceRange);
> >>>>         ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
> >>>>
> >>>>         final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
> >>>>         JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
> >>>>                 ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
> >>>>                         ByteBuffer.class, b.getClass());
> >>>>
> >>>>
> >>>> I would appreciate any input you may have.
> >>>>
> >>>> Thanks!
> >>>>
> >>>> Tal
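
[A suggestion not from the thread itself: since the trace bottoms out inside
a Thrift read during describe_splits_ex(), one quick, untested sanity check
is to open a framed Thrift connection to the same host and port the job uses
and issue a trivial call. This also lets you confirm that the partitioner
passed to ConfigHelper matches the cluster's actual partitioner, since a
mismatch is another common way getSplits() can fail.]

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class ThriftCheck {
        public static void main(String[] args) throws Exception {
            // Same host/port the job's ConfigHelper settings point at.
            TTransport transport = new TFramedTransport(new TSocket("<server>", 9160));
            transport.open(); // fails fast if the RPC port is wrong or Thrift is not listening
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            System.out.println("cluster:     " + client.describe_cluster_name());
            System.out.println("partitioner: " + client.describe_partitioner());
            transport.close();
        }
    }
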
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Pulasthi Supun
> >>> Undergraduate
> >>> Dpt of Computer Science & Engineering
> >>> University of Moratuwa
> >>> Blog : http://pulasthisupun.blogspot.com/
> >>> Git hub profile: https://github.com/pulasthi
> >>
> >>
> >
> >
> >
> > --
> > Pulasthi Supun
> > Undergraduate
> > Dpt of Computer Science & Engineering
> > University of Moratuwa
> > Blog : http://pulasthisupun.blogspot.com/
> > Git hub profile: https://github.com/pulasthi
>



-- 
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile: https://github.com/pulasthi
