spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gourav Sengupta <gourav.sengu...@gmail.com>
Subject Re: Loading a large parquet file how much memory do I need
Date Mon, 27 Nov 2017 17:06:00 GMT
Hi,

I think that I have mentioned all the required alternatives. However I am
quite curious as to how did you conclude that processing using EMR is going
to be more expensive than using any other stack. I have been using EMR
since last 6 years (almost about the time it came out), and have always
found it cheap, reliable, safe and stable (ofcourse its like fire, if you
are not careful it can end up burning you financially).

Regards,
Gourav Sengupta

On Mon, Nov 27, 2017 at 12:58 PM, Alexander Czech <
alexander.czech@googlemail.com> wrote:

> I don't use EMR I spin my clusters up using flintrock (beeing a student my
> budget is slim), my code is writen in pyspark and my data is in the
> us-east-1 region (N. Virginia). I will do my best explaining it with tables:
>
> My input with a size of (10TB) sits in multiple (~150) parquets on S3
>
> +-----------+--------------------------+-------+------+-------+
> |        uri|                 link_list|lang_id|vector|content|
> +-----------+--------------------------+-------+------+-------+
> |www.123.com|[www.123.com,www.abc.com,]|   null|  null|   null|
> |www.abc.com|[www.opq.com,www.456.com,]|   null|  null|   null|
> |www.456.com|[www.xyz.com,www.abc.com,]|   null|  null|   null|
>
>
> *(link_list is a  ArrayType(StringType()))*
>
> Step1 : I only load the uri and link_list columns (but they make up the
> bulk of the data). Then every uri is given a unique ID with df.withColumn('uri_id',
> func.monotonically_increasing_id())
> resulting in a dataframe looking like this
>
> *DF_A*:
>
> +-----------+--------------------------+-------+
> |        uri|                 link_list| uri_id|
> +-----------+--------------------------+-------+
> |www.123.com|[www.123.com,www.abc.com,]|      1|
> |www.abc.com|[www.opq.com,www.456.com,]|      2|
> |www.456.com|[www.xyz.com,www.abc.com,]|      3|
>
> Step 2: I create another dataframe containing only the uri and uri_id which is renamed
to link_id fields
>
> *DF_B*:
> +-----------+--------+
> |        uri| link_id|
> +-----------+--------+
> |www.123.com|       1|
> |www.abc.com|       2|
> |www.456.com|       3|
>
> Step 3: Now I exploded the link_list field in *DF_A* with  *DF_A*.select("uri_id", func.explode("link_list").alias("link"))
> This gives me
>
> *DF_C*:
> +-----------+-------+
> |       link| uri_id|
> +-----------+-------+
> |www.123.com|      1|
> |www.abc.com|      1|
> |www.opq.com|      2|
> |www.456.com|      2|
> |www.xyz.com|      3|
> |www.abc.com|      3|
>
>
> Lastly I Join DF_B DF_C *DF_C*.join(*DF_B*, *DF_C*.link==*DF_B*.uri, "left_outer").drop("uri")
Which results in the final dataframe:
>
>
> +-----------+-------+--------+
> |       link| uri_id| link_id|
> +-----------+-------+--------+
> |www.123.com|      1|       1|
> |www.abc.com|      1|       2|
> |www.opq.com|      2|    null|
> |www.456.com|      2|       3|
> |www.xyz.com|      3|    null|
> |www.abc.com|      3|       1|
>
> (in code the field link is also dropped but this makes it hopefully more intelligible
this way)
>
>
> the rest is to just join the uri_id with the lang_id,vector,content rows that are not
null which is trivial.
>
> I hope this makes it more readable. If there is an aws service that makes it easier for
me to deal with the data, since it is basically "just" database operations I'm also happy
to hear about it.
> I got a few days on my hands until the preprocessing is done but I'm not sure if the
explod in step 3 can be done in another aws service.
>
> thanks!
>
>
> On Mon, Nov 27, 2017 at 12:32 PM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
>> Hi,
>>
>> it would be much simpler in case you just provide two tables with the
>> samples of input and output. Going through the verbose text and trying to
>> read and figure out what is happening is a bit daunting.
>>
>> Personally, given that you have your entire data in Parquet, I do not
>> think that you will need to have a large cluster size at all. You can do it
>> with a small size cluster as well, but depending on the cluster size, you
>> might want to create intermediate staging tables or persist the data.
>>
>> Also it will be of help if you could kindly provide the EMR version that
>> you are using.
>>
>>
>> On another note also mention the AWS Region you are in. If Redshift
>> Spectrum is available, or you can use Athena, or you can use Presto, then
>> running massive aggregates over huge data sets at fraction of cost and at
>> least 10x speed may be handy as well.
>>
>> Let me know in case you need any further help.
>>
>> Regards,
>> Gourav
>>
>> On Mon, Nov 27, 2017 at 11:05 AM, Alexander Czech <
>> alexander.czech@googlemail.com> wrote:
>>
>>> I have a temporary result file ( the 10TB one) that looks like this
>>> I have around 3 billion rows of (url,url_list,language,vector,text).
>>> The bulk of data is in url_list and at the moment I can only guess how
>>> large url_list is. I want to give an ID to every url and then this ID to
>>> every url in url_list to have a ID to ID graph.The columns language,vector
>>> and text only have values for 1% of all rows so they only play a very minor
>>> roll.
>>>
>>> The idea at the moment is to load the URL and URL_list column from the
>>> parquet and give ever row an ID. Then exploded the URL_list and join the
>>> IDs to this on the now exploded rows. After that I drop the URLs from
>>> URL_list column. For the rest of the computation I only load those rows
>>> from the parquet that have values in (language,vector and text) and join
>>> them with the ID table.
>>>
>>> In the end I will create 3 tables:
>>> 1. url, ID
>>> 2. ID, ID
>>> 3. ID,language,vector,text
>>>
>>> Basically there is one very big shuffle going on the rest is not that
>>> heavy. The CPU intense lifting was done before that.
>>>
>>> On Mon, Nov 27, 2017 at 12:03 PM, Alexander Czech <
>>> alexander.czech@googlemail.com> wrote:
>>>
>>>> I have a temporary result file ( the 10TB one) that looks like this
>>>> I have around 3 billion rows of (url,url_list,language,vector,text).
>>>> The bulk of data is in url_list and at the moment I can only guess how
>>>> large url_list is. I want to give an ID to every url and then this ID to
>>>> every url in url_list to have a ID to ID graph.The columns language,vector
>>>> and text only have values for 1% of all rows so they only play a very minor
>>>> roll.
>>>>
>>>> The idea at the moment is to load the URL and URL_list column from the
>>>> parquet and give ever row an ID. Then exploded the URL_list and join the
>>>> IDs to this on the now exploded rows. After that I drop the URLs from
>>>> URL_list column. For the rest of the computation I only load those rows
>>>> from the parquet that have values in (language,vector and text) and join
>>>> them with the ID table.
>>>>
>>>> In the end I will create 3 tables:
>>>> 1. url, ID
>>>> 2. ID, ID
>>>> 3. ID,language,vector,text
>>>>
>>>> Basically there is one very big shuffle going on the rest is not that
>>>> heavy. The CPU intense lifting was done before that.
>>>>
>>>> On Mon, Nov 27, 2017 at 11:01 AM, Georg Heiler <
>>>> georg.kf.heiler@gmail.com> wrote:
>>>>
>>>>> How many columns do you need from the big file?
>>>>>
>>>>> Also how CPU / memory intensive are the computations you want to
>>>>> perform?
>>>>>
>>>>> Alexander Czech <alexander.czech@googlemail.com> schrieb am Mo.
27.
>>>>> Nov. 2017 um 10:57:
>>>>>
>>>>>> I want to load a 10TB parquet File from S3 and I'm trying to decide
>>>>>> what EC2 instances to use.
>>>>>>
>>>>>> Should I go for instances that in total have a larger memory size
>>>>>> than 10TB? Or is it enough that they have in total enough SSD storage
so
>>>>>> that everything can be spilled to disk?
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message