spark-user mailing list archives

From Arun Patel <arunp.bigd...@gmail.com>
Subject Re: Best approach for processing all files parallelly
Date Mon, 10 Oct 2016 10:06:58 GMT
Ayan, which version of Python are you using? I am using 2.6.9, and I don't
find the generateFileType and getSchemaFor functions.  Thanks for your help.
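[Editor's note: generateFileType and getSchemaFor are not Spark or Python built-ins in any version; in Ayan's sketch below they are user-defined helpers. A minimal hypothetical implementation, assuming the file type is encoded in the file extension and that schemas are kept in a user-maintained lookup table, might look like:

```python
# Hypothetical helpers in the spirit of Ayan's reply. These are NOT part of
# Spark or the Python standard library; the names, the extension-based type
# rule, and the placeholder schema table are all assumptions.

def generateFileType(filename):
    """Derive a file-type label from the file name
    (assumption: the type is the lowercased extension)."""
    return filename.rsplit(".", 1)[-1].lower()

# Placeholder column lists standing in for real schemas.
SCHEMAS = {
    "json": ["id", "payload"],
    "csv": ["id", "name", "value"],
}

def getSchemaFor(file_type):
    """Look up the (user-maintained) schema for a file type."""
    return SCHEMAS[file_type]
```
]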

On Fri, Oct 7, 2016 at 1:17 AM, ayan guha <guha.ayan@gmail.com> wrote:

> Hi
>
> generateFileType (filename) returns FileType
>
> getSchemaFor(FileType) returns schema for FileType
>
> This for loop DOES NOT process individual files sequentially. It
> sequentially creates one DataFrame per file type, each covering all files
> of that type.
>
> On Fri, Oct 7, 2016 at 12:08 AM, Arun Patel <arunp.bigdata@gmail.com>
> wrote:
>
>> Thanks Ayan.  Couple of questions:
>>
>> 1) What do the generateFileType and getSchemaFor functions look like?
>> 2) The 'for loop' is processing files sequentially, right? My requirement
>> is to process all files at the same time.
>>
>> On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <guha.ayan@gmail.com> wrote:
>>
>>> Hi
>>>
>>> In this case, t[1] is NOT the file content, because I added a "FileType"
>>> field as the second element. So this collect just brings back the list of
>>> file types, which should be fine.
>>>
>>> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigdata@gmail.com>
>>> wrote:
>>>
>>>> Thanks Ayan.  I am really concerned about the collect.
>>>>
>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>
>>>> This will ship all files on to the driver, right?  It must be
>>>> inefficient.
>>>>
>>>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.ayan@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I think you are headed in the correct direction. What is missing is:
>>>>> you do not need to create a DF for each file. You can group files with
>>>>> similar structures together (by filtering on the file name) and then
>>>>> create one DF for each type of file. Also, creating a DF on
>>>>> wholeTextFiles seems wasteful to me. I would probably do it like this:
>>>>>
>>>>> rdd1 = sc.wholeTextFiles(inputpath).map(lambda t:
>>>>>     (t[0], generateFileType(t[0]), t[1]))
>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>
>>>>> DFList = []
>>>>>
>>>>> for k in types:
>>>>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>>>>      DFList.append(df)
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigdata@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> My PySpark program currently identifies the list of files in a
>>>>>> directory (using a Python Popen command with hadoop fs -ls arguments).
>>>>>> For each file, a DataFrame is created and processed. This is
>>>>>> sequential.  How can I process all files in parallel?  Please note
>>>>>> that every file in the directory has a different schema, so depending
>>>>>> on the file name, different logic is used for each file. I cannot
>>>>>> really create one DataFrame for all these files and iterate over each
>>>>>> row.  Using wholeTextFiles seems to be a good approach for me, but I
>>>>>> am not sure how to create a DataFrame from it.  For example, is there
>>>>>> a way to do something like below?
>>>>>>
>>>>>> def createDFProcess(myrdd):
>>>>>>     df = sqlCtx.read.json(myrdd)
>>>>>>     df.show()
>>>>>>
>>>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname',
>>>>>> 'fcontent'])
>>>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>>>
>>>>>> The above code does not work.  I get the error "TypeError: 'JavaPackage'
>>>>>> object is not callable".  How can I make it work?
>>>>>>
>>>>>> Or is there a better approach?
>>>>>>
>>>>>> -Arun
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
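[Editor's note: the grouping logic in Ayan's sketch (tag each file with a type, collect the distinct type labels, then filter once per type) can be simulated without Spark on a plain list of (name, content) tuples. This is an illustrative sketch, not the thread's actual code; the sample file names and the extension-based type helper are assumptions:

```python
# Plain-Python simulation of the RDD pipeline discussed in the thread:
# tag each file with a type, take the distinct type labels, then build
# one group per type. In Spark the same shape uses map/distinct/filter.

files = [
    ("/input/dir1/a.json", '{"id": 1}'),
    ("/input/dir1/b.csv", "1,x"),
    ("/input/dir1/c.json", '{"id": 2}'),
]

def generate_file_type(name):
    # Hypothetical helper: derive the type from the file extension.
    return name.rsplit(".", 1)[-1]

# Equivalent of: rdd1 = wholeTextFiles(...).map(name, type, content)
tagged = [(n, generate_file_type(n), c) for n, c in files]

# Equivalent of: rdd1.map(lambda t: t[1]).distinct().collect()
# Note this brings back only the small set of type labels, not the file
# contents -- the point made in Ayan's reply about the collect.
types = sorted({t[1] for t in tagged})

# Equivalent of the for loop: one filtered group (one DF) per type.
groups = {k: [t for t in tagged if t[1] == k] for k in types}
```
]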
