spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Does Spark CSV accept a CSV String
Date Thu, 31 Mar 2016 13:34:42 GMT
Well, my guess is to just pkunzip it and then either recompress it with bzip2
or leave it as it is.

I know that the Databricks spark-csv package handles *.bz2 files.

Anyway that is the easy part :)

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 31 March 2016 at 01:02, Benjamin Kim <bbuild11@gmail.com> wrote:

> Hi Mich,
>
> I forgot to mention that - this is the ugly part - the source data
> provider gives us (Windows) pkzip compressed files. Will spark uncompress
> these automatically? I haven’t been able to make it work.
>
> Thanks,
> Ben
>
> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
> Hi Ben,
>
> Well, I have done this for standard csv files downloaded from spreadsheets to a
> staging directory on hdfs and loaded from there.
>
> First, you may not need to unzip them. The Databricks package can read
> compressed files as well (bzip2 in my case).
>
> Check this. Mine is slightly different from what you have. First I compress my
> csv files with bzip2 and load them into hdfs:
>
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo "$(date) ======= Started compressing all csv files"
> for FILE in *.csv
> do
>   /usr/bin/bzip2 "${FILE}"
> done
> #
> ## Clear out hdfs staging directory
> ## (pattern is quoted so that hdfs, not the local shell, expands it)
> #
> echo "$(date) ======= Started deleting old files from hdfs staging directory ${DIR}"
> hdfs dfs -rm -r "${DIR}/*.bz2"
> #
> ## Put the bz2 files into the hdfs staging directory
> #
> echo "$(date) ======= Started putting bz2 files to hdfs staging directory ${DIR}"
> for FILE in *.bz2
> do
>   hdfs dfs -copyFromLocal "${FILE}" "${DIR}"
> done
> echo "$(date) ======= Checking that all files are moved to hdfs staging directory"
> hdfs dfs -ls "${DIR}"
> exit 0
>
> Now you have all your csv files in the staging directory
>
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println("\nStarted at")
> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>
> val df = sqlContext.read.format("com.databricks.spark.csv").
>   option("inferSchema", "true").
>   option("header", "true").
>   load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts( TransactionDate: String, TransactionType: String,
> Description: String, Value: Double, Balance: Double, AccountName: String,
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p =>
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
>
> // Need to create and populate target ORC table nw_10124772 in database
> // accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate            DATE
> ,TransactionType           String
> ,Description               String
> ,Value                     Double
> ,Balance                   Double
> ,AccountName               String
> ,AccountNumber             Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>           TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
>         , TransactionType
>         , Description
>         , Value
>         , Balance
>         , AccountName
>         , AccountNumber
> FROM tmp
> """
> sql(sqltext)
>
> println("\nFinished at")
> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>
> Once you store the data in some form of table (Parquet, ORC, etc.) you can do
> whatever you like with it.
>
> HTH
>
> Dr Mich Talebzadeh
>
> On 30 March 2016 at 22:13, Benjamin Kim <bbuild11@gmail.com> wrote:
>
>> Hi Mich,
>>
>> You are correct. I am talking about the Databricks package spark-csv you
>> have below.
>>
>> The files are stored in s3 and I download, unzip, and store each one of
>> them in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
>>
>> Here is some of the code.
>>
>> val filesRdd = sc.parallelize(lFiles, 250)
>> filesRdd.foreachPartition(files => {
>>   val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider())
>>   files.foreach(file => {
>>     val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
>>     val zipFile = new ZipInputStream(s3Object.getObjectContent())
>>     val csvFile = readZipStream(zipFile)
>>   })
>> })
>>
>> This function does the unzipping and converts to string.
>>
>> def readZipStream(stream: ZipInputStream): String = {
>>   stream.getNextEntry
>>   var stuff = new ListBuffer[String]()
>>   val scanner = new Scanner(stream)
>>   while(scanner.hasNextLine){
>>     stuff += scanner.nextLine
>>   }
>>   stuff.toList.mkString("\n")
>> }
>>
>> The next step is to parse the CSV string and convert to a dataframe,
>> which will populate a Hive/HBase table.
>>
>> If you can help, I would be truly grateful.
>>
>> Thanks,
>> Ben
>>
>>
>> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>> Just to clarify, are you talking about the Databricks csv package?
>>
>> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>>
>> Where are these zipped files? Are they copied to a staging directory in
>> hdfs?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> On 30 March 2016 at 15:17, Benjamin Kim <bbuild11@gmail.com> wrote:
>>
>>> I have a quick question. I have downloaded multiple zipped files from S3
>>> and unzipped each one of them into strings. The next step is to parse using
>>> a CSV parser. I want to know if there is a way to easily use the spark csv
>>> package for this?
>>>
>>> Thanks,
>>> Ben
