spark-user mailing list archives

From Pulasthi Supun Wickramasinghe <pulasthi...@gmail.com>
Subject Re: Using Cassandra as an input stream from Java
Date Thu, 05 Dec 2013 08:08:45 GMT
Hi Lucas,

That did the trick. I just had to change
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> to
JavaPairRDD<ByteBuffer, ? extends SortedMap<ByteBuffer, IColumn>>.
Thanks for the help.

Regards,
Pulasthi
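For readers hitting the same compile error (quoted further down in this thread as `... cannot be applied to ... java.lang.Class<capture#92 of ? extends java.util.SortedMap>`), here is a rough, stdlib-only sketch of the generics issue behind it. Java class literals are always raw, so `SortedMap.class` is a `Class<SortedMap>`. An expression like `map.getClass()` has the static type `Class<? extends SortedMap>`, and neither lets the compiler infer `V = SortedMap<ByteBuffer, IColumn>`. Everything here is an assumption for illustration: `InputFormat` and `ColumnFamilyInputFormat` are hypothetical stand-ins that only mimic the generic shape of the Hadoop and Cassandra classes, `String` replaces Cassandra's `IColumn`, and `describe` only imitates the type parameters of `JavaSparkContext.newAPIHadoopRDD`. The sketch also uses the unchecked double-cast idiom `(Class<...>) (Class<?>) SortedMap.class`. That is a different workaround from the `? extends SortedMap<...>` declaration used in this thread.

```java
import java.nio.ByteBuffer;
import java.util.SortedMap;
import java.util.TreeMap;

public class ClassLiteralDemo {
    // Hypothetical stand-in for org.apache.hadoop.mapreduce.InputFormat<K, V>.
    interface InputFormat<K, V> {}

    // Hypothetical stand-in for Cassandra's ColumnFamilyInputFormat;
    // String replaces IColumn so the sketch needs no Cassandra jars.
    static class ColumnFamilyInputFormat
            implements InputFormat<ByteBuffer, SortedMap<ByteBuffer, String>> {}

    // Hypothetical helper that imitates only the type parameters of
    // JavaSparkContext.newAPIHadoopRDD: F must be an InputFormat<K, V>,
    // and K and V are inferred from the Class arguments.
    static <K, V, F extends InputFormat<K, V>> String describe(
            Class<F> fClass, Class<K> kClass, Class<V> vClass) {
        return fClass.getSimpleName() + "<" + kClass.getSimpleName()
                + ", " + vClass.getSimpleName() + ">";
    }

    // Class literals are raw (SortedMap.class is a Class<SortedMap>), so a
    // parameterized Class token needs an unchecked double cast through Class<?>.
    @SuppressWarnings("unchecked")
    static Class<SortedMap<ByteBuffer, String>> sortedMapClass() {
        return (Class<SortedMap<ByteBuffer, String>>) (Class<?>) SortedMap.class;
    }

    public static void main(String[] args) {
        SortedMap<ByteBuffer, String> map = new TreeMap<ByteBuffer, String>();

        // map.getClass() has type Class<? extends SortedMap>. Passing it as vClass
        // would not compile, because the captured wildcard cannot satisfy
        // ColumnFamilyInputFormat extends InputFormat<ByteBuffer, CAP#1>:
        // describe(ColumnFamilyInputFormat.class, ByteBuffer.class, map.getClass());
        Class<? extends SortedMap> runtimeClass = map.getClass();
        System.out.println("map.getClass() at runtime: " + runtimeClass.getSimpleName());

        // With an explicitly parameterized Class token, V is inferred as
        // SortedMap<ByteBuffer, String> and the bound on F is satisfied.
        System.out.println(describe(ColumnFamilyInputFormat.class, ByteBuffer.class, sortedMapClass()));
    }
}
```

Running `main` should print `map.getClass() at runtime: TreeMap` followed by `ColumnFamilyInputFormat<ByteBuffer, SortedMap>`. With either workaround, the map's key and value types are not checked at runtime, because generic type arguments are erased.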



On Thu, Dec 5, 2013 at 10:40 AM, Lucas Fernandes Brunialti <
lbrunialti@igcorp.com.br> wrote:

> Hi all,
>
> This should work:
>
> JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
>     context.newAPIHadoopRDD(job.getConfiguration(),
>         ColumnFamilyInputFormat.class.asSubclass(
>             org.apache.hadoop.mapreduce.InputFormat.class),
>         ByteBuffer.class, SortedMap.class);
>
> I have translated the word count example written in Scala to Java; I just
> can't send it right now...
>
> Best Regards.
>
> Lucas.
> On Dec 5, 2013 1:51 AM, "Pulasthi Supun Wickramasinghe" <
> pulasthi911@gmail.com> wrote:
>
>> Hi Tal,
>>
>> Just checking whether you have added your code to GitHub :). If you have,
>> could you point me to it?
>>
>> Best Regards,
>> Pulasthi
>>
>>
>> On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>>
>>> Tal - that would be great to have open sourced if you can do it!
>>>
>>> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
>>> <pulasthi911@gmail.com> wrote:
>>> > Hi Tal,
>>> >
>>> > Thanks for the info, I will try it out and see how it goes.
>>> >
>>> >
>>> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <tal.s@taboola.com> wrote:
>>> >>
>>> >> Hi Pulasthi,
>>> >>
>>> >> I couldn't make it work, so what I ended up doing was implementing three
>>> >> Java classes: one that extends org.apache.hadoop.mapreduce.InputFormat,
>>> >> another that extends org.apache.hadoop.mapreduce.InputSplit, and a third
>>> >> that extends org.apache.hadoop.mapreduce.RecordReader. I used them to load
>>> >> data from Cassandra into an RDD (using the newAPIHadoopRDD() method). It
>>> >> works great! I'm cleaning up the code a bit and will upload it to GitHub
>>> >> as open source (after the summit).
>>> >>
>>> > That's great, I'm looking forward to checking it out after you publish
>>> > it on GitHub :).
>>> >
>>> >
>>> > Thanks,
>>> > Pulasthi
>>> >>
>>> >> I hope this helps for now,
>>> >>
>>> >> Tal
>>> >>
>>> >>
>>> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>>> >> <pulasthi911@gmail.com> wrote:
>>> >>>
>>> >>> Hi Tal,
>>> >>>
>>> >>> I also tried doing this by converting the Scala sample into Java, but I
>>> >>> am getting a compile-time error. Below is the code:
>>> >>>
>>> >>>     JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>> >>>
>>> >>>     // Build the job configuration with ConfigHelper provided by Cassandra
>>> >>>     Job job = null;
>>> >>>     try {
>>> >>>         job = new Job();
>>> >>>     } catch (IOException e) {
>>> >>>         e.printStackTrace();  // To change body of catch statement use File | Settings | File Templates.
>>> >>>     }
>>> >>>     job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>> >>>
>>> >>>     String host = args[1];
>>> >>>     String port = args[2];
>>> >>>
>>> >>>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>> >>>     ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>> >>>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>> >>>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>> >>>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
>>> >>>     ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
>>> >>>
>>> >>>     SlicePredicate predicate = new SlicePredicate();
>>> >>>     SliceRange sliceRange = new SliceRange();
>>> >>>     sliceRange.setStart(new byte[0]);
>>> >>>     sliceRange.setFinish(new byte[0]);
>>> >>>     predicate.setSlice_range(sliceRange);
>>> >>>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>> >>>
>>> >>>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>> >>>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>> >>>
>>> >>>     // Make a new Hadoop RDD
>>> >>>     final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>>> >>>         new TreeMap<ByteBuffer, IColumn>();
>>> >>>     JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>>> >>>         sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>> >>>             ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>> >>>
>>> >>>
>>> >>> I also tried the code segment that you provided, but I keep getting
>>> >>> the following error:
>>> >>>
>>> >>> java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>>> >>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>>> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>>> >>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92 of ? extends java.util.SortedMap>)
>>> >>>
>>> >>> Did you encounter this? If so, any help would be appreciated.
>>> >>>
>>> >>> Best Regards,
>>> >>> Pulasthi
>>> >>>
>>> >>>
>>> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <tal.s@taboola.com> wrote:
>>> >>>>
>>> >>>> Hi,
>>> >>>>
>>> >>>>
>>> >>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
>>> >>>> I've translated the Scala example, CassandraTest.scala, to Java, but I
>>> >>>> keep getting the following exception:
>>> >>>>
>>> >>>> Exception in thread "main" java.io.IOException: Could not get input splits
>>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>> >>>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>> >>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>> >>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>> >>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>> >>>>     at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>> >>>>     at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>> >>>>     at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>> >>>>     at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>> >>>>     at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>> >>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>> >>>>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>> >>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>> >>>>     ... 9 more
>>> >>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>> >>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>> >>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>> >>>>     at java.lang.Thread.run(Thread.java:695)
>>> >>>> Caused by: org.apache.thrift.transport.TTransportException
>>> >>>>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>> >>>>     at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>> >>>>     at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>> >>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>> >>>>     ... 8 more
>>> >>>>
>>> >>>>
>>> >>>> This is the relevant code portion:
>>> >>>>
>>> >>>>     Job job = new Job();
>>> >>>>     job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>> >>>>     String host = "<server>";
>>> >>>>     String port = "9160";
>>> >>>>
>>> >>>>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>> >>>>     ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>> >>>>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>> >>>>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>> >>>>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>>> >>>>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>> >>>>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>> >>>>
>>> >>>>     SlicePredicate predicate = new SlicePredicate();
>>> >>>>     SliceRange sliceRange = new SliceRange();
>>> >>>>     sliceRange.setStart(new byte[0]);
>>> >>>>     sliceRange.setFinish(new byte[0]);
>>> >>>>     predicate.setSlice_range(sliceRange);
>>> >>>>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>> >>>>     final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>>> >>>>     JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>> >>>>         ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>> >>>>             ByteBuffer.class, b.getClass());
>>> >>>>
>>> >>>>
>>> >>>> I would appreciate any input you may have.
>>> >>>>
>>> >>>> Thanks!
>>> >>>>
>>> >>>> Tal
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> Pulasthi Supun
>>> >>> Undergraduate
>>> >>> Dpt of Computer Science & Engineering
>>> >>> University of Moratuwa
>>> >>> Blog : http://pulasthisupun.blogspot.com/
>>> >>> Git hub profile: https://github.com/pulasthi
>>> >>
>>> >>
>>> >
>>> >
>>> >
>>>
>>
>>
>>
>>
>
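Tal's workaround quoted above (his own subclasses of Hadoop's InputFormat, InputSplit and RecordReader, passed to newAPIHadoopRDD()) was never posted in this thread. As the stack trace shows, Spark's NewHadoopRDD.getPartitions() calls the InputFormat's getSplits(). In Hadoop's new API, each split is then read through a RecordReader using a pull-style loop. The stdlib-only sketch below illustrates that reader contract. It is not Tal's code. `SimpleRecordReader` is a hypothetical stand-in that mirrors the method names of `org.apache.hadoop.mapreduce.RecordReader` (`nextKeyValue()`, `getCurrentKey()`, `getCurrentValue()`, `getProgress()`, `close()`) but omits `initialize(InputSplit, TaskAttemptContext)` and Hadoop's checked exceptions. `InMemoryRowReader` is a hypothetical reader over an in-memory map, with `String` values in place of `ByteBuffer`/`IColumn`. A real reader would page rows of a Cassandra token range instead.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class RecordReaderSketch {
    // Hypothetical stand-in mirroring the method names of Hadoop's
    // org.apache.hadoop.mapreduce.RecordReader<K, V>; initialize() and the
    // checked IOException/InterruptedException are deliberately omitted.
    abstract static class SimpleRecordReader<K, V> {
        abstract boolean nextKeyValue();
        abstract K getCurrentKey();
        abstract V getCurrentValue();
        abstract float getProgress();
        abstract void close();
    }

    // Hypothetical reader over one in-memory "split": row key -> sorted columns.
    static class InMemoryRowReader
            extends SimpleRecordReader<String, SortedMap<String, String>> {
        private final Iterator<Map.Entry<String, SortedMap<String, String>>> rows;
        private final int total;
        private int consumed = 0;
        private Map.Entry<String, SortedMap<String, String>> current;

        InMemoryRowReader(SortedMap<String, SortedMap<String, String>> split) {
            this.rows = split.entrySet().iterator();
            this.total = split.size();
        }

        // Advances to the next row; returns false once the split is exhausted.
        @Override boolean nextKeyValue() {
            if (!rows.hasNext()) {
                current = null;
                return false;
            }
            current = rows.next();
            consumed++;
            return true;
        }

        @Override String getCurrentKey() { return current.getKey(); }
        @Override SortedMap<String, String> getCurrentValue() { return current.getValue(); }

        // Fraction of rows consumed so far (1.0 for an empty split).
        @Override float getProgress() { return total == 0 ? 1.0f : (float) consumed / total; }

        // Nothing to release here; a real reader would close its connection.
        @Override void close() {}
    }

    // Pulls every record: advance first, then read the current key and value.
    static <K, V> List<String> drain(SimpleRecordReader<K, V> reader) {
        List<String> out = new ArrayList<String>();
        try {
            while (reader.nextKeyValue()) {
                out.add(reader.getCurrentKey() + "=" + reader.getCurrentValue());
            }
        } finally {
            reader.close();
        }
        return out;
    }

    // Hypothetical sample data standing in for one split of a column family.
    static SortedMap<String, SortedMap<String, String>> sampleSplit() {
        SortedMap<String, String> first = new TreeMap<String, String>();
        first.put("col1", "a");
        SortedMap<String, String> second = new TreeMap<String, String>();
        second.put("col1", "b");
        second.put("col2", "c");
        SortedMap<String, SortedMap<String, String>> split =
                new TreeMap<String, SortedMap<String, String>>();
        split.put("row1", first);
        split.put("row2", second);
        return split;
    }

    public static void main(String[] args) {
        InMemoryRowReader reader = new InMemoryRowReader(sampleSplit());
        System.out.println(drain(reader));
        System.out.println("progress: " + reader.getProgress());
    }
}
```

Running `main` should print `[row1={col1=a}, row2={col1=b, col2=c}]` and then `progress: 1.0`. Keeping the "advance, then read" split in the contract lets a consumer reuse the reader's current key and value objects without copying them on every call.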


