spark-user mailing list archives

From "Kappaganthu, Sivaram (ES)" <Sivaram.Kappagan...@ADP.com>
Subject Help needed in parsing JSon with nested structures
Date Mon, 31 Oct 2016 10:49:40 GMT
Hello All,



I am processing a complex nested JSON document; below is its schema.
root
|-- businessEntity: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- payGroup: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- reportingPeriod: struct (nullable = true)
|    |    |    |    |    |-- worker: array (nullable = true)
|    |    |    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |    |    |-- category: string (nullable = true)
|    |    |    |    |    |    |    |-- person: struct (nullable = true)
|    |    |    |    |    |    |    |-- tax: array (nullable = true)
|    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |    |    |    |    |-- code: string (nullable = true)
|    |    |    |    |    |    |    |    |    |-- qtdAmount: double (nullable = true)
|    |    |    |    |    |    |    |    |    |-- ytdAmount: double (nullable = true)
My requirement is to create a HashMap whose key is the code concatenated with "qtdAmount" and
whose value is the qtdAmount itself, i.e. map.put(code + "qtdAmount", qtdAmount). How can I do this with Spark?
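To state the target concretely: assuming the (code, qtdAmount) pairs were already extracted from the tax array, the map I am after would look like this in plain Scala (the sample values are made up):

```scala
// Hypothetical sample of (code, qtdAmount) pairs, as if already
// pulled out of one worker's tax array.
val taxes = Seq(("FED", 102.5), ("STATE", 40.0))

// Desired result: code + "qtdAmount" as the key, qtdAmount as the value.
val taxMap: Map[String, Double] =
  taxes.map { case (code, qtd) => (code + "qtdAmount", qtd) }.toMap
// keys: FEDqtdAmount, STATEqtdAmount
```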
I tried the shell commands below.
import org.apache.spark.sql._
import org.apache.spark.sql.functions.explode
val sqlcontext = new SQLContext(sc)
val cdm = sqlcontext.read.json("/user/edureka/CDM/cdm.json")
val spark = SparkSession.builder().appName("SQL").config("spark.some.config.option", "some-value").getOrCreate()
cdm.createOrReplaceTempView("CDM")
val sqlDF = spark.sql("SELECT businessEntity[0].payGroup[0] from CDM").show()
val address = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].person.address from CDM as address")
val worker = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker from CDM")
val tax = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val tax = sqlcontext.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val codes = tax.select(explode(tax("tax.code")))
val codes = tax.withColumn("code", explode(tax("tax.code")))
  .withColumn("qtdAmount", explode(tax("tax.qtdAmount")))
  .withColumn("ytdAmount", explode(tax("tax.ytdAmount")))


I am trying to get all the codes and qtdAmounts into a map, but it is not working: using
multiple explode calls on a single DataFrame produces a Cartesian product of the array elements.
Could someone please help with how to parse JSON this complex in Spark?
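To make the Cartesian-product problem concrete, here is the shape of what I want, modeled in plain Scala with hypothetical case classes mirroring the relevant part of the schema (no Spark involved). Flattening the tax array once, as whole elements, keeps each code paired with its own qtdAmount; exploding code and qtdAmount separately crosses every code with every amount.

```scala
// Hypothetical case classes mirroring the relevant slice of the schema.
case class Tax(code: String, qtdAmount: Double, ytdAmount: Double)
case class Worker(category: String, tax: Seq[Tax])

val workers = Seq(
  Worker("regular", Seq(Tax("FED", 102.5, 300.0), Tax("STATE", 40.0, 120.0)))
)

// Flatten the tax array once, keeping each struct intact, then build the map.
val taxMap: Map[String, Double] =
  workers
    .flatMap(_.tax)                                // one "explode" of whole elements
    .map(t => (t.code + "qtdAmount", t.qtdAmount)) // code + "qtdAmount" -> qtdAmount
    .toMap
```

I believe the Spark equivalent would be a single explode of the tax column of structs, and then selecting code and qtdAmount out of each exploded struct, rather than one explode per field, but I have not got that working yet.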


Thanks,
Sivaram

----------------------------------------------------------------------
This message and any attachments are intended only for the use of the addressee and may contain
information that is privileged and confidential. If the reader of the message is not the intended
recipient or an authorized representative of the intended recipient, you are hereby notified
that any dissemination of this communication is strictly prohibited. If you have received
this communication in error, notify the sender immediately by return email and delete the
message and any attachments from your system.
