spark-user mailing list archives

From ayan guha <guha.a...@gmail.com>
Subject Re: How to filter based on a constant value
Date Sun, 31 Jul 2016 12:37:35 GMT
Hi

here is a quick setup (Based on airlines.txt dataset):

--------------------------------------------------------------------------------------------------------------------------------------------------------------
from datetime import datetime, timedelta
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, rank, min
from pyspark import SparkContext, HiveContext
import sys
from pyspark.sql import Window


sc = SparkContext()
hc = HiveContext(sc)

customSchema = StructType([
    StructField("airport_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("iata", StringType(), True),
    StructField("icao", StringType(), True),
    StructField("latitude", DecimalType(precision=20, scale=10), True),
    StructField("longitude", DecimalType(precision=20, scale=10), True),
    StructField("altitude", IntegerType(), True),
    StructField("timezone", DoubleType(), True),
    StructField("dst", StringType(), True),
    StructField("tz_name", StringType(), True)
])

inFile = sys.argv[1]

df1 = hc.read.format('com.databricks.spark.csv') \
         .options(header='false', inferschema='true') \
         .load(inFile, schema=customSchema)

df1.registerTempTable("airlines")
df2 = hc.sql("select airport_id, altitude, r "
             "from (select *, rank() over (order by altitude desc) r "
             "      from airlines where altitude > 100) rs "
             "where r = 1")
print df2.take(10)

w = Window.orderBy(df1['altitude'].desc())

df3 = df1.filter(df1.altitude > 100) \
         .select(df1.airport_id, df1.altitude, rank().over(w).alias("r")) \
         .filter("r = 1")
print df3.take(10)

sc.stop()
--------------------------------------------------------------------------------------------------------------------------------------------------------------
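On the variable question raised later in this thread: Spark SQL (1.x) has no bind variables, so a runtime value has to be spliced into the SQL text before it is handed to hc.sql. A minimal sketch against the airlines temp table registered above (min_altitude is a hypothetical parameter name, not from the original code):

```python
# Hypothetical parameter; Spark SQL has no bind variables, so the value
# is interpolated into the query text before it reaches hc.sql(...).
min_altitude = 100

sql = ("select airport_id, altitude, r "
       "from (select *, rank() over (order by altitude desc) r "
       "      from airlines where altitude > {0}) rs "
       "where r = 1").format(min_altitude)

# df2 = hc.sql(sql)   # would run against the "airlines" temp table above
```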

Here is an awesome blog post from Databricks introducing window functions in Spark SQL:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

HTH....

Ayan



On Sun, Jul 31, 2016 at 8:58 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> It is true that whatever an analytic function does can be done by standard
> SQL, with join and sub-queries. But the same routine done by analytic
> function is always faster, or at least as fast, when compared to standard
> SQL.
>
> I will try to see if I can do analytic functions with Spark FP on Data
> Frames. It is essentially replacing the base table with DF and using JAVA
> functions instead of SQL ones on top
>
> Also some text based search functions say LIKE in SQL can be replaced with
> CONTAINS in FP.
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 10:56, ayan guha <guha.ayan@gmail.com> wrote:
>
>> The point is, window functions are designed to be faster by doing the
>> calculations in one pass, instead of two passes in the case of max.
>>
>> DataFrames support window functions (using sql.Window), so instead of
>> writing SQL you can use those as well.
>>
>> Best
>> Ayan
>>
>> On Sun, Jul 31, 2016 at 7:48 PM, Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> yes reserved word issue thanks
>>>
>>> hive> select *
>>>     > from (select transactiondate, transactiondescription, debitamount
>>>     > , rank() over (order by transactiondate desc) r
>>>     > from accounts.ll_18740868 where transactiondescription like
>>> '%HARRODS%'
>>>     >  ) RS
>>>     > where r=1
>>>     > ;
>>> Query ID = hduser_20160731104724_f8e5f426-770a-49fc-a4a5-f0f645c06e8c
>>> Total jobs = 1
>>> Launching Job 1 out of 1
>>> In order to change the average load for a reducer (in bytes):
>>>   set hive.exec.reducers.bytes.per.reducer=<number>
>>> In order to limit the maximum number of reducers:
>>>   set hive.exec.reducers.max=<number>
>>> In order to set a constant number of reducers:
>>>   set mapreduce.job.reduces=<number>
>>> Starting Spark Job = 7727d5df-ccf9-4f98-8563-1cdec2634d99
>>> Query Hive on Spark job[0] stages:
>>> 0
>>> 1
>>> Status: Running (Hive on Spark job[0])
>>> Job Progress Format
>>> CurrentTime StageId_StageAttemptId:
>>> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
>>> [StageCost]
>>> 2016-07-31 10:48:28,726 Stage-0_0: 0/1  Stage-1_0: 0/1
>>> 2016-07-31 10:48:31,750 Stage-0_0: 0/1  Stage-1_0: 0/1
>>> 2016-07-31 10:48:32,758 Stage-0_0: 0(+1)/1      Stage-1_0: 0/1
>>> 2016-07-31 10:48:34,772 Stage-0_0: 1/1 Finished Stage-1_0: 0(+1)/1
>>> 2016-07-31 10:48:35,780 Stage-0_0: 1/1 Finished Stage-1_0: 1/1 Finished
>>> Status: Finished successfully in 10.10 seconds
>>> OK
>>> 2015-12-15      HARRODS LTD CD 4636     10.95   1
>>> Time taken: 46.546 seconds, Fetched: 1 row(s)
>>>
>>>
>>>
>>>
>>> On 31 July 2016 at 10:36, ayan guha <guha.ayan@gmail.com> wrote:
>>>
>>>> I think the word "INNER" is reserved in Hive. Please change the alias
>>>> to something else.
>>>>
>>>> Not sure about scala, but essentially it is string replacement.
>>>>
>>>> On Sun, Jul 31, 2016 at 7:27 PM, Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> thanks how about scala?
>>>>>
>>>>> BTW the same analytic code fails in Hive itself:(
>>>>>
>>>>> hive> select *
>>>>>     > from (select transactiondate, transactiondescription, debitamount
>>>>>     > from (select transactiondate, transactiondescription, debitamount
>>>>>     > , rank() over (order by transactiondate desc) r
>>>>>     > from ll_18740868 where transactiondescription like '%XYZ%'
>>>>>     >      ) inner
>>>>>     > where r=1
>>>>>     > ;
>>>>>
>>>>> FailedPredicateException(identifier,{useSQL11ReservedKeywordsForIdentifier()}?)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:11833)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.identifier(HiveParser.java:47987)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5520)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5492)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1653)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1137)
>>>>>         at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:204)
>>>>>         at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
>>>>>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:446)
>>>>>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:319)
>>>>>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1255)
>>>>>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1301)
>>>>>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1184)
>>>>>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1172)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:400)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:778)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:717)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:645)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>         at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
>>>>>         at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
>>>>> FAILED: ParseException line 6:7 Failed to recognize predicate
>>>>> 'inner'. Failed rule: 'identifier' in subquery source
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 31 July 2016 at 10:21, ayan guha <guha.ayan@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> This is because Spark does not provide a way to "bind" variables
>>>>>> like Oracle does.
>>>>>>
>>>>>> So you can build the SQL string instead, like below (in Python):
>>>>>>
>>>>>> val = 'XYZ'
>>>>>> sqlbase = "select ..... where col = '<val>'".replace('<val>', val)
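Equivalently, Python's str.format keeps the placeholder explicit instead of chaining replace calls. A small self-contained sketch, using the table and column names that appear elsewhere in this thread:

```python
# Sketch: build the SQL text with str.format instead of str.replace.
# Table/column names are the ones used in this thread's examples.
val = 'XYZ'
sqltext = ("select * from ll_18740868 "
           "where transactiondescription like '%{0}%'").format(val)
# hc.sql(sqltext) would then run it exactly as before.
```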
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jul 31, 2016 at 6:25 PM, Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Ayan.
>>>>>>>
>>>>>>> This is the one I used
>>>>>>>
>>>>>>> scala> sqltext = """
>>>>>>>      |  select *
>>>>>>>      | from (select transactiondate, transactiondescription,
>>>>>>> debitamount
>>>>>>>      | , rank() over (order by transactiondate desc) r
>>>>>>>      | from ll_18740868 where transactiondescription like '%XYZ%'
>>>>>>>      |       ) inner
>>>>>>>      |  where r=1
>>>>>>>      |    """
>>>>>>>
>>>>>>> scala> HiveContext.sql(sqltext).show
>>>>>>> +---------------+----------------------+-----------+---+
>>>>>>> |transactiondate|transactiondescription|debitamount|  r|
>>>>>>> +---------------+----------------------+-----------+---+
>>>>>>> |     2015-12-15|  XYZ LTD CD 4636 |      10.95|  1|
>>>>>>> +---------------+----------------------+-----------+---+
>>>>>>>
>>>>>>> The issue I see is that in SQL here I cannot pass HASHTAG as a
>>>>>>> variable to SQL. For example in RDBMS I can do this:
>>>>>>>
>>>>>>> 1> declare @pattern varchar(50)
>>>>>>> 2> set @pattern = 'Direct'
>>>>>>> 3> select CHANNEL_DESC from CHANNELS where CHANNEL_DESC like
>>>>>>> '%'||@pattern||'%'
>>>>>>> 4> go
>>>>>>> (1 row affected)
>>>>>>>  CHANNEL_DESC
>>>>>>>  --------------------
>>>>>>>  Direct Sales
>>>>>>>
>>>>>>> but not in Hive or Spark SQL
>>>>>>>
>>>>>>> whereas with FP it does it implicitly.
>>>>>>>
>>>>>>> col("CHANNELS").contains(HASHTAG)
>>>>>>>
>>>>>>> Unless there is a way of doing it?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 31 July 2016 at 01:20, ayan guha <guha.ayan@gmail.com> wrote:
>>>>>>>
>>>>>>>> select *
>>>>>>>> from (select *,
>>>>>>>>              rank() over (order by transactiondate) r
>>>>>>>>        from ll_18740868 where transactiondescription='XYZ'
>>>>>>>>       ) inner
>>>>>>>> where r=1
>>>>>>>>
>>>>>>>> Hi Mitch,
>>>>>>>>
>>>>>>>> If using SQL is fine, you can try the code above. You need to
>>>>>>>> register ll_18740868 as a temp table.
>>>>>>>>
>>>>>>>> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I would like to find out when it was the last time I paid a
>>>>>>>>> company with a debit card.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is the way I do it.
>>>>>>>>>
>>>>>>>>> 1) Find the date when I paid last
>>>>>>>>> 2) Find the rest of details from the row(s)
>>>>>>>>>
>>>>>>>>> So
>>>>>>>>>
>>>>>>>>> var HASHTAG = "XYZ"
>>>>>>>>> scala> var maxdate = ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
>>>>>>>>> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>>>>>>>>
>>>>>>>>> OK so it was 2015-12-15
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Now I want to get the rest of the columns. This one works when I
>>>>>>>>> hard-code the maxdate!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG) && col("transactiondate") === "2015-12-15").select("transactiondate", "transactiondescription", "debitamount").show
>>>>>>>>> +---------------+----------------------+-----------+
>>>>>>>>> |transactiondate|transactiondescription|debitamount|
>>>>>>>>> +---------------+----------------------+-----------+
>>>>>>>>> |     2015-12-15|  XYZ LTD CD 4636 |      10.95|
>>>>>>>>> +---------------+----------------------+-----------+
>>>>>>>>>
>>>>>>>>> Now if I want to use the var maxdate in place of "2015-12-15", how
>>>>>>>>> would I do that?
>>>>>>>>>
>>>>>>>>> I tried lit(maxdate) etc. but they all give me an error:
>>>>>>>>>
>>>>>>>>> java.lang.RuntimeException: Unsupported literal type class
>>>>>>>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
>>>>>>>>> [2015-12-15]
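The lit(maxdate) error arises because collect() returns Row objects, while lit() and column comparisons expect a plain value; indexing into the Row first (maxdate(0) in Scala) gives something usable in a filter. A minimal Python sketch of the idea, with the Row simulated by a tuple since a Row indexes the same way:

```python
# collect() hands back Row objects; lit() and column comparisons want the
# bare value, so index into the Row first. The tuple below stands in for
# the Row [2015-12-15] returned by .collect().apply(0) above.
max_row = ("2015-12-15",)
max_date = max_row[0]            # plain string, safe to use in a filter

# Hypothetical usage against the DataFrame from the question:
# ll_18740868.filter(col("transactiondescription").contains(HASHTAG) &
#                    (col("transactiondate") == max_date))
```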
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>


-- 
Best Regards,
Ayan Guha
