spark-user mailing list archives

From Andrés Ivaldi <iaiva...@gmail.com>
Subject Re: Select entire row based on a logic applied on 2 columns across multiple rows
Date Wed, 30 Aug 2017 12:07:43 GMT
Hi, if you need the last value of income in a window function you can use
last_value.
Not tested, but maybe with @ayan's SQL:

spark.sql("select *, row_number(), last_value(income) over (partition by id
order by income_age_ts desc) r from t")
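
One caveat, if I remember right: with an order by and no explicit frame,
Spark defaults the window frame to RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW, so last_value would just return the current row's income. A
minimal sketch with an explicit frame (untested, reusing the table and
column names from ayan's example below):

spark.sql("""select *,
  last_value(income) over (partition by id order by income_age_ts
    rows between unbounded preceding and unbounded following) latest_income
  from t""")

With the full frame, last_value(income) picks the income of the row with
the greatest income_age_ts in each partition.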


On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2pradeep@gmail.com>
wrote:

> @ayan,
>
> Thanks for your response
>
> I would like to use functions here, in this case calculateIncome, and the
> reason I need a function is to reuse it in other parts of the application.
> That's why I'm planning on mapGroups with a function as an argument which
> takes a row iterator. But I'm not sure this is the best way to implement
> it, as my initial DataFrame is very large.
>
> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.ayan@gmail.com> wrote:
>
>> Hi
>>
>> the tool you are looking for is a window function. Example:
>>
>> >>> df.show()
>> +--------+----+---+------+-------------+
>> |JoinDate|dept| id|income|income_age_ts|
>> +--------+----+---+------+-------------+
>> | 4/20/13|  ES|101| 19000|      4/20/17|
>> | 4/20/13|  OS|101| 10000|      10/3/15|
>> | 4/20/12|  DS|102| 13000|       5/9/17|
>> | 4/20/12|  CS|102| 12000|       5/8/17|
>> | 4/20/10|  EQ|103| 10000|       5/9/17|
>> | 4/20/10|  MD|103|  9000|       5/8/17|
>> +--------+----+---+------+-------------+
>>
>> >>> res = spark.sql("select *, row_number() over (partition by id order
>> by income_age_ts desc) r from t")
>> >>> res.show()
>> +--------+----+---+------+-------------+---+
>> |JoinDate|dept| id|income|income_age_ts|  r|
>> +--------+----+---+------+-------------+---+
>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>> | 4/20/13|  OS|101| 10000|      10/3/15|  2|
>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
>> +--------+----+---+------+-------------+---+
>>
>> >>> res = spark.sql("select * from (select *, row_number() over
>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>> >>> res.show()
>> +--------+----+---+------+-------------+---+
>> |JoinDate|dept| id|income|income_age_ts|  r|
>> +--------+----+---+------+-------------+---+
>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>> +--------+----+---+------+-------------+---+
>>
>> This should perform better because it uses Spark's built-in optimizations.
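>>
>> The same dedup can also be written with the DataFrame API, which goes
>> through the same optimizer. A sketch in Scala (untested, assuming the df
>> shown above; income_age_ts is compared as-is, exactly as in the SQL
>> version):
>>
>> import org.apache.spark.sql.expressions.Window
>> import org.apache.spark.sql.functions.{col, row_number}
>>
>> // Rank rows within each id, newest income_age_ts first, keep rank 1
>> val w = Window.partitionBy("id").orderBy(col("income_age_ts").desc)
>> val res = df.withColumn("r", row_number().over(w))
>>   .where(col("r") === 1)
>>   .drop("r")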
>>
>> Best
>> Ayan
>>
>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2pradeep@gmail.com>
>> wrote:
>>
>>> Please click on the unnamed text/html link for a better view
>>>
>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2pradeep@gmail.com>
>>> wrote:
>>>
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Mamillapalli, Purna Pradeep <PurnaPradeep.Mamillapalli@capitalone.com>
>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>> Subject: Spark question
>>>> To: purna pradeep <purna2pradeep@gmail.com>
>>>>
>>>> Below is the input DataFrame (in reality this is a very large DataFrame):
>>>>
>>>>
>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>> -----------+--------+---------------+----------+-----
>>>>        101 |  19000 |       4/20/17 |  4/20/13 | ES
>>>>        101 |  10000 |       10/3/15 |  4/20/13 | OS
>>>>        102 |  13000 |        5/9/17 |  4/20/12 | DS
>>>>        102 |  12000 |        5/8/17 |  4/20/12 | CS
>>>>        103 |  10000 |        5/9/17 |  4/20/10 | EQ
>>>>        103 |   9000 |        5/8/15 |  4/20/10 | MD
>>>>
>>>> Get the latest income for each employee whose INCOME AGE TS is less than
>>>> 10 months old (a sketch combining this filter with the window ranking
>>>> follows the expected output table below).
>>>>
>>>> Expected output DataFrame:
>>>>
>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>> -----------+--------+---------------+----------+-----
>>>>        101 |  19000 |       4/20/17 |  4/20/13 | ES
>>>>        102 |  13000 |        5/9/17 |  4/20/12 | DS
>>>>        103 |  10000 |        5/9/17 |  4/20/10 | EQ
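>>>>
>>>> As referenced above, a sketch of the window ranking combined with the
>>>> 10-month filter (untested; Scala, assuming column names with the spaces
>>>> replaced by underscores and dates stored as strings in M/d/yy format;
>>>> to_date, months_between and current_date are standard Spark SQL
>>>> functions):
>>>>
>>>> import org.apache.spark.sql.expressions.Window
>>>> import org.apache.spark.sql.functions._
>>>>
>>>> // Parse the timestamp column and rank newest income first per employee
>>>> val ts = to_date(col("INCOME_AGE_TS"), "M/d/yy")
>>>> val w = Window.partitionBy("EmployeeID").orderBy(ts.desc)
>>>>
>>>> // Keep the newest row per employee, dropping incomes older than 10 months
>>>> val result = df.withColumn("r", row_number().over(w))
>>>>   .where(col("r") === 1 && months_between(current_date(), ts) < 10)
>>>>   .drop("r")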
>>>>
>>>
>>> Below is what I'm planning to implement:
>>>>
>>>>
>>>>
>>>> import java.sql.Date
>>>> import org.apache.spark.sql.types.StructType
>>>>
>>>> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Date,
>>>>   JOINDATE: Date, DEPT: String)
>>>>
>>>> val empSchema = new StructType()
>>>>   .add("EmployeeID", "int")
>>>>   .add("INCOME", "int")
>>>>   .add("INCOMEAGE", "date")
>>>>   .add("JOINDATE", "date")
>>>>   .add("DEPT", "string")
>>>>
>>>> // Read from the file
>>>> import sparkSession.implicits._
>>>>
>>>> val readEmpFile = sparkSession.read
>>>>   .option("sep", ",")
>>>>   .schema(empSchema)
>>>>   .csv(INPUT_DIRECTORY)
>>>>
>>>> // Create the employee Dataset
>>>> val custDf = readEmpFile.as[employee]
>>>>
>>>> // Group the rows by employee id
>>>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>>>>
>>>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>>>
>>>> def performETL(empData: Iterator[employee]): employee = {
>>>>   val empList = empData.toList
>>>>
>>>>   // calculateIncome has the logic to figure out the latest income
>>>>   // for an employee and returns that income
>>>>   val income = calculateIncome(empList)
>>>>
>>>>   // Emit one row per employee, carrying the calculated income
>>>>   val row = empList.head
>>>>   employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
>>>> }
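>>>>
>>>> calculateIncome is referenced above but not defined; a hypothetical
>>>> minimal stub so the sketch compiles (the real logic would also apply
>>>> the 10-month rule):
>>>>
>>>> // Hypothetical stub: return the income from the row with the newest
>>>> // INCOMEAGE date; not the actual implementation
>>>> def calculateIncome(emps: List[employee]): Int =
>>>>   emps.maxBy(_.INCOMEAGE.getTime).INCOME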
>>>>
>>>>
>>>>
>>>> Is this a good approach, or even the right approach, to implement this?
>>>> If not, please suggest a better way to implement the same.
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


-- 
Ing. Ivaldi Andres
