spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Distributing a FlatMap across a Spark Cluster
Date Wed, 09 Jun 2021 14:44:18 GMT
Hi Tom,

persist() here simply means persist to memory; that is all. You can check
the Storage tab in the Spark UI:

https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

So I gather from your link that the code is stuck in the driver. You stated
that you tried repartition() but it did not do anything.

Further, you stated:

" The key is pretty static in these tests, so I have also tried forcing the
partition count (50 on a 16 core per node cluster) and also repartitioning,
but every time all the jobs are scheduled to run on one node."


What is the key?
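For context on why the key matters: groupByKey creates one group per distinct key, so a single static key funnels every record into one group, and that whole group is then processed as one task. A minimal sketch with plain Scala collections (illustrative names only, no Spark required):

```scala
// Sketch: with one distinct key, grouping yields exactly one group,
// so only one task ever gets data -- regardless of partition count.
val records = (1 to 100).map(i => ("static-key", i)) // key never varies
val grouped = records.groupBy(_._1)                  // ~ rdd.groupByKey()
assert(grouped.size == 1) // one group => one unit of parallel work
println(s"distinct groups: ${grouped.size}")
```

With many distinct keys the same grouping would produce many groups, which is what gives Spark something to spread across executors.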


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 9 Jun 2021 at 15:23, Tom Barber <tom@spicule.co.uk> wrote:

> Interesting, Sean, thanks for that insight; I wasn't aware of that fact. I
> assume the .persist() at the end of that line doesn't do it?
>
> I believe, looking at the output in the SparkUI, it gets to
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
> and calls the context runJob.
>
> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen <srowen@gmail.com> wrote:
>
>> All these configurations don't matter at all if this is executing on the
>> driver.
>> Returning an Iterator in flatMap is fine though it 'delays' execution
>> until that iterator is evaluated by something, which is normally fine.
>> Does creating this FairFetcher do anything by itself? You're just
>> returning an iterator that creates them here.
>> How do you actually trigger an action here? The code snippet itself
>> doesn't trigger anything.
>> I think we need more info about what else is happening in the code.
>>
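Sean's point about the "delayed" execution can be sketched in plain Scala, no Spark required (the counter stands in for whatever side effects FairFetcher performs; names are illustrative):

```scala
// Sketch: an Iterator returned from flatMap is lazy -- the work only
// happens when something consumes it (in Spark, typically an action).
var workDone = 0
val it: Iterator[Int] = Iterator(1, 2, 3).map { i => workDone += 1; i * 10 }
assert(workDone == 0)           // nothing has executed yet
val out = it.toList             // consuming the iterator forces the work
assert(out == List(10, 20, 30))
assert(workDone == 3)
```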
>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber <tom@spicule.co.uk> wrote:
>>
>>> Yeah, so if I update the FairFetcher to return a Seq it makes no real
>>> difference.
>>>
>>> Here's an image of what I'm seeing just for reference:
>>> https://pasteboard.co/K5NFrz7.png
>>>
>>> Because this is Databricks I don't have an actual spark-submit command,
>>> but it looks like this:
>>>
>>> curl xxxx -d
>>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>> "spark.task.cpus":"16"},
>>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
>>> "--executor-memory", "10g",
>>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>>> "-tn", "5000", "-co",
>>> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>>
>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>>> driver trying to run all the tasks in parallel on the one node, but again
>>> I've got 50 tasks queued up all running on the single node.
>>>
>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber <tom@spicule.co.uk> wrote:
>>>
>>>> I've not run it yet, but I've stuck a toSeq on the end, but in reality
>>>> a Seq just inherits Iterator, right?
>>>>
>>>> flatMap does return an RDD[CrawlData] unless my IDE is lying to me.
>>>>
>>>> Tom
>>>>
>>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber <tom@spicule.co.uk> wrote:
>>>>
>>>>> Interesting Jayesh, thanks, I will test.
>>>>>
>>>>> All this code is inherited and it runs, but I don't think it's been
>>>>> tested in a distributed context for about 5 years, but yeah I need to
>>>>> get this pushed down, so I'm happy to try anything! :)
>>>>>
>>>>> Tom
>>>>>
>>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <jlalwani@amazon.com>
>>>>> wrote:
>>>>>
>>>>>> flatMap is supposed to return a Seq, not an Iterator. You are
>>>>>> returning a class that implements Iterator. I have a hunch that's
>>>>>> what's causing the confusion. flatMap is returning an
>>>>>> RDD[FairFetcher], not RDD[CrawlData]. Do you intend it to be
>>>>>> RDD[CrawlData]? You might want to call toSeq on FairFetcher.
>>>>>>
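A caveat on the toSeq suggestion, worth hedging: this thread's cluster runs Scala 2.12, where Iterator.toSeq returns a lazy Stream, so toList is the reliably strict conversion. A plain-Scala sketch, with the counter standing in for FairFetcher's crawl side effects:

```scala
// Sketch: toList materializes an Iterator eagerly; the "crawl" side
// effects happen at conversion time, not when the result is later read.
var crawls = 0
val it = Iterator("a", "b", "c").map { u => crawls += 1; u.toUpperCase }
assert(crawls == 0)             // still lazy: nothing fetched yet
val fetched = it.toList         // strict conversion forces every element
assert(fetched == List("A", "B", "C"))
assert(crawls == 3)
```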
>>>>>> On 6/8/21, 10:10 PM, "Tom Barber" <magicaltrout@apache.org> wrote:
>>>>>>
>>>>>>
>>>>>>     For anyone interested, here are the execution logs up until the
>>>>>> point where it actually kicks off the workload in question:
>>>>>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>>>>
>>>>>>     On 2021/06/09 01:52:39, Tom Barber <magicaltrout@apache.org>
>>>>>> wrote:
>>>>>>     > ExecutorID says driver, and looking at the IP addresses it's
>>>>>> running on, it's not any of the worker IPs.
>>>>>>     >
>>>>>>     > I forcibly told it to create 50, but they'd all end up running
>>>>>> in the same place.
>>>>>>     >
>>>>>>     > Working on some other ideas, I set spark.task.cpus to 16 to
>>>>>> match the nodes whilst still forcing it to 50 partitions.
>>>>>>     >
>>>>>>     > val m = 50
>>>>>>     >
>>>>>>     > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>     >         .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>     >           FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>     >         .persist()
>>>>>>     >
>>>>>>     > that sort of thing. But still the tasks are pinned to the
>>>>>> driver executor and none of the workers, so I no longer saturate the
>>>>>> master node, but I also have 3 workers just sat there doing nothing.
>>>>>>     >
>>>>>>     > On 2021/06/09 01:26:50, Sean Owen <srowen@gmail.com> wrote:
>>>>>>     > > Are you sure it's on the driver? or just 1 executor?
>>>>>>     > > how many partitions does the groupByKey produce? That would
>>>>>>     > > limit your parallelism no matter what if it's a small number.
>>>>>>     > >
>>>>>>     > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltrout@apache.org> wrote:
>>>>>>     > >
>>>>>>     > > > Hi folks,
>>>>>>     > > >
>>>>>>     > > > Hopefully someone with more Spark experience than me can
>>>>>>     > > > explain this a bit.
>>>>>>     > > >
>>>>>>     > > > I don't know if this is possible, impossible, or just an
>>>>>>     > > > old design that could be better.
>>>>>>     > > >
>>>>>>     > > > I'm running Sparkler as a spark-submit job on a Databricks
>>>>>>     > > > Spark cluster and it's getting to this point in the code (
>>>>>>     > > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>>>>>>     > > > ):
>>>>>>     > > >
>>>>>>     > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>     > > >         .groupByKey()
>>>>>>     > > >         .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>     > > >           FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>     > > >         .persist()
>>>>>>     > > >
>>>>>>     > > > This basically takes the RDD and then runs a web-based
>>>>>>     > > > crawl over each RDD entry and returns the results. But when
>>>>>>     > > > Spark executes it, it runs all the crawls on the driver
>>>>>>     > > > node and doesn't distribute them.
>>>>>>     > > >
>>>>>>     > > > The key is pretty static in these tests, so I have also
>>>>>>     > > > tried forcing the partition count (50 on a 16-core-per-node
>>>>>>     > > > cluster) and also repartitioning, but every time all the
>>>>>>     > > > jobs are scheduled to run on one node.
>>>>>>     > > >
>>>>>>     > > > What can I do better to distribute the tasks? The
>>>>>>     > > > processing of the data in the RDD isn't the bottleneck; the
>>>>>>     > > > fetching of the crawl data is the bottleneck, and that
>>>>>>     > > > happens after the code has been assigned to a node.
>>>>>>     > > >
>>>>>>     > > > Thanks
>>>>>>     > > >
>>>>>>     > > > Tom
>>>>>>     > > >
>>>>>>     > > >
>>>>>>     > > >
>>>>>> ---------------------------------------------------------------------
>>>>>>     > > > To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>>     > > >
>>>>>>     > > >
>>>>>>     > >
>>>>>>     >
>>>>>>     >
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>> Spicule Limited is registered in England & Wales. Company Number:
>>> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
>>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>
>>>
>>> All engagements are subject to Spicule Terms and Conditions of Business.
>>> This email and its contents are intended solely for the individual to whom
>>> it is addressed and may contain information that is confidential,
>>> privileged or otherwise protected from disclosure, distributing or copying.
>>> Any views or opinions presented in this email are solely those of the
>>> author and do not necessarily represent those of Spicule Limited. The
>>> company accepts no liability for any damage caused by any virus transmitted
>>> by this email. If you have received this message in error, please notify us
>>> immediately by reply email before deleting it from your system. Service of
>>> legal notice cannot be effected on Spicule Limited by email.
>>>
>>
