spark-user mailing list archives

From Arun Patel <arunp.bigd...@gmail.com>
Subject Re: Best approach for processing all files parallelly
Date Thu, 06 Oct 2016 12:47:45 GMT
Thanks Ayan.  I am really concerned about the collect.

types = rdd1.map(lambda t: t[1]).distinct().collect()

This will ship all the files to the driver, right? That must be inefficient.
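
As a side note on this question: in Ayan's snippet, `collect()` runs after `map(lambda t: t[1])` and `distinct()`, so what reaches the driver is only the set of distinct type labels produced by `generateFileType`, not the file contents. A minimal plain-Python sketch of that step follows. `generate_file_type` is a hypothetical stand-in for `generateFileType`, which the thread never defines, and it assumes the file type is encoded as the file-name prefix before the first underscore.

```python
import posixpath


def generate_file_type(path):
    """Hypothetical helper: derive a file type from the file name.

    Assumes the type is the part of the base name before the first
    underscore, e.g. 'hdfs://nn/input/dir1/sales_2016.json' -> 'sales'.
    """
    base = posixpath.basename(path)
    return base.split("_", 1)[0]


# Plain-Python stand-in for what the driver receives from
# rdd1.map(lambda t: t[1]).distinct().collect(): one label per type,
# with no file contents attached.
paths = [
    "hdfs://nn/input/dir1/sales_2016.json",
    "hdfs://nn/input/dir1/sales_2017.json",
    "hdfs://nn/input/dir1/customers_eu.csv",
]
types = sorted({generate_file_type(p) for p in paths})
print(types)  # ['customers', 'sales']
```

The cost worth watching is elsewhere: unless `rdd1` is cached (e.g. `rdd1.cache()`), the `collect()` job and every later job on the per-type DataFrames recompute `rdd1` from its lineage, which means re-reading the input files.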


On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.ayan@gmail.com> wrote:

> Hi
>
> I think you are heading in the right direction. What is missing: you do not need to
> create a DF for each file. You can group files with similar structures
> together (by filtering on the file name) and then create one DF for each
> type of file. Also, creating a DF directly on wholeTextFiles seems wasteful to me. I
> would probably do it like this:
>
> rdd1 = sc.wholeTextFiles(inputpath).map(lambda t:
>     (t[0], generateFileType(t[0]), t[1]))
> types = rdd1.map(lambda t: t[1]).distinct().collect()
>
> DFList = []
>
> for k in types:
>     df = rdd1.filter(lambda t, k=k: t[1] == k).toDF(schema=getSchemaFor(k))
>     DFList.append(df)
>
>
>
> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigdata@gmail.com>
> wrote:
>
>> My PySpark program currently identifies the list of files in a
>> directory (using Python's Popen to run hadoop fs -ls). For
>> each file, a DataFrame is created and processed. This is sequential. How can I
>> process all the files in parallel? Please note that every file in the directory
>> has a different schema, so different logic is applied to each file depending
>> on its name. So I cannot really create one DataFrame for all these
>> files and iterate over each row. Using wholeTextFiles seems to be a good approach
>> for me, but I am not sure how to create a DataFrame from it. For
>> example, is there a way to do something like the code below?
>>
>> def createDFProcess(myrdd):
>>     df = sqlCtx.read.json(myrdd)
>>     df.show()
>>
>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>
>> The above code does not work. I get the error "TypeError: 'JavaPackage'
>> object is not callable". How can I make it work?
>>
>> Or is there a better approach?
>>
>> -Arun
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

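On the TypeError in the original question: `createDFProcess` calls `sqlCtx` inside `foreach`, and functions passed to `foreach` run on the executors, whereas a SQLContext (like the SparkContext behind it) can only be used on the driver. Whatever the exact source of that particular message, the usual pattern is to do the per-file parsing with plain Python on the executors and to create DataFrames only on the driver. A minimal sketch of the executor side follows. It assumes, hypothetically, that files are typed by name prefix, that `sales_*` files hold one JSON object per line, and that `customers_*` files are CSV with a header row; `generate_file_type`, `PARSERS` and `parse_file` are illustrative names, not part of any Spark API.

```python
import csv
import io
import json
import posixpath


def generate_file_type(path):
    # Hypothetical: file type = base-name prefix before the first "_".
    return posixpath.basename(path).split("_", 1)[0]


def parse_json_lines(content):
    # One JSON object per non-empty line (assumed layout for "sales" files).
    return [json.loads(line) for line in content.splitlines() if line.strip()]


def parse_csv(content):
    # CSV with a header row (assumed layout for "customers" files).
    return list(csv.DictReader(io.StringIO(content)))


# Hypothetical mapping from file type to parser; add one entry per file type.
PARSERS = {"sales": parse_json_lines, "customers": parse_csv}


def parse_file(path_and_content):
    """Turn one (path, content) pair from wholeTextFiles into (type, record) pairs.

    Uses only the Python standard library, so it can be passed to
    flatMap() and run on the executors without touching sqlCtx.
    """
    path, content = path_and_content
    ftype = generate_file_type(path)
    return [(ftype, record) for record in PARSERS[ftype](content)]


sample = ("hdfs://nn/input/dir1/customers_eu.csv", "id,name\n1,Ann\n2,Bo\n")
print(parse_file(sample))
```

On the driver, this could be wired up roughly as `parsed = sc.wholeTextFiles('/input/dir1').flatMap(parse_file).cache()`, followed, for each type label `k`, by `sqlCtx.createDataFrame(parsed.filter(lambda t, k=k: t[0] == k).map(lambda t: Row(**t[1])))` with `Row` from `pyspark.sql`. The parsing then runs in parallel across files, and only DataFrame creation happens on the driver. For JSON-typed files, `sqlCtx.read.json` also accepts an RDD of JSON strings, so the per-line strings could be handed to it directly instead.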