For case 1, you can create 3 notebooks and 3 jobs in Databricks, then run them in parallel.
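
A minimal sketch of how the table list might be divided among the 3 jobs, assuming each job receives its share of table names as a parameter. The function name `split_round_robin` and the sample table names are hypothetical and do not come from the original message.

```python
def split_round_robin(tables, n_jobs):
    """Distribute table names over n_jobs groups, one group per job."""
    groups = [[] for _ in range(n_jobs)]
    for i, name in enumerate(tables):
        groups[i % n_jobs].append(name)
    return groups


# Hypothetical table names standing in for the contents of table_list.csv.
tables = ["t1", "t2", "t3", "t4", "t5", "t6", "t7"]
groups = split_round_robin(tables, 3)
for job_no, group in enumerate(groups, start=1):
    print("job {}: {}".format(job_no, group))
```

Round-robin assignment keeps the groups close to equal in size. If some tables are much larger than others, grouping by expected size may balance the jobs better.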

On Wed, 22 Jan 2020 at 3:50 am, anbutech <> wrote:
Hi sir,

Could you please help me with the two cases below? Both involve Databricks PySpark
processing of terabytes of JSON data read from an AWS S3 bucket.

Case 1:

Currently I'm reading multiple tables sequentially to get the day count
from each table.

For example, table_list.csv has one column containing multiple table names:


tablesDF = spark.read.format("csv").option("header", False).load("s3a://bucket//source/table_list.csv")
tabList = tablesDF.toPandas().values.tolist()
for table in tabList:
    tab_name = table[0]

    # Snowflake settings and Snowflake table count()

    sfOptions = dict(
      sfURL="",
      sfAccount="",
      sfUser="",
      sfPassword="",
      sfDatabase="",
      sfSchema="",
      sfWarehouse="",
    )

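    # Read data as dataframe
    # (the reader call was lost from the original message; format("snowflake") is assumed)
    sfxdf = (spark.read.format("snowflake").options(**sfOptions)
      .option("query", "select y as year,m as month,count(*) as sCount from {} where y={} and m={} group by year,month".format(tab_name,year,month))
      .load())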

    # Databricks Delta Lake

    dbxDF = spark.sql("select y as year,m as month,count(*) as dCount from db.{} where y={} and m={} group by year,month".format(tab_name,year,month))

    resultDF = dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer').na.fill(0).withColumn("flag_col", expr("dCount == sCount"))

    finalDF = resultDF.withColumn("table_name",



        1) Instead of running the count query sequentially, one table at a time,
how can I read all the tables listed in the CSV file on S3 in parallel and
        distribute the jobs across the cluster?

        2) Could you please explain how to optimize the above PySpark code so that
all the count queries are processed in parallel at the same time?
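
One possible direction for question 2, offered as a sketch under assumptions and not as a tested solution: instead of one query per table, the per-table counts can be combined into a single UNION ALL statement, so that one `spark.sql` call covers every table. The helper name `build_union_count_query` and the sample values are hypothetical; the column names `y` and `m` and the `db.` prefix are taken from the code above.

```python
def build_union_count_query(tables, year, month):
    """Return one SQL string that counts rows per table for a given month."""
    template = (
        "select '{t}' as table_name, y as year, m as month, count(*) as dCount "
        "from db.{t} where y={y} and m={m} group by y, m"
    )
    parts = [template.format(t=t, y=year, m=month) for t in tables]
    return " union all ".join(parts)


# Hypothetical table names and period.
query = build_union_count_query(["t1", "t2"], 2020, 1)
print(query)
# On the cluster, the string would then be passed to spark.sql(query).
```

With this shape the table name travels inside each row, so the result does not need a separate column added per table afterwards.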

Case 2:

Multiprocessing case:

        Could you please help me achieve multiprocessing for the above
PySpark query so that it runs in parallel in the distributed environment?

        Is there any way to achieve parallel processing of the PySpark code
on the cluster using the snippet below?

    # Creating a pool of 20 processes. You can set this as per your intended
    # parallelism and your available resources.

    start = time.time()
    pool = multiprocessing.Pool(20)
    # This will execute get_counts() in parallel on each element inside input_paths.
    # result (a list of dictionaries) is constructed when all executions are completed.
    result = pool.map(get_counts, input_paths)

    end = time.time()

    result_df = pd.DataFrame(result)
    # You can use, result_df.to_csv() to store the results in a csv.
    print('Time taken : {}'.format(end - start))
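
A hedged note on the snippet above: `multiprocessing.Pool` starts separate Python processes, and a SparkSession lives in the driver process and generally cannot be handed to them. A thread pool on the driver is a commonly used substitute for submitting several Spark queries at once. The following is a minimal sketch of that substitution, assuming `get_counts` takes one input and returns a dictionary. The body of `get_counts` is a placeholder, and the values in `input_paths` are hypothetical.

```python
import time
from multiprocessing.pool import ThreadPool


def get_counts(path):
    # Placeholder: the real function would run the Snowflake and Delta
    # count queries for this input and return the counts.
    return {"input": path, "length": len(path)}


input_paths = ["table_a", "table_b", "table_c"]  # hypothetical inputs

start = time.time()
with ThreadPool(4) as pool:
    # map() blocks until every call has finished and keeps the input order.
    result = pool.map(get_counts, input_paths)
end = time.time()

print(result)
print("Time taken: {}".format(end - start))
```

The threads only submit work and wait for it. The queries themselves still run on the cluster, so the pool size mainly controls how many Spark jobs are in flight at the same time.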

Best Regards,
Ayan Guha