spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: How to insert data for 100 partitions at a time using Spark SQL
Date Sun, 22 May 2016 20:55:33 GMT
Whatever you do the lion share of time is going to be taken by insert into
Hive table.

Ok check this. It is CSV files inserted into Hive ORC table. This version
uses Hive on Spark engine and it is written in Hive executed via beeline

--1 Move .CSV data into HDFS:
--2 Create an external table.
--3 Create the ORC table.
--4 Insert the data from the external table to the Hive ORC table

select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
StartTime;
set hive.exec.reducers.max=256;
use accounts;
--set hive.execution.engine=mr;
--2)
DROP TABLE IF EXISTS stg_t2;
CREATE EXTERNAL TABLE stg_t2 (
 INVOICENUMBER string
,PAYMENTDATE string
,NET string
,VAT string
,TOTAL string
)
COMMENT 'from csv file from excel sheet nw_10124772'
ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION '/data/stg/accounts/nw/10124772'
TBLPROPERTIES ("skip.header.line.count"="1")
;
--3)
DROP TABLE IF EXISTS t2;
CREATE TABLE t2 (
 INVOICENUMBER          INT
,PAYMENTDATE            date
,NET                    DECIMAL(20,2)
,VAT                    DECIMAL(20,2)
,TOTAL                  DECIMAL(20,2)
)
COMMENT 'from csv file from excel sheet nw_10124772'
CLUSTERED BY (INVOICENUMBER) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
;
--4) Put data in target table. do the conversion and ignore empty rows
INSERT INTO TABLE t2
SELECT
          INVOICENUMBER
        ,
TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'dd/MM/yyyy'),'yyyy-MM-dd'))
AS paymentdate
        --, CAST(REGEXP_REPLACE(SUBSTR(net,2,20),",","") AS DECIMAL(20,2))
        , CAST(REGEXP_REPLACE(net,'[^\\d\\.]','') AS DECIMAL(20,2))
        , CAST(REGEXP_REPLACE(vat,'[^\\d\\.]','') AS DECIMAL(20,2))
        , CAST(REGEXP_REPLACE(total,'[^\\d\\.]','') AS DECIMAL(20,2))
FROM
stg_t2
WHERE
--        INVOICENUMBER > 0 AND
        CAST(REGEXP_REPLACE(total,'[^\\d\\.]','') AS DECIMAL(20,2)) > 0.0
-- Exclude empty rows
;
select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS EndTime;
!exit

And similar using Spark shell and temp table

import org.apache.spark.sql.functions._
import java.sql.{Date, Timestamp}
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
//
// Get a DF first based on Databricks CSV libraries ignore column heading
because of column called "Type"
//
val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header",
"true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
//
//  [Date: string,  Type: string,  Description: string,  Value: double,
Balance: double,  Account Name: string,  Account Number: string]
//
case class Accounts( TransactionDate: String, TransactionType: String,
Description: String, Value: Double, Balance: Double, AccountName: String,
AccountNumber : String)
// Map the columns to names
//
val a = df.filter(col("Date") > "").map(p =>
Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
//
// Create a Spark temporary table
//
a.toDF.registerTempTable("tmp")
//
// Test it here
//
//sql("select TransactionDate, TransactionType, Description, Value,
Balance, AccountName, AccountNumber from tmp").take(2)
//
// Need to create and populate target ORC table nw_10124772 in database
accounts.in Hive
//
sql("use accounts")
//
// Drop and create table nw_10124772
//
sql("DROP TABLE IF EXISTS accounts.nw_10124772")
var sqltext : String = ""
sqltext = """
CREATE TABLE accounts.nw_10124772 (
TransactionDate            DATE
,TransactionType           String
,Description               String
,Value                     Double
,Balance                   Double
,AccountName               String
,AccountNumber             Int
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put data in Hive table. Clean up is already done
//
sqltext = """
INSERT INTO TABLE accounts.nw_10124772
SELECT

TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
AS TransactionDate
        , TransactionType
        , Description
        , Value
        , Balance
        , AccountName
        , AccountNumber
FROM tmp
"""
sql(sqltext)
//
// Test all went OK by looking at some old transactions
//
sql("Select TransactionDate, Value, Balance from nw_10124772 where
TransactionDate < '2011-05-30'").collect.foreach(println)
//
println ("\nFinished at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
sys.exit()


Anyway worth trying

HTH



Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 22 May 2016 at 20:49, swetha kasireddy <swethakasireddy@gmail.com> wrote:

> I am doing the 1. currently using the following and it takes a lot of
> time. Whats the advantage of doing 2 and how to do it?
>
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
> record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
> stored as ORC LOCATION '/user/users' ")
>       sqlContext.sql("  orc.compress= SNAPPY")
>       sqlContext.sql(
>         """ from recordsTemp ps   insert overwrite table users
> partition(datePartition , idPartition )  select ps.id, ps.record ,
> ps.datePartition, ps.idPartition  """.stripMargin)
>
> On Sun, May 22, 2016 at 12:47 PM, Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>> two alternatives for this ETL or ELT
>>
>>
>>    1. There is only one external ORC table and you do insert overwrite
>>    into that external table through Spark sql
>>    2. or
>>    3. 14k files loaded into staging area/read directory and then insert
>>    overwrite into an ORC table and th
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 22 May 2016 at 20:38, swetha kasireddy <swethakasireddy@gmail.com>
>> wrote:
>>
>>> Around 14000 partitions need to be loaded every hour. Yes, I tested this
>>> and its taking a lot of time to load. A partition would look something like
>>> the following which is further partitioned by userId with all the
>>> userRecords for that date inside it.
>>>
>>> 5 2016-05-20 16:03 /user/user/userRecords/dtPartitioner=2012-09-12
>>>
>>> On Sun, May 22, 2016 at 12:30 PM, Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> by partition do you mean 14000 files loaded in each batch session (say
>>>> daily)?.
>>>>
>>>> Have you actually tested this?
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 22 May 2016 at 20:24, swetha kasireddy <swethakasireddy@gmail.com>
>>>> wrote:
>>>>
>>>>> The data is not very big. Say 1MB-10 MB at the max per partition. What
>>>>> is the best way to insert this 14k partitions with decent performance?
>>>>>
>>>>> On Sun, May 22, 2016 at 12:18 PM, Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> the acid question is how many rows are you going to insert in a batch
>>>>>> session? btw if this is purely an sql operation then you can do all
that in
>>>>>> hive running on spark engine. It will be very fast as well.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 22 May 2016 at 20:14, Jörn Franke <jornfranke@gmail.com>
wrote:
>>>>>>
>>>>>>> 14000 partitions seem to be way too many to be performant (except
>>>>>>> for large data sets). How much data does one partition contain?
>>>>>>>
>>>>>>> > On 22 May 2016, at 09:34, SRK <swethakasireddy@gmail.com>
wrote:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > In my Spark SQL query to insert data, I have around 14,000
>>>>>>> partitions of
>>>>>>> > data which seems to be causing memory issues. How can I
insert the
>>>>>>> data for
>>>>>>> > 100 partitions at a time to avoid any memory issues?
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > --
>>>>>>> > View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-for-100-partitions-at-a-time-using-Spark-SQL-tp26997.html
>>>>>>> > Sent from the Apache Spark User List mailing list archive
at
>>>>>>> Nabble.com.
>>>>>>> >
>>>>>>> >
>>>>>>> ---------------------------------------------------------------------
>>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
>>>>>>> >
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message