spark-user mailing list archives

From Chris Martin <ch...@cmartinit.co.uk>
Subject Re: Distributing a FlatMap across a Spark Cluster
Date Wed, 09 Jun 2021 16:13:34 GMT
One thing I would check is this line:

val fetchedRdd = rdd.map(r => (r.getGroup, r))

How many distinct groups did you end up with? If there's just one, then I
think you might see the behaviour you observe.
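
As a rough illustration of why a single group would do this (a sketch only,
not Sparkler code): groupByKey places each key with a hash partitioner, so one
distinct key means one non-empty partition and a single flatMap call that does
all the fetching, whatever partition count is passed in. The plain-Scala
simulation below assumes a hypothetical Resource case class and made-up keys,
and mirrors the non-negative hashCode-modulo rule of Spark's default
HashPartitioner. On the real RDD, rdd.map(_.getGroup).distinct().count()
should report the number of groups.

```scala
// Hypothetical stand-in for the crawl records in the thread; only the group key matters.
final case class Resource(group: String, url: String)

object GroupSkewSketch {
  // Same rule as Spark's default HashPartitioner: non-negative hashCode modulo numPartitions.
  def partitionFor(key: String, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // How many records end up in each non-empty partition after grouping by key.
  def recordsPerPartition(records: Seq[Resource], numPartitions: Int): Map[Int, Int] =
    records
      .groupBy(r => partitionFor(r.group, numPartitions))
      .map { case (partition, rs) => partition -> rs.size }

  def main(args: Array[String]): Unit = {
    val m = 50 // partition count used in the thread

    // First crawl iteration: every record carries the same key, "seed".
    val seeds = (1 to 4000).map(i => Resource("seed", s"http://example.com/$i"))

    // Assumed alternative: keys that vary per host (hypothetical host names).
    val byHost = (1 to 4000).map(i => Resource(s"host-${i % 200}", s"http://example.com/$i"))

    println(s"non-empty partitions, single key: ${recordsPerPartition(seeds, m).size}")
    println(s"non-empty partitions, varied keys: ${recordsPerPartition(byHost, m).size}")
  }
}
```

With every record keyed "seed", all 4000 records fall into one partition, so
only one task has any work. A repartition placed after the flatMap would not
change that, because the fetch has already run inside the single group by then.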

Chris


On Wed, Jun 9, 2021 at 4:17 PM Tom Barber <tom@spicule.co.uk> wrote:

> Also just to follow up on that slightly, I did also try off the back of
> another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> Where I repartitioned that scoredRdd map out of interest, it then triggers
> the FairFetcher function there, instead of in the runJob(), but still on a
> single executor 😞
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber <tom@spicule.co.uk> wrote:
>
>>
>> Okay, so what happens is that the crawler reads a bunch of Solr data
>> (we're not talking GBs, just a list of JSON) and turns that into a bunch
>> of RDDs that end up in that flatMap that I linked to first.
>>
>> The fair fetcher is an interface to a pluggable backend that basically
>> takes some of the fields and goes and crawls websites listed in them
>> looking for information. We wrote this code 6 years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it but trying to force
>> it to scale out a bit more.
>>
>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>> to push down 1 URL (a few more won't hurt, but crawling 50 URLs in parallel
>> on one node makes my cluster sad) to each executor and have it run a crawl,
>> then move on and get another one and so on. That way you're not saturating
>> a node trying to look up all of them and you could add more nodes for
>> greater capacity pretty quickly. Once the website has been captured, you
>> can then "score" it for want of a better term to determine its usefulness,
>> which is where the map is being triggered.
>>
>> In answer to your questions Sean, no action seems triggered until you end
>> up in the score block and the sc.runJob() because that's literally the next
>> line of functionality as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer).toSeq })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>>
>>
>> That if block is disabled so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>> ....
>>
>>
>> When it's doing stuff, I can see in the Spark UI that it's waiting on the
>> sc.runJob() line, so that's the execution point.
>>
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen <srowen@gmail.com> wrote:
>>
>>> persist() doesn't even persist by itself - just sets it to be persisted
>>> when it's executed.
>>> key doesn't matter here, nor partitioning, if this code is trying to run
>>> things on the driver inadvertently.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low-level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? What happens after persist()?
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber <tom@spicule.co.uk> wrote:
>>>
>>>> Thanks Mich,
>>>>
>>>> The key on the first iteration is just a string that says "seed", so it
>>>> is indeed on the first crawl the same across all of the groups. Further
>>>> iterations would be different, but I'm not there yet. I was under the
>>>> impression that a repartition would distribute the tasks. Is that not the
>>>> case?
>>>>
>>>> Thanks
>>>>
>>>> Tom
>>>>
>>>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi Tom,
>>>>>
>>>>> persist() here simply means persist to memory. That is all. You can
>>>>> check the Storage tab in the UI.
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>>>>
>>>>> So I gather the code is stuck from your link in the driver. You stated
>>>>> that you tried repartition() but it did not do anything.
>>>>>
>>>>> Further you stated :
>>>>>
>>>>> " The key is pretty static in these tests, so I have also tried
>>>>> forcing the partition count (50 on a 16 core per node cluster) and also
>>>>> repartitioning, but every time all the jobs are scheduled to run on one
>>>>> node."
>>>>>
>>>>>
>>>>> What is the key?
>>>>>
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 9 Jun 2021 at 15:23, Tom Barber <tom@spicule.co.uk> wrote:
>>>>>
>>>>>> Interesting Sean thanks for that insight, I wasn't aware of that
>>>>>> fact, I assume the .persist() at the end of that line doesn't do it?
>>>>>>
>>>>>> I believe, looking at the output in the SparkUI, it gets to
>>>>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>>>>>> and calls the context runJob.
>>>>>>
>>>>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen <srowen@gmail.com> wrote:
>>>>>>
>>>>>>> All these configurations don't matter at all if this is executing on
>>>>>>> the driver.
>>>>>>> Returning an Iterator in flatMap is fine though it 'delays'
>>>>>>> execution until that iterator is evaluated by something, which is
>>>>>>> normally fine.
>>>>>>> Does creating this FairFetcher do anything by itself? you're just
>>>>>>> returning an iterator that creates them here.
>>>>>>> How do you actually trigger an action here? the code snippet itself
>>>>>>> doesn't trigger anything.
>>>>>>> I think we need more info about what else is happening in the code.
>>>>>>>
>>>>>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber <tom@spicule.co.uk> wrote:
>>>>>>>
>>>>>>>> Yeah so if I update the FairFetcher to return a seq it makes no
>>>>>>>> real difference.
>>>>>>>>
>>>>>>>> Here's an image of what I'm seeing just for reference:
>>>>>>>> https://pasteboard.co/K5NFrz7.png
>>>>>>>>
>>>>>>>> Because this is databricks I don't have an actual spark submit
>>>>>>>> command but it looks like this:
>>>>>>>>
>>>>>>>> curl xxxx -d
>>>>>>>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>>>>> "spark.task.cpus":"16"},
>>>>>>>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>>>>>>>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
>>>>>>>> "--executor-memory", "10g",
>>>>>>>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>>>>>>>> "-tn", "5000", "-co",
>>>>>>>> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>>>>>>>
>>>>>>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>>>>>>>> driver trying to run all the tasks in parallel on the one node, but
>>>>>>>> again I've got 50 tasks queued up all running on the single node.
>>>>>>>>
>>>>>>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber <tom@spicule.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've not run it yet, but I've stuck a toSeq on the end, but in
>>>>>>>>> reality a Seq just inherits Iterator, right?
>>>>>>>>>
>>>>>>>>> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>>>>>>>>>
>>>>>>>>> Tom
>>>>>>>>>
>>>>>>>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber <tom@spicule.co.uk>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Interesting Jayesh, thanks, I will test.
>>>>>>>>>>
>>>>>>>>>> All this code is inherited and it runs, but I don't think it's
>>>>>>>>>> been tested in a distributed context for about 5 years, but yeah I
>>>>>>>>>> need to get this pushed down, so I'm happy to try anything! :)
>>>>>>>>>>
>>>>>>>>>> Tom
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <
>>>>>>>>>> jlalwani@amazon.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> flatMap is supposed to return Seq, not Iterator. You are
>>>>>>>>>>> returning a class that implements Iterator. I have a hunch that's
>>>>>>>>>>> what's causing the confusion. flatMap is returning a
>>>>>>>>>>> RDD[FairFetcher] not RDD[CrawlData]. Do you intend it to be
>>>>>>>>>>> RDD[CrawlData]? You might want to call toSeq on FairFetcher.
>>>>>>>>>>>
>>>>>>>>>>> On 6/8/21, 10:10 PM, "Tom Barber" <magicaltrout@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>     For anyone interested here's the execution logs up until the
>>>>>>>>>>>     point where it actually kicks off the workload in question:
>>>>>>>>>>>     https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>>>>>>>>>
>>>>>>>>>>>     On 2021/06/09 01:52:39, Tom Barber <magicaltrout@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>     > ExecutorID says driver, and looking at the IP addresses
>>>>>>>>>>>     > it's running on, it's not any of the worker IPs.
>>>>>>>>>>>     >
>>>>>>>>>>>     > I forcibly told it to create 50, but they'd all end up
>>>>>>>>>>>     > running in the same place.
>>>>>>>>>>>     >
>>>>>>>>>>>     > Working on some other ideas, I set spark.task.cpus to 16
>>>>>>>>>>>     > to match the nodes whilst still forcing it to 50 partitions
>>>>>>>>>>>     >
>>>>>>>>>>>     > val m = 50
>>>>>>>>>>>     >
>>>>>>>>>>>     > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>>>>     >         .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>>>>     >           FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>>>>     >         .persist()
>>>>>>>>>>>     >
>>>>>>>>>>>     > that sort of thing. But still the tasks are pinned to the
>>>>>>>>>>>     > driver executor and none of the workers, so I no longer
>>>>>>>>>>>     > saturate the master node, but I also have 3 workers just sat
>>>>>>>>>>>     > there doing nothing.
>>>>>>>>>>>     >
>>>>>>>>>>>     > On 2021/06/09 01:26:50, Sean Owen <srowen@gmail.com> wrote:
>>>>>>>>>>>     > > Are you sure it's on the driver? or just 1 executor?
>>>>>>>>>>>     > > how many partitions does the groupByKey produce? that
>>>>>>>>>>>     > > would limit your parallelism no matter what if it's a
>>>>>>>>>>>     > > small number.
>>>>>>>>>>>     > >
>>>>>>>>>>>     > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltrout@apache.org> wrote:
>>>>>>>>>>>     > >
>>>>>>>>>>>     > > > Hi folks,
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > Hopefully someone with more Spark experience than me
>>>>>>>>>>>     > > > can explain this a bit.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > I don't know if this is possible, impossible or just
>>>>>>>>>>>     > > > an old design that could be better.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > I'm running Sparkler as a spark-submit job on a
>>>>>>>>>>>     > > > Databricks Spark cluster and it's getting to this point
>>>>>>>>>>>     > > > in the code:
>>>>>>>>>>>     > > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>>>>     > > >         .groupByKey()
>>>>>>>>>>>     > > >         .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>>>>     > > >           FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>>>>     > > >         .persist()
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > This basically takes the RDD and then runs a web based
>>>>>>>>>>>     > > > crawl over each RDD and returns the results. But when
>>>>>>>>>>>     > > > Spark executes it, it runs all the crawls on the driver
>>>>>>>>>>>     > > > node and doesn't distribute them.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > The key is pretty static in these tests, so I have
>>>>>>>>>>>     > > > also tried forcing the partition count (50 on a 16 core
>>>>>>>>>>>     > > > per node cluster) and also repartitioning, but every
>>>>>>>>>>>     > > > time all the jobs are scheduled to run on one node.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > What can I do better to distribute the tasks? Because
>>>>>>>>>>>     > > > the processing of the data in the RDD isn't the
>>>>>>>>>>>     > > > bottleneck, the fetching of the crawl data is the
>>>>>>>>>>>     > > > bottleneck, but that happens after the code has been
>>>>>>>>>>>     > > > assigned to a node.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > Thanks
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > Tom
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > >
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>     > > > To unsubscribe e-mail:
>>>>>>>>>>> user-unsubscribe@spark.apache.org
>>>>>>>>>>>     > > >
>>>>>>>> Spicule Limited is registered in England & Wales. Company Number:
>>>>>>>> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
>>>>>>>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>>>>>
>>>>>>>>
>>>>>>>> All engagements are subject to Spicule Terms and Conditions of
>>>>>>>> Business. This email and its contents are intended solely for the
>>>>>>>> individual to whom it is addressed and may contain information that is
>>>>>>>> confidential, privileged or otherwise protected from disclosure,
>>>>>>>> distributing or copying. Any views or opinions presented in this email are
>>>>>>>> solely those of the author and do not necessarily represent those of
>>>>>>>> Spicule Limited. The company accepts no liability for any damage caused by
>>>>>>>> any virus transmitted by this email. If you have received this message in
>>>>>>>> error, please notify us immediately by reply email before deleting it from
>>>>>>>> your system. Service of legal notice cannot be effected on Spicule Limited
>>>>>>>> by email.
>>>>>>>>
