spark-user mailing list archives

From Eric Beabes <mailinglist...@gmail.com>
Subject Re: Reading parquet files in parallel on the cluster
Date Tue, 25 May 2021 22:07:56 GMT
Thanks for your time & advice. We will experiment & see which works best
for us: EMR or ECS.

On Tue, May 25, 2021 at 2:39 PM Sean Owen <srowen@gmail.com> wrote:

> No, the work is happening on the cluster; you just have (say) 100 parallel
> jobs running at the same time. You apply spark.read.parquet to each dir --
> from the driver yes, but spark.read is distributed. At extremes, yes that
> would challenge the driver, to manage 1000s of jobs concurrently. You may
> also find that if each job is tiny, there's some overhead in running each
> as a distributed operation that may be significant. But it seems like the
> simplest thing and will probably work fine.
>
> On Tue, May 25, 2021 at 4:34 PM Eric Beabes <mailinglists19@gmail.com>
> wrote:
>
>> Right... but the problem is still the same, no? Those N Jobs (aka Futures
>> or Threads) will all be running on the Driver. Each with its own
>> SparkSession. Isn't that going to put a lot of burden on one Machine? Is
>> that really distributing the load across the cluster? Am I missing
>> something?
>>
>> Would it be better to use ECS (Elastic Container Service) for this use
>> case which allows us to autoscale?
>>
>> On Tue, May 25, 2021 at 2:16 PM Sean Owen <srowen@gmail.com> wrote:
>>
>>> What you could do is launch N Spark jobs in parallel from the driver.
>>> Each one would process a directory you supply with spark.read.parquet, for
>>> example. You would just have 10s or 100s of those jobs running at the same
>>> time.  You have to write a bit of async code to do it, but it's pretty easy
>>> with Scala Futures.
>>>
>>> On Tue, May 25, 2021 at 3:31 PM Eric Beabes <mailinglists19@gmail.com>
>>> wrote:
>>>
>>>> Here's the use case:
>>>>
>>>> We have a bunch of directories (over 1000), each containing tons of small
>>>> files. Each directory is for a different customer, so they are independent
>>>> in that respect. We need to merge all the small files in each directory
>>>> into one (or a few) compacted file(s) by using a 'coalesce' function.
>>>>
>>>> Clearly we can do this on the Driver by doing something like:
>>>>
>>>> list.par.foreach(dir => compact(spark, dir))
>>>>
>>>> This works, but the problem here is that the parallelism happens on the
>>>> Driver, which won't scale when we have 10,000 customers! At any given time
>>>> there will be only as many compactions happening as the number of cores on
>>>> the Driver, right?
>>>>
>>>> We were hoping to do this:
>>>>
>>>> val df = list.toDF()
>>>> df.foreach(dir => compact(spark, dir))
>>>>
>>>> Our hope was that this would distribute the load among the Spark executors
>>>> and scale better. But this throws the NullPointerException shown in the
>>>> original email.
>>>>
>>>> Is there a better way to do this?
>>>>
>>>>
>>>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
>>>> silvio.fiorito@granturing.com> wrote:
>>>>
>>>>> Why not just read from Spark as normal? Do these files have different
>>>>> or incompatible schemas?
>>>>>
>>>>>
>>>>>
>>>>> val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
>>>>>
>>>>>
>>>>>
>>>>> From: Eric Beabes <mailinglists19@gmail.com>
>>>>> Date: Tuesday, May 25, 2021 at 1:24 PM
>>>>> To: spark-user <user@spark.apache.org>
>>>>> Subject: Reading parquet files in parallel on the cluster
>>>>>
>>>>>
>>>>>
>>>>> I have a use case in which I need to read Parquet files in parallel from
>>>>> more than 1000 directories. I am doing something like this:
>>>>>
>>>>>
>>>>>
>>>>>     val df = list.toList.toDF()
>>>>>
>>>>>     df.foreach(c => {
>>>>>       val config = getConfigs()
>>>>>       doSomething(spark, config)
>>>>>     })
>>>>>
>>>>>
>>>>>
>>>>> In the doSomething method, when I try to do this:
>>>>>
>>>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>>>
>>>>>
>>>>>
>>>>> I get the NullPointerException shown below. It seems that 'spark.read'
>>>>> only works on the Driver, not on the cluster. How can I do what I want to
>>>>> do? Please let me know. Thank you.
>>>>>
>>>>>
>>>>>
>>>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11): java.lang.NullPointerException
>>>>>         at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>>>         at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>>>         at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>>>         at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>>>>
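
The approach Sean Owen describes in this thread (launching many Spark jobs concurrently from the driver with Scala Futures) might look roughly like the sketch below. It is not code from the thread. The `compact` body, the `_compacted` output suffix, the `runAll` helper, and the limit of 32 concurrent jobs are all assumptions made for illustration. The sketch shares one SparkSession across all Futures, and each `spark.read.parquet(...)` call still runs its work on the executors; the driver threads only submit jobs and wait for them.

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.SparkSession

object CompactDirs {

  // Hypothetical compaction step: read one directory and rewrite it as a few
  // larger files. Writing to a separate output path is an assumption here;
  // overwriting the directory that is being read is not safe.
  def compact(spark: SparkSession, inDir: String, outDir: String, numFiles: Int = 1): Unit =
    spark.read.parquet(inDir)
      .coalesce(numFiles)
      .write.mode("overwrite").parquet(outDir)

  // Runs `work` once per item on a fixed-size thread pool and returns each
  // item paired with its outcome, so one failing item does not hide the rest.
  def runAll[A](items: Seq[A], maxConcurrent: Int)(work: A => Unit): Seq[(A, Try[Unit])] = {
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = items.map { item =>
        // Capture success or failure as a value instead of failing the Future.
        Future(work(item)).transform(result => Success(item -> result))
      }
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("compact-dirs").getOrCreate()
    val dirs = args.toSeq // one input directory per argument (assumed)

    // Each driver thread only submits a job; the reads and writes themselves
    // are executed by the cluster's executors.
    val results = runAll(dirs, maxConcurrent = 32) { dir =>
      compact(spark, dir, dir.stripSuffix("/") + "_compacted")
    }

    results.collect { case (dir, Failure(e)) => s"$dir failed: ${e.getMessage}" }
      .foreach(msg => System.err.println(msg))
    spark.stop()
  }
}
```

The fixed-size pool caps how many jobs the driver manages at once, which speaks to the concern raised in the thread about thousands of concurrent jobs; the right limit depends on the cluster and would need experimentation. Setting `spark.scheduler.mode` to `FAIR` may also help concurrent jobs share executors more evenly.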
