spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Patrick McCarthy <pmccar...@dstillery.com.INVALID>
Subject Re: Save Spark dataframe as dynamic partitioned table in Hive
Date Thu, 16 Apr 2020 12:25:29 GMT
What happens if you change your insert statement to be

  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
broadcastValue, brand)

and then add the value for brand into the select as
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , brand
You may need to rearrange the order of the partitions to put dynamic
partitions before static.

On Wed, Apr 15, 2020 at 7:48 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Hi,
>
> I have an XML file that is read into Spark using Databa bricks jar file
>
> spark-xml_2.11-0.9.0.jar
>
> Doing some tests
>
> This is the format of XML (one row here)
>
> //*
> <sms_request>
> <sms_campaign_code>SKY</sms_campaign_code>
> <target_mobile_no>0123456789</target_mobile_no>
> <ocis_party_id>123456789</ocis_party_id>
> <brand>XYZ</brand>
> <sms_template_code>GLX</sms_template_code>
> <sms_request_external_ref>12345678</sms_request_external_ref>
> <sms_request_external_txn_ref>55555555</sms_request_external_txn_ref>
> <sms_template_variable>
> </sms_template_variable>
> </sms_request>
> */
>
> OK I am trying to insert data into a hive partitioned table through spark
> as follows:
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions._
> import java.util.Date
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.SaveMode
>
> sc.setLogLevel("WARN")
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> Logger.getLogger("org").setLevel(Level.OFF)
> Logger.getLogger("akka").setLevel(Level.OFF)
>
> // xml stuff
> import com.databricks.spark.xml.functions.from_xml
> import com.databricks.spark.xml.schema_of_xml
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> DoubleType}
> import com.databricks.spark.xml._
> import spark.implicits._
> //
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
>
> val broadcastValue = "123456789"
> val df = spark.read.
>                 format("com.databricks.spark.xml").
>                 option("rootTag", "hierarchy").
>                 option("rowTag", "sms_request").
>                 load("/tmp/broadcast.xml")
> df.printSchema
> df.show(10,false)
>
> df.createOrReplaceTempView("tmp")
> // Need to create and populate target ORC table michtest.BroadcastStaging
> //
> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
>
>   var sqltext = """
>   CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
>      partyId STRING
>    , phoneNumber STRING
>   )
>   PARTITIONED BY (
>      broadcastId STRING
>    , brand STRING)
>   STORED AS PARQUET
>   """
>   HiveContext.sql(sqltext)
>   //
>   // Put data in Hive table
>   //
>      // Dynamic partitioning is disabled by default. We turn it on
>      spark.sql("SET hive.exec.dynamic.partition = true")
>      spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
>      // spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
>
>
>
>
>
>
>
>
> * sqltext = """  INSERT INTO TABLE michtest.BroadcastStaging PARTITION
> (broadcastId = broadcastValue, brand = brand)  SELECT
> ocis_party_id AS partyId        , target_mobile_no AS phoneNumber  FROM
> tmp  """*
>   spark.sql(sqltext)
>   spark.sql("select * from michtest.BroadcastStaging").show(10,false)
>
>
> This does not work because I need to pass onetime fixed value for
> partition column value *broadcastId *and dynamic value for *brand* column
> from the table itself
>
>
> *This is the outcome of run*
>
>
> Started at
> [16/04/2020 00:37:34.34]
> root
>  |-- brand: string (nullable = true)
>  |-- ocis_party_id: long (nullable = true)
>  |-- sms_campaign_code: string (nullable = true)
>  |-- sms_request_external_ref: long (nullable = true)
>  |-- sms_request_external_txn_ref: long (nullable = true)
>  |-- sms_template_code: string (nullable = true)
>  |-- sms_template_variable: string (nullable = true)
>  |-- target_mobile_no: long (nullable = true)
>
>
> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
>
> |brand|ocis_party_id|sms_campaign_code|sms_request_external_ref|sms_request_external_txn_ref|sms_template_code|sms_template_variable|target_mobile_no|
>
> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> |XYZ  |123456789    |SKY              |12345678                |55555555
>                  |GLX              |
>                     |123456789       |
>
> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
>
> org.apache.spark.sql.catalyst.parser.ParseException:
> missing STRING at ','(line 2, pos 85)
>
> == SQL ==
>
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand = dummy)
>
> -------------------------------------------------------------------------------------^^^
>   SELECT
>           ocis_party_id AS partyId
>         , target_mobile_no AS phoneNumber
>   FROM tmp
>
> It fails passing partition values
>
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016

Mime
View raw message