spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Do I need to do .collect inside forEachRDD
Date Tue, 05 Dec 2017 21:54:54 GMT
The general answer to your initial question is "it depends". If the
operation in the rdd.foreach() closure can be parallelized, then you don't
need to collect first. If it needs some local context (e.g. a socket
connection), then you need to call rdd.collect() first to bring the data to
the driver, which carries a performance penalty and is also limited by the
memory available to the driver process.
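
To make the two cases concrete, here is a minimal sketch, not a definitive
implementation. The Sink trait, the OutputStyles object and the openSink
parameter are hypothetical names introduced only for illustration; Sink
stands in for whatever resource (socket, client, ...) the closure needs.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical sink used only for illustration; it stands in for a socket,
// a database client, or a similar resource.
trait Sink extends AutoCloseable {
  def send(record: String): Unit
}

object OutputStyles {
  // Distributed: each task opens its own sink, once per partition, on the
  // executor. openSink is shipped with the closure, so it must be
  // serializable; the sink itself is never serialized.
  def writeDistributed(rdd: RDD[String], openSink: () => Sink): Unit =
    rdd.foreachPartition { records =>
      val sink = openSink()
      try records.foreach(sink.send)
      finally sink.close()
    }

  // Driver-local: collect() pulls the whole RDD into driver memory first,
  // then the driver's own sink is used.
  def writeFromDriver(rdd: RDD[String], sink: Sink): Unit =
    rdd.collect().foreach(sink.send)
}
```

Inside foreachRDD, either method would be called with the batch's rdd. The
first scales with the number of partitions; the second is bounded by the
driver's memory.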

Given the further clarification:
>Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

If it's writing to Kafka, that operation can be done in a distributed form.

You could use this lib: https://github.com/BenFradet/spark-kafka-writer
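
If you would rather not add a library, the same distributed write can be
sketched by hand with a plain KafkaProducer created per partition. This is
only a sketch under assumptions: the KafkaSink object, producerProps and
writeToKafka are hypothetical names, and the stream is assumed to already
hold (key, value) pairs of strings.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

object KafkaSink {
  // Producer configuration for String keys and values.
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  // Writes every batch to Kafka from the executors, without collect().
  def writeToKafka(stream: DStream[(String, String)],
                   brokers: String,
                   topic: String): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Runs on the executor: the producer is created here, so it never
        // has to be serialized and sent from the driver.
        val producer = new KafkaProducer[String, String](producerProps(brokers))
        try {
          records.foreach { case (k, v) =>
            producer.send(new ProducerRecord[String, String](topic, k, v))
          }
        } finally {
          producer.close() // waits for pending sends to complete
        }
      }
    }
}
```

Creating a producer per partition per batch is the simplest correct form; a
producer reused per executor would be cheaper but needs more care.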

Or, if you can upgrade to Spark 2.2, you can pave the way for a migration
to Structured Streaming by already adopting the 'structured' APIs within
Spark Streaming:

case class KV(key: String, value: String)

// 'spark' is assumed to be the SparkSession already in scope;
// the map()/reduce() arguments are elided, as in the question.
dstream.map().reduce().foreachRDD { rdd =>
    import spark.implicits._
    // needs to be in a (key, value) shape
    val kv = rdd.map { e => KV(extractKey(e), extractValue(e)) }
    val dataFrame = kv.toDF() // convert the mapped RDD, not the raw one
    dataFrame.write
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "topic1")
        .save()
}

-kr, Gerard.



On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth909@gmail.com> wrote:

> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <Richard.Qiao@capitalone.com> wrote:
>
>> Where do you check the output result for both cases?
>>
>> Sent from my iPhone
>>
>>
>> > On Dec 5, 2017, at 15:36, kant kodali <kanth909@gmail.com> wrote:
>> >
>> > Hi All,
>> >
>> > I have a simple stateless transformation using DStreams (stuck with the
>> > old API for one of the applications). The pseudocode is roughly like this:
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >      rdd.collect().forEach(); // Is this necessary? It executes fine but
>> >                               // is a bit slow
>> > })
>> >
>> > I understand that collect() brings the results back to the driver, but
>> > is that necessary? Can I just do something like the code below? I
>> > believe I tried both, and somehow the code below didn't output any
>> > results (it could be an issue with my env, I am not entirely sure), but
>> > I would like some clarification on .collect(), since it seems to slow
>> > things down for me.
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >      rdd.forEach(() -> {} ); //
>> > })
>> >
>> > Thanks!
>> >
>> >
>
