spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Steve Loughran <ste...@hortonworks.com>
Subject Re: More instances = slower Spark job
Date Sun, 01 Oct 2017 23:23:30 GMT

On 28 Sep 2017, at 15:27, Daniel Siegmann <dsiegmann@securityscorecard.io<mailto:dsiegmann@securityscorecard.io>>
wrote:


Can you kindly explain how Spark uses parallelism for bigger (say 1GB) text file? Does it
use InputFormat do create multiple splits and creates 1 partition per split? Also, in case
of S3 or NFS, how does the input split work? I understand for HDFS files are already pre-split
so Spark can use dfs.blocksize to determine partitions. But how does it work other than HDFS?

S3 is similar to HDFS I think. I'm not sure off-hand how exactly it decides to split for the
local filesystem. But it does. Maybe someone else will be able to explain the details.


HDFS files are split into blocks, each with a real block size and location, which is that
created when the file was written/copied.

 If you have a 100 MB replicated on 3 machines with a block size of 64MB, you will have two
blocks for the file: 64 and 36, with three replicas of each block. Blocks are placed across
machines (normally, 2 hosts on one rack, 1 on on a different rack....gives you better resilience
to failures of rack switches). There's no attempt to colocate blocks of the same file, *except*
that HDFS will attempt to write every block onto the host where the program generating the
data is running. So, space permitting, if the 100MB file is created on host 1, then host 1
will have block-1 replica-1, and block-2-replica-1, with the others scattered around the cluster.

The code is actually

https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L386


Because it's fixed in HDFS, you get the block size used at creation time; different formats
may provide their own split information independent of that block size though. (This also
means if you different block sizes for different files in the set of files you process, there
may be different splits for each file, as well as different locations.

With HDFS replication, you get the bandwidth of all the hard disks serving up data. With a
100 MB file split in two, if those blocks were actually saved onto different physical Hard
disks (say SAS disks with 6 gb/s), then you have 3 x 2 x 6 gb/s bandwidth, for a max of 24
gb/s. (of course, there's the other work competing for disk IO); that's maximum. If spark
schedules the work on those machines and you have the Hadoop native libraries installed (i.e.
you don't get told off in the logs for not having them), then the HDFS client running in the
spark processes can talk direct to the HDFS datanode and get give a native OS file handle
to read those blocks: there isn't even a network stack to interfere. If you are working with
remote data, then the network slows things down..

The S3A client just makes things up. you can configure the settings to lie about block size.
If you have 100MB files and want to split the work five ways, in that job, set

spark.hadoop.fs.s3a.block.size = 20971520

The other object stores have different options, but it's the same thing really. You get to
choose client size what Spark is told, which is then used by the driver to make its decisions
about which splits to give to which drivers for processing, the order, etc.

Unlike HDFS, the bandwidth you get off S3 for a single file is fixed, irrespective of how
many blocks you tell the client there are. Declaring setting a lower block size & so allowing
more workers at the data isn't going to guarantee more performance, you'll just be sharing
the same IO rate

...though, talking to S3, a big factor in performance working with the data is actually cost
of breaking and recreating HTTP connections, which happens a lot if you have seek-heavy code
reading large files. And the columnar formats, ORC and Parquet, are seek heavy, provided they
aren't gzipped. Reading these files has pretty awful performance until you run Hadoop 2.8+
and tell S3A that you are doing random IO (which kills .gz reading, use wisely)

spark.hadoop.fs.s3a.experimental.fadvise random


All this stuff and more is all in the source files —don't be afraid to look into it to see
what's going on. I always recommend starting with the stack traces you get when things aren't
working right. If you are using S3, that's all in : https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a

-steve


Mime
View raw message