spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Spark dataset to explode json string
Date Fri, 19 Jul 2019 22:57:06 GMT
Sure.

Do you have an example of a record from Cassandra read into df, by any
chance? Only the columns that need to go into Oracle:

df.select('col1, 'col2, 'jsonCol).take(1).foreach(println)
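
In the meantime, one way to parse the JSON string only once is from_json with
an explicit schema, then selecting the parsed fields as columns. A minimal
sketch, assuming the keys are foo and bar as in your example (adjust the
schema to the real field names and types):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// schema of the JSON held in jsonCol (assumed here: two string fields)
val jsonSchema = StructType(Seq(
  StructField("foo", StringType, nullable = true),
  StructField("bar", StringType, nullable = true)
))

// parse once into a struct column, then flatten the fields you need
val flattened = df
  .withColumn("parsed", from_json(col("jsonCol"), jsonSchema))
  .select(col("id"), col("col1"), col("col2"), col("jsonCol"),
    col("parsed.foo").as("foo"), col("parsed.bar").as("bar"))

Each extra field is just another parsed.<name> in the select, so the JSON is
deserialized once per row rather than once per UDF.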


HTH

Dr Mich Talebzadeh



LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 19 Jul 2019 at 23:17, Richard <fifistorm123@gmail.com> wrote:

> Thanks for the reply,
> my situation is a little different from your sample:
> Following is the schema from source (df.printSchema();)
>
> root
>  |-- id: string (nullable = true)
>  |-- col1: string (nullable = true)
>  |-- col2: string (nullable = true)
>  |-- jsonCol: string (nullable = true)
>
> I want to extract multiple fields from jsonCol so that the schema becomes
> root
>  |-- id: string (nullable = true)
>  |-- col1: string (nullable = true)
>  |-- col2: string (nullable = true)
>  |-- jsonCol: string (nullable = true)
>  |-- foo: string (nullable = true)
>  |-- bar: string (nullable = true)
> ...
> Thanks,
>
>
>
> On Fri, Jul 19, 2019 at 2:26 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> Hi Richard,
>>
>> You can use the following to read JSON data into DF. The example is
>> reading JSON from Kafka topic
>>
>>          val sc = spark.sparkContext
>>          import spark.implicits._
>>          // pricesRDD holds (key, value) pairs read from the Kafka topic;
>>          // use map to keep only the value portion (the JSON string)
>>          val jsonRDD = pricesRDD.map(x => x._2)
>>          // Create a DataFrame from the RDD of JSON strings
>>          val jsonDF = spark.read.json(jsonRDD)
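>>
>> Note that read.json on an RDD[String] is deprecated in Spark 2.x; on 2.2+
>> the same can be done with a Dataset[String], e.g. (a sketch, reusing
>> jsonRDD and the implicits imported above):
>>
>>          val jsonDF = spark.read.json(spark.createDataset(jsonRDD))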
>>
>> This is an example of reading a MongoDB document into Spark
>>
>> dfrddMongoDB.printSchema
>> /*
>> root
>>  |-- _id: struct (nullable = true)
>>  |    |-- oid: string (nullable = true)
>>  |-- operation: struct (nullable = true)
>>  |    |-- op_type: integer (nullable = true)
>>  |    |-- op_time: string (nullable = true)
>>  |-- priceInfo: struct (nullable = true)
>>  |    |-- key: string (nullable = true)
>>  |    |-- ticker: string (nullable = true)
>>  |    |-- timeissued: string (nullable = true)
>>  |    |-- price: double (nullable = true)
>>  |    |-- currency: string (nullable = true)
>> // one example of mongo document from mongo collection
>> {
>>     "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
>>     "priceInfo" : {
>>         "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
>>         "ticker" : "ORCL",
>>         "timeissued" : "2019-04-10T21:20:57",
>>         "price" : 41.13,
>>         "currency" : "GBP"
>>     },
>>     "operation" : {
>>         "op_type" : NumberInt(1),
>>         "op_time" : "1554927506012"
>>     }
>> }
>> */
>> // Flatten the structs
>> val df = dfrddMongoDB.
>>                select(
>>                         'priceInfo.getItem("key").as("key")
>>                       , 'priceInfo.getItem("ticker").as("ticker")
>>                       , 'priceInfo.getItem("timeissued").as("timeissued")
>>                       , 'priceInfo.getItem("price").as("price")
>>                       , 'priceInfo.getItem("currency").as("currency")
>>                       , 'operation.getItem("op_type").as("op_type")
>>                       , 'operation.getItem("op_time").as("op_time")
>>                      )
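>>
>> If the column is a JSON string rather than a struct, as in your Cassandra
>> table, you could first build such a struct with from_json(col("jsonCol"),
>> schema) and then flatten it the same way.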
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 19 Jul 2019 at 21:48, Richard <fifistorm123@gmail.com> wrote:
>>
>>> Let's say I use Spark to migrate some data from a Cassandra table to an
>>> Oracle table.
>>> Cassandra Table:
>>> CREATE TABLE SOURCE(
>>> id UUID PRIMARY KEY,
>>> col1 text,
>>> col2 text,
>>> jsonCol text
>>> );
>>> example jsonCol value: {"foo": "val1", "bar": "val2"}
>>>
>>> I am trying to extract fields from the JSON column while importing into
>>> the Oracle table:
>>> Destination (
>>> id varchar2(50),
>>> col1 varchar(128),
>>> col2 varchar(128),
>>> raw_json clob,
>>> foo varchar2(256),
>>> bar varchar2(256)
>>> );
>>>
>>> What I have done: a separate UDF each for foo and bar.
>>> This approach works, but it also means I need to deserialize the raw JSON
>>> into a JSON object twice, and it gets worse if I want to extract many
>>> fields from the JSON.
>>> example:
>>> df = df.withColumn("foo", getFoo.apply(col("jsonCol")))
>>>      .withColumn("bar", getBar.apply(col("jsonCol")));
>>> // getFoo and getBar are UserDefinedFunction
>>>
>>> How do I parse the raw JSON string only once and explode the fields I
>>> need into multiple columns for Oracle in Spark?
>>>
>>> Thanks,
>>>
>>>
>>>
>>>
>>>
>>>
