Hi

In this case, if you see, t[1] is NOT the file content, as I have added a "FileType" field. So, this collect is just bringing in the list of file types, should be fine 

On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigdata@gmail.com> wrote:
Thanks Ayan.  I am really concerned about the collect.  

types = rdd1.map(lambda t: t[1]).distinct().collect()

This will ship all files on to the driver, right?  It must be inefficient.   

On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.ayan@gmail.com> wrote:
Hi

I think you are correct direction. What is missing is: You do not need to create DF for each file. You can scramble files with similar structures together (by doing some filter on file name) and then create a DF for each type of file. Also, creating DF on wholeTextFile seems wasteful to me. I would probably do it like this

rdd1 = sc.wholeTextFile(inputpath).map(lambda t: (t[0],generateFileType(t[0]),t[1])
types = rdd1.map(lambda t: t[1]).distinct().collect()

DFList = []

for k in types:
     df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
     DFList.append(df)



On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigdata@gmail.com> wrote:
My Pyspark program is currently identifies the list of the files from a directory (Using Python Popen command taking hadoop fs -ls arguments).  For each file, a Dataframe is created and processed. This is sequeatial. How to process all files paralelly?  Please note that every file in the directory has different schema.  So, depending on the file name, different logic is used for each file. So, I cannot really create one Dataframe for all these files and iterate each row.  Using wholeTextFiles seems to be good approach for me.  But, I am not sure how to create DataFrame from this.  For example, Is there a way to do this way do something like below.

def createDFProcess(myrdd):
    df = sqlCtx.read.json(myrdd)
    df.show()
    
whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)

Above code does not work.  I get an error 'TypeError: 'JavaPackage' object is not callable'.  How to make it work?

Or is there a better approach?

-Arun




--
Best Regards,
Ayan Guha




--
Best Regards,
Ayan Guha