spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: how to send JavaDStream RDD using foreachRDD using Java
Date Mon, 02 Feb 2015 19:46:18 GMT
Hello Sachin,

While Akhil's solution is correct, it is not sufficient for your use case.
RDD.foreach (which Akhil is using) runs on the workers, but you are
creating the Producer object on the driver. This will not work: a producer
created on the driver cannot be used from the worker/executor. The best way
to do what you want is to use rdd.foreachPartition. Inside the
function supplied to RDD.foreachPartition, create the producer, send the
whole partition, and close the producer. I am on my phone, so I am not able
to write the Java code.
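
The create/send/close-per-partition pattern described above can be sketched roughly as follows. This is a minimal illustration, not the poster's actual code: TopicProducer, TopicProducerFactory, PartitionSender and the Recording* classes are hypothetical names, and the sketch deliberately avoids the Spark and Kafka dependencies so it runs on its own. The key assumption is that only a small serializable factory is shipped to the executors, while the producer itself is created inside the per-partition function.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the poster's producer type (not a real Kafka API).
interface TopicProducer {
    void send(String topic, String message);
    void close();
}

// Hypothetical factory. Only this small serializable object travels from the
// driver to the executors; the producer itself is never serialized.
interface TopicProducerFactory extends Serializable {
    TopicProducer create();
}

// Named (non-anonymous) per-partition function. In a Spark job this class
// would also implement VoidFunction<Iterator<String>> from
// org.apache.spark.api.java.function and be handed to rdd.foreachPartition.
class PartitionSender implements Serializable {
    private final TopicProducerFactory factory;
    private final String topic;

    PartitionSender(TopicProducerFactory factory, String topic) {
        this.factory = factory;
        this.topic = topic;
    }

    public void call(Iterator<String> partition) {
        // Created here, i.e. on the worker that processes this partition.
        TopicProducer producer = factory.create();
        try {
            while (partition.hasNext()) {
                producer.send(topic, partition.next());
            }
        } finally {
            // Close even if a send fails, so connections are not leaked.
            producer.close();
        }
    }
}

// In-memory producer used only to exercise the sketch without Kafka or Spark.
class RecordingProducer implements TopicProducer {
    final List<String> sent = new ArrayList<String>();
    boolean closed = false;

    public void send(String topic, String message) {
        sent.add(topic + ":" + message);
    }

    public void close() {
        closed = true;
    }
}

// Local demo factory; it remembers the producers it created for inspection.
class RecordingProducerFactory implements TopicProducerFactory {
    final List<RecordingProducer> created = new ArrayList<RecordingProducer>();

    public TopicProducer create() {
        RecordingProducer producer = new RecordingProducer();
        created.add(producer);
        return producer;
    }
}

class PartitionSenderDemo {
    public static void main(String[] args) {
        RecordingProducerFactory factory = new RecordingProducerFactory();
        PartitionSender sender = new PartitionSender(factory, "events");
        // Each call simulates one partition being processed on a worker.
        sender.call(Arrays.asList("a", "b").iterator());
        sender.call(Arrays.asList("c").iterator());
        System.out.println("producers created: " + factory.created.size());
    }
}
```

In an actual job, once PartitionSender implements VoidFunction<Iterator<String>>, the function passed to foreachRDD would call rdd.foreachPartition(new PartitionSender(factory, topicName)) instead of rdd.foreach, with the factory wrapping whatever producer creation code is already in use.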

TD

On Mon, Feb 2, 2015 at 11:38 AM, Akhil Das <akhil@sigmoidanalytics.com>
wrote:

> Here you go:
>
> JavaDStream<String> textStream =
>     ssc.textFileStream("/home/akhld/sigmoid/");
>
> textStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>     @Override
>     public Void call(JavaRDD<String> rdd) throws Exception {
>         rdd.foreach(new VoidFunction<String>() {
>             @Override
>             public void call(String stringData) throws Exception {
>                 // Use this data!
>                 System.out.println("W00t!! Data :" + stringData);
>             }
>         });
>         return null;
>     }
> });
>
> Thanks
> Best Regards
>
> On Sun, Feb 1, 2015 at 9:08 PM, sachin Singh <sachin.shashi@gmail.com>
> wrote:
>
>> Hi, I want to send streaming data to a Kafka topic.
>> I have RDD data that I converted to a JavaDStream, and now I want to send
>> it to a Kafka topic. I don't need the Kafka sending code, only the
>> foreachRDD implementation. My code looks like this:
>> public void publishtoKafka(ITblStream t)
>> {
>>     MyTopicProducer MTP = ProducerFactory.createProducer(hostname+":"+port);
>>     JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD();
>>
>>     rdd.foreachRDD(new Function<String, String>() {
>>         @Override
>>         public Void call(JavaRDD<String> rdd) throws Exception {
>>             KafkaUtils.sendDataAsString(MTP,topicName, "String RDDData");
>>             return null;
>>         }
>>     });
>>     log.debug("------------------------sent to kafka: ------------------");
>> }
>>
>> Here MyTopicProducer creates the producer, which is working fine.
>> KafkaUtils.sendDataAsString is the method that publishes data to the Kafka
>> topic, and it is also working fine.
>>
>> My only problem is that I am not able to convert the JavaDStream RDD to a
>> string using foreach or foreachRDD; in the end I need the String message
>> from the RDDs. Kindly suggest Java code only, and I don't want to use
>> anonymous classes. Please send me only the part that sends the JavaDStream
>> RDD using foreachRDD with the Function call method.
>> Thanks in advance,
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-send-JavaDStream-RDD-using-foreachRDD-using-Java-tp21456.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
