spark-dev mailing list archives

From Arwin Tio <arwin....@hotmail.com>
Subject Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files
Date Mon, 23 Sep 2019 08:14:04 GMT
Hi Steve,

I filed a JIRA and opened a PR for this issue:

https://issues.apache.org/jira/browse/SPARK-29089
https://github.com/apache/spark/pull/25899

Please let me know what you think.

Cheers,

Arwin
________________________________
From: Steve Loughran <stevel@cloudera.com>
Sent: September 7, 2019 9:22 AM
To: Arwin Tio <arwin.tio@hotmail.com>
Cc: Sean Owen <srowen@gmail.com>; dev@spark.apache.org <dev@spark.apache.org>
Subject: Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files



On Fri, Sep 6, 2019 at 10:56 PM Arwin Tio <arwin.tio@hotmail.com> wrote:
I think the problem is calling globStatus to expand all 300K files.
In my particular case I did not use any glob patterns, so my bottleneck came from the FileSystem#exists
calls specifically. I do concur that the globStatus expansion could also be problematic.

But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.
That is a great solution! I think that's what I will do as a workaround for the moment. Right
now I'm thinking that a potential improvement here is to parallelize the SparkHadoopUtil#globPathIfNecessary
and FileSystem#exists calls whenever possible (i.e. when multiple paths are specified), so
that the client doesn't have to.
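
Roughly, the workaround I have in mind looks something like the sketch below (untested; the
readInBatches name, the batch size, and the thread count are all arbitrary choices of mine,
not anything Spark prescribes):

  import java.util.concurrent.Executors

  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration.Duration

  import org.apache.spark.sql.{DataFrame, SparkSession}

  // Read the input paths in batches, each batch as its own csv() call, so the
  // driver-side glob/exists checks overlap instead of running one path at a time.
  def readInBatches(spark: SparkSession, paths: Seq[String],
                    batchSize: Int = 10000, threads: Int = 8): DataFrame = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val batches = paths.grouped(batchSize).toSeq.map { batch =>
        Future(spark.read.csv(batch: _*)) // each call only lists/checks its own subset
      }
      Await.result(Future.sequence(batches), Duration.Inf).reduce(_ union _)
    } finally {
      pool.shutdown()
    }
  }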


The other tactic, though it'd go through a lot more of the code, would be to postpone the exists
check until the work is scheduled, which happens implicitly in open() on the workers, or explicitly
when the RDD does the split calculation and calls getFileBlockLocations(). If you are confident
that that always happens (and you will have to trace back from those calls in things like
org.apache.spark.streaming.util.HdfsUtils and ParallelizedWithLocalityRDD) then you can cut those
scans from the driver ... but I fear regression handling there gets harder.

* have SparkHadoopUtils differentiate between files returned by globStatus(), which therefore
exist, and those which it didn't glob for - it will only need to check those.
* then worry about parallel execution of the scan, again
Okay, sounds good; I will take a crack at this and open a ticket. Any thoughts on the parallelism -
should it be configurable?

For file input formats (Parquet, ORC, ...) there is an option, default == 8, though it's also
off by default... maybe I should change that.


Another possible QoL improvement here is to show progress log messages - something that indicates
to the user that the cluster is stuck while the driver is listing S3 files, maybe even including
the FS getStorageStatistics?

Yeah. If you want some examples of this, take a look at https://github.com/steveloughran/cloudstore.
The locatedfilestatus command replicates what happens during FileInputFormat scans, so it is
how I'm going to tune IOPS there. It might also be good to have those bits of the Hadoop MR
classes which Spark uses log internally @ debug, so everything gets this logging if they
ask for it.

Happy to take contribs there as Hadoop JIRAs & PRs

Thanks,

Arwin
________________________________
From: Steve Loughran <stevel@cloudera.com>
Sent: September 6, 2019 4:15 PM
To: Sean Owen <srowen@gmail.com>
Cc: Arwin Tio <arwin.tio@hotmail.com>; dev@spark.apache.org <dev@spark.apache.org>
Subject: Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files



On Fri, Sep 6, 2019 at 2:50 PM Sean Owen <srowen@gmail.com> wrote:
I think the problem is calling globStatus to expand all 300K files.
This is a general problem for object stores and huge numbers of files.
Steve L. may have better thoughts on real solutions. But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.

Yeah, avoid globs and small files, especially small files in deep trees.

I think it's hard to optimize this case from the Spark side as it's
not clear how big a glob like s3://foo/* is going to be. I think it
would take reimplementing some logic to expand the glob incrementally
or something. Or maybe I am overlooking optimizations that have gone
into Spark 3.

A long time ago I actually tried to move FileSystem.globFiles off its own recursive treewalk
into supporting the option of flat-list-children + filter. But while you can get some great
speedups in some layouts, you can get pathological collapses in perf elsewhere, which makes
the people running those queries very sad. So I gave up.

Parallelized scans can deliver a speedup; look at the code in org.apache.hadoop.mapred.LocatedFileStatusFetcher
to see what it does there. I've only just started exploring what we can do to tune that, with
HADOOP-16458 and HADOOP-16465 (https://issues.apache.org/jira/browse/HADOOP-16465), which
should speed up ORC/Parquet scans. These are designed to cut 1-2 HEAD requests off per directory
listing, which may seem small but, from my early measurements, can be significant.
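
For reference, one knob in this area (whether or not it is the exact option referred to above)
is the FileInputFormat listing thread count, a real Hadoop setting; a minimal sketch of raising
it from Spark, assuming the job actually goes through FileInputFormat-style listing, with 16
threads picked arbitrarily:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("listing-tuning").getOrCreate()

  // FileInputFormat / LocatedFileStatusFetcher list input directories with a
  // single thread unless told otherwise; raise the thread count for the scan.
  spark.sparkContext.hadoopConfiguration
    .setInt("mapreduce.input.fileinputformat.list-status.num-threads", 16)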

That's why cutting things like an exists check makes a big difference, especially if you are
going to call some list() or open() operation straight after - just call the operation and
rely on the FileNotFoundException to tell you when it's not there.
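
To spell that pattern out, a minimal sketch with the raw FileSystem API (the path is hypothetical):

  import java.io.FileNotFoundException

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  val path = new Path("s3a://some-bucket/data/part-00000.csv") // hypothetical path
  val fs = path.getFileSystem(new Configuration())

  // Instead of fs.exists(path) followed by fs.open(path) - two round trips -
  // just open it and let a missing file surface as the exception.
  val stream = try {
    Some(fs.open(path))
  } catch {
    case _: FileNotFoundException => None
  }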

Now, looking at the code, if the list has already come from a real call to globPath, then
yes, the exists call is wasteful, where waste = 500+ millis per file: http://steveloughran.blogspot.com/2016/12/how-long-does-filesystemexists-take.html

For speedup, then (a sketch follows after these bullets):
* have SparkHadoopUtils differentiate between files returned by globStatus(), which therefore
exist, and those which it didn't glob for - it will only need to check those.
* then worry about parallel execution of the scan, again
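
Something like this hypothetical resolvePaths helper - not the actual SparkHadoopUtil code or
the eventual patch, just an illustration of the two bullets above (glob detection here mirrors
the pattern-character check Spark uses):

  import java.util.concurrent.Executors

  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration.Duration

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  // Paths that contained a glob are expanded via globStatus(), so their results
  // are known to exist; only the plain paths still need an exists() call, and
  // those calls run in parallel instead of one blocking call at a time.
  def resolvePaths(paths: Seq[String], conf: Configuration, numThreads: Int = 8): Seq[Path] = {
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val (globbed, plain) = paths.map(new Path(_))
        .partition(p => p.toString.exists("{}[]*?\\".contains(_)))

      val expanded = Future.traverse(globbed) { p =>
        Future(Option(p.getFileSystem(conf).globStatus(p)).toSeq.flatten.map(_.getPath))
      }
      val checked = Future.traverse(plain) { p =>
        Future {
          require(p.getFileSystem(conf).exists(p), s"Path does not exist: $p")
          p
        }
      }
      Await.result(expanded, Duration.Inf).flatten ++ Await.result(checked, Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }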

Why not file a JIRA on the Spark work? Send me a ref so I can look at your patch.

One thing to know here is that not only does the S3A FS class have counters for all operations,
available from getStorageStatistics, but if you call toString() on it, it will print out the
current stats. So you can just log the FS string value before and after an operation and see
what's gone on. We track FS API calls (op_*) and actual HTTP requests to the store (object_*);
both are interesting: the object_* values show what is expensive (and, in the S3A FS code, what
we should cut), while the op_* values show which API calls are used a lot and should somehow be
eliminated or, if you have insights, optimised better. Removal is usually best, as it speeds up everything.
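
For example, a minimal sketch of that logging trick (the bucket is hypothetical; exactly which
counters appear is whatever the S3A toString() chooses to emit):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  val path = new Path("s3a://some-bucket/some/prefix") // hypothetical bucket
  val fs = path.getFileSystem(new Configuration())

  println(s"before: $fs")         // S3A's toString() includes its current statistics
  val listing = fs.listStatus(path)
  println(s"after:  $fs")         // diff the op_* / object_* counters to see what that call cost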

Long term, relying on directory trees to list your source data, and on commit algorithms which
move/instantiate changes, isn't sustainable. Things like Apache Iceberg are where data should
go ... things for which S3 can be viewed as fault-injecting test infrastructure. It's the Chaos
Monkey of object storage.


On Fri, Sep 6, 2019 at 7:09 AM Arwin Tio <arwin.tio@hotmail.com> wrote:
>
> Hello,
>
> On Spark 2.4.4, I am using DataFrameReader#csv to read about 300000 files on S3, and
> I've noticed that it takes about an hour for it to load the data on the Driver. You can see
> the timestamp difference when the log from InMemoryFileIndex occurs from 7:45 to 8:54:
>
> 19/09/06 07:44:42 INFO SparkContext: Running Spark version 2.4.4
> 19/09/06 07:44:42 INFO SparkContext: Submitted application: LoglineParquetGenerator
> ...
> 19/09/06 07:45:40 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
> 19/09/06 08:54:57 INFO InMemoryFileIndex: Listing leaf files and directories in parallel
> under: [300K files...]
>
>
> I believe that the issue comes from DataSource#checkAndGlobPathIfNecessary [0], specifically
> when it calls FileSystem#exists. Unlike bulkListLeafFiles, the existence check here happens
> in a single-threaded flatMap, and each check is a blocking network call if your files are
> stored on S3.
>
> I believe that there is a fairly straightforward opportunity for improvement here, which
> is to parallelize the existence check, perhaps with a configurable number of threads. If that
> seems reasonable, I would like to create a JIRA ticket and submit a patch. Please let me know!
>
> Cheers,
>
> Arwin
>
> [0] https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L557


