spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Merge two dataframes
Date Wed, 19 May 2021 09:06:53 GMT
Hi  Kushagra,


I believe you are referring to this warning below

WARN window.WindowExec: No Partition Defined for Window operation! Moving
all data to a single partition, this can cause serious performance
degradation.

I don't know an easy way around it. If the operation is performed only once,
you may be able to live with it. Let me think about what else can be done.

HTH

Mich


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 18 May 2021 at 20:51, kushagra deep <kushagra94deep@gmail.com>
wrote:

> Thanks a lot Mich, this works, though I have to test for scalability.
> I have one question though. If we don't specify any column in partitionBy,
> will it shuffle all the records to one executor? Because this is what
> seems to be happening.
>
>
> Thanks once again !
> Regards
> Kushagra Deep
>
> On Tue, May 18, 2021 at 10:48 PM Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>> Ok, this should hopefully work as it uses row_number.
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.window import Window
>> import pyspark.sql.functions as F
>>
>> def spark_session(appName):
>>   return SparkSession.builder \
>>         .appName(appName) \
>>         .enableHiveSupport() \
>>         .getOrCreate()
>>
>> appName = "test"
>> spark = spark_session(appName)
>> ##
>> ## Get a DF first from csv files
>> ##
>> d1location="hdfs://rhes75:9000/tmp/df1.csv"
>> d2location="hdfs://rhes75:9000/tmp/df2.csv"
>>
>> df1 = spark.read.csv(d1location, header="true")
>> df1.printSchema()
>> df1.show()
>> df2 = spark.read.csv(d2location, header="true")
>> df2.printSchema()
>> df2.show()
>> df1 = df1.select(F.row_number().over(Window.partitionBy().orderBy(df1['amount_6m'])).alias("row_num"), "amount_6m")
>> df1.show()
>> df2 = df2.select(F.row_number().over(Window.partitionBy().orderBy(df2['amount_9m'])).alias("row_num"), "amount_9m")
>> df2.show()
>> df1.join(df2, "row_num", "inner").select("amount_6m", "amount_9m").show()
>>
>>
>> root
>>  |-- amount_6m: string (nullable = true)
>>
>> +---------+
>> |amount_6m|
>> +---------+
>> |      100|
>> |      200|
>> |      300|
>> |      400|
>> |      500 |
>> +---------+
>>
>> root
>>  |-- amount_9m: string (nullable = true)
>>
>> +---------+
>> |amount_9m|
>> +---------+
>> |      500|
>> |      600|
>> |      700|
>> |      800|
>> |      900|
>> +---------+
>>
>> +-------+---------+
>> |row_num|amount_6m|
>> +-------+---------+
>> |      1|      100|
>> |      2|      200|
>> |      3|      300|
>> |      4|      400|
>> |      5|      500 |
>> +-------+---------+
>>
>> +-------+---------+
>> |row_num|amount_9m|
>> +-------+---------+
>> |      1|      500|
>> |      2|      600|
>> |      3|      700|
>> |      4|      800|
>> |      5|      900|
>> +-------+---------+
>>
>> +---------+---------+
>> |amount_6m|amount_9m|
>> +---------+---------+
>> |      100|      500|
>> |      200|      600|
>> |      300|      700|
>> |      400|      800|
>> |     500 |      900|
>> +---------+---------+
>>
>> HTH
>>
>>
>>
>>
>>
>> On Tue, 18 May 2021 at 16:39, kushagra deep <kushagra94deep@gmail.com>
>> wrote:
>>
>>> The use case is to calculate PSI/CSI values. And yes, the union is
>>> one-to-one by row, as you showed.
>>>
>>> On Tue, May 18, 2021, 20:39 Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Hi Kushagra,
>>>>
>>>> A bit late on this, but what is the business use case for this merge?
>>>>
>>>> You have two dataframes, each with one column, and you want to UNION
>>>> them in a certain way, but the correlation is not known. In other
>>>> words, this UNION pairs rows purely by position, as is?
>>>>
>>>>        amount_6m | amount_9m
>>>>        100       | 500
>>>>        200       | 600
>>>>
>>>> HTH
>>>>
>>>>
>>>> On Wed, 12 May 2021 at 13:51, kushagra deep <kushagra94deep@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have two dataframes
>>>>>
>>>>> df1
>>>>>
>>>>> amount_6m
>>>>>  100
>>>>>  200
>>>>>  300
>>>>>  400
>>>>>  500
>>>>>
>>>>> And a second dataframe, df2, below
>>>>>
>>>>>  amount_9m
>>>>>   500
>>>>>   600
>>>>>   700
>>>>>   800
>>>>>   900
>>>>>
>>>>> The number of rows is the same in both dataframes.
>>>>>
>>>>> Can I merge the two dataframes to achieve below df
>>>>>
>>>>> df3
>>>>>
>>>>> amount_6m | amount_9m
>>>>>     100   |   500
>>>>>     200   |   600
>>>>>     300   |   700
>>>>>     400   |   800
>>>>>     500   |   900
>>>>>
>>>>> Thanks in advance
>>>>>
>>>>> Reg,
>>>>> Kushagra Deep
>>>>>
>>>>>
