spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: This works to filter transactions older than certain months
Date Mon, 28 Mar 2016 22:00:35 GMT
Forgot to mention

Spark 1.6.1
Hive 2.0

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 28 March 2016 at 21:54, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

> Snippet.
>
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> val df = sqlContext.read.format("com.databricks.spark.csv").
>   option("inferSchema", "true").
>   option("header", "true").
>   load("hdfs://rhes564:9000/data/stg/accounts/nw/xxxxxxx")
> case class Accounts( TransactionDate: String, TransactionType: String,
> Description: String, Value: Double, Balance: Double, AccountName: String,
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p =>
>   Accounts(p(0).toString, p(1).toString, p(2).toString,
>     p(3).toString.toDouble, p(4).toString.toDouble, p(5).toString, p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
> //
> // Need to create and populate target ORC table nw_10124772 in database accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate            DATE
> ,TransactionType           String
> ,Description               String
> ,Value                     Double
> ,Balance                   Double
> ,AccountName               String
> ,AccountNumber             Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>         TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
>         , TransactionType
>         , Description
>         , Value
>         , Balance
>         , AccountName
>         , AccountNumber
> FROM tmp
> """
> sql(sqltext)
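The INSERT converts the CSV's `dd/MM/yyyy` strings into Hive `DATE` values through `TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(...)))`. To see what that chain does to a single value, here is a plain-Scala sketch using `java.time` (no Spark needed). The `CsvDate` object and `toIsoDate` method are hypothetical names, and the sketch assumes the CSV really uses `dd/MM/yyyy`. It returns `None` where the SQL chain would give NULL, and it is stricter than the SimpleDateFormat-style parsing behind `UNIX_TIMESTAMP` (for example it rejects a single-digit day such as `1/12/2015`), so treat it as an approximation.

```scala
import java.time.LocalDate
import java.time.format.{DateTimeFormatter, DateTimeParseException}

// Hypothetical helper: for one value, roughly what
// TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(s, 'dd/MM/yyyy'), 'yyyy-MM-dd')) yields.
object CsvDate {
  // Layout of the TransactionDate column in the CSV, as assumed by the INSERT
  private val csvFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  // ISO yyyy-MM-dd string, or None when the input cannot be parsed
  // (the SQL chain would produce NULL in that case).
  def toIsoDate(s: String): Option[String] =
    try Some(LocalDate.parse(s.trim, csvFormat).toString)
    catch { case _: DateTimeParseException => None }
}
```

For example, `CsvDate.toIsoDate("22/03/2011")` gives `Some("2011-03-22")`, the same layout the ORC table stores.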
>
>
> HTH
>
>
> On 28 March 2016 at 20:50, Timur Shenkao <tsh@timshenkao.su> wrote:
>
>> bq. CSV data is stored in an underlying table in Hive (actually created
>> and populated as an ORC table by Spark)
>>
>> How is it possible?
>>
>> On Mon, Mar 28, 2016 at 1:50 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> A while back I was looking for a functional-programming way to filter out
>>> transactions older than n months.
>>>
>>> This turned out to be pretty easy.
>>>
>>> I get today's date as follows:
>>>
>>> val today = sqlContext.sql("SELECT FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd')").first.getString(0)
>>>
>>>
>>> CSV data is stored in an underlying table in Hive (actually created and
>>> populated as an ORC table by Spark)
>>>
>>> HiveContext.sql("use accounts")
>>> val n = HiveContext.table("nw_10124772")
>>>
>>> scala> n.printSchema
>>> root
>>>  |-- transactiondate: date (nullable = true)
>>>  |-- transactiontype: string (nullable = true)
>>>  |-- description: string (nullable = true)
>>>  |-- value: double (nullable = true)
>>>  |-- balance: double (nullable = true)
>>>  |-- accountname: string (nullable = true)
>>>  |-- accountnumber: integer (nullable = true)
>>>
>>> //
>>> // Check for historical transactions > 60 months old
>>> //
>>> val old: Int = 60
>>>
>>> n.filter(add_months(col("transactiondate"), old) < lit(today)).
>>>   select(lit(today), col("transactiondate"), add_months(col("transactiondate"), old)).
>>>   collect.foreach(println)
>>>
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-23,2016-03-23]
>>> [2016-03-27,2011-03-23,2016-03-23]
>>>
>>>
>>> This seems to work. Any other suggestions would be appreciated.
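The filter keeps a row when `add_months(transactiondate, old)` falls strictly before today. The same test can be sketched in plain Scala to check it against the rows printed above; `OldTransactions` and its methods are hypothetical names. `addMonths` models `add_months` as I understand Spark 1.x behaves: the day number is kept and clamped to the target month's length (2011-08-31 plus one month is 2011-09-30), and a start date on the last day of a month maps to the last day of the target month (2015-02-28 plus one month is 2015-03-31). Spark 3.0 dropped that last-day adjustment, so this is an assumption to verify for other versions.

```scala
import java.time.LocalDate

// Hypothetical sketch of the filter: add_months(transactiondate, old) < today
object OldTransactions {
  // Assumed Spark 1.x add_months semantics: plusMonths already clamps to a
  // shorter target month; a month-end start date moves to the target month's end.
  def addMonths(d: LocalDate, months: Int): LocalDate = {
    val shifted = d.plusMonths(months.toLong)
    if (d.getDayOfMonth == d.lengthOfMonth) shifted.withDayOfMonth(shifted.lengthOfMonth)
    else shifted
  }

  // True when the transaction is more than `months` months older than `today`
  def isOlderThan(txDate: LocalDate, months: Int, today: LocalDate): Boolean =
    addMonths(txDate, months).isBefore(today)

  // Keeps only the old transactions, like the DataFrame filter
  def filterOld(dates: Seq[LocalDate], months: Int, today: LocalDate): Seq[LocalDate] =
    dates.filter(d => isOlderThan(d, months, today))
}
```

With today = 2016-03-27 and `old` = 60, 2011-03-22 maps to 2016-03-22 and is kept, in line with the printed rows; 2011-03-27 maps to 2016-03-27 and is not kept, because the comparison is strict.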
>>>
>>> Thanks
>>>
