spark-user mailing list archives

From Richard <fifistorm...@gmail.com>
Subject Re: Spark dataset to explode json string
Date Sat, 20 Jul 2019 04:20:58 GMT
ok, thanks,
I have another way that is currently working, but it is not efficient if I
have to extract a lot of fields, since it creates a UDF for each extraction:
df = df.withColumn("foo", getFoo.apply(col("jsonCol")))
       .withColumn("bar", getBar.apply(col("jsonCol")));
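
A single pass is also possible with from_json (available since Spark 2.1): a
minimal sketch in Scala, assuming jsonCol always holds a flat JSON object;
the schema and column names below are illustrative:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Declare the expected fields once; from_json deserializes the string a
// single time into a struct, and each field becomes a cheap struct lookup.
val jsonSchema = StructType(Seq(
  StructField("foo", StringType, nullable = true),
  StructField("bar", StringType, nullable = true)
))

val parsed = df
  .withColumn("parsed", from_json(col("jsonCol"), jsonSchema))
  .withColumn("foo", col("parsed.foo"))
  .withColumn("bar", col("parsed.bar"))
  .drop("parsed")

Adding more fields then only means adding entries to jsonSchema; the JSON is
still parsed once per row.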

On Fri, Jul 19, 2019 at 8:54 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> You can try to split a JSON string like {"foo": "val1", "bar": "val2"} as below.
>
>
> /*
> This is an example of output!
> (c1003d93-5157-4092-86cf-0607157291d8,{"rowkey":"c1003d93-5157-4092-86cf-0607157291d8","ticker":"TSCO", "timeissued":"2019-07-01T09:10:55", "price":395.25})
> {"rowkey":"c1003d93-5157-4092-86cf-0607157291d8","ticker":"TSCO", "timeissued":"2019-07-01T09:10:55", "price":395.25}
> */
> // Then I do this to get individual values
>
>            var rowkey = row._2.split(',').view(0).split(':').view(1).toString.drop(1).dropRight(1).trim
>            var ticker = row._2.split(',').view(1).split(':').view(1).toString.drop(1).dropRight(1).trim
>            var timeissued = row._2.split(',').view(2).toString.substring(14,35).drop(1).dropRight(1).trim
>            var price = row._2.split(',').view(3).split(':').view(1).toString.dropRight(1).toDouble
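
(Side note: positional splits like these break as soon as a value contains an
extra comma or colon. get_json_object walks the JSON structure instead; a
sketch in Scala, assuming the JSON strings sit in a DataFrame column named
"value", with jsonDF standing in for whatever DataFrame holds them:)

import org.apache.spark.sql.functions.{col, get_json_object}

// Extract each field by JSONPath rather than by token position.
val parsed = jsonDF.select(
  get_json_object(col("value"), "$.rowkey").as("rowkey"),
  get_json_object(col("value"), "$.ticker").as("ticker"),
  get_json_object(col("value"), "$.timeissued").as("timeissued"),
  get_json_object(col("value"), "$.price").cast("double").as("price")
)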
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 20 Jul 2019 at 00:00, Richard <fifistorm123@gmail.com> wrote:
>
>> example of jsonCol (String):
>> {"foo": "val1", "bar": "val2"}
>>
>> Thanks,
>>
>> On Fri, Jul 19, 2019 at 3:57 PM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Sure.
>>>
>>> Do you have an example of a record from Cassandra read into df by any
>>> chance? Only the columns that need to go into Oracle.
>>>
>>> df.select('col1, 'col2, 'jsonCol).take(1).foreach(println)
>>>
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> Disclaimer: Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 19 Jul 2019 at 23:17, Richard <fifistorm123@gmail.com> wrote:
>>>
>>>> Thanks for the reply,
>>>> my situation is a little different from your sample:
>>>> Following is the schema from source (df.printSchema();)
>>>>
>>>> root
>>>>  |-- id: string (nullable = true)
>>>>  |-- col1: string (nullable = true)
>>>>  |-- col2: string (nullable = true)
>>>>  |-- jsonCol: string (nullable = true)
>>>>
>>>> I want to extract multiple fields from jsonCol so that the schema becomes:
>>>> root
>>>>  |-- id: string (nullable = true)
>>>>  |-- col1: string (nullable = true)
>>>>  |-- col2: string (nullable = true)
>>>>  |-- jsonCol: string (nullable = true)
>>>>  |-- foo: string (nullable = true)
>>>>  |-- bar: string (nullable = true)
>>>> ...
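
That target schema can be produced in one parse with json_tuple, which emits
one output column per requested field; a minimal sketch in Scala, assuming
the extracted values are wanted as strings:

import org.apache.spark.sql.functions.{col, json_tuple}

// One projection: keep every existing column and append foo and bar, both
// pulled from jsonCol in a single pass over the JSON string.
val exploded = df.select(
  col("*"),
  json_tuple(col("jsonCol"), "foo", "bar").as(Seq("foo", "bar"))
)
exploded.printSchema()  // id, col1, col2, jsonCol, foo, bar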
>>>> Thanks,
>>>>
>>>>
>>>>
>>>> On Fri, Jul 19, 2019 at 2:26 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi Richard,
>>>>>
>>>>> You can use the following to read JSON data into DF. The example is
>>>>> reading JSON from Kafka topic
>>>>>
>>>>>          val sc = spark.sparkContext
>>>>>          import spark.implicits._
>>>>>          // Use map to create the new RDD using the value portion of the pair.
>>>>>          val jsonRDD = pricesRDD.map(x => x._2)
>>>>>          // Create DataFrame from jsonRDD
>>>>>          val jsonDF = sqlContext.read.json(jsonRDD)
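
One caveat on the snippet above: from Spark 2.2 onwards the RDD[String]
overload of read.json is deprecated in favour of Dataset[String], so with the
spark.implicits._ import already in scope the last line becomes:

// Wrap the RDD in a Dataset[String] before handing it to the JSON reader.
val jsonDF = spark.read.json(spark.createDataset(jsonRDD))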
>>>>>
>>>>> This is an example of reading a MongoDB document into Spark
>>>>>
>>>>> dfrddMongoDB.printSchema
>>>>> /*
>>>>> root
>>>>>  |-- _id: struct (nullable = true)
>>>>>  |    |-- oid: string (nullable = true)
>>>>>  |-- operation: struct (nullable = true)
>>>>>  |    |-- op_type: integer (nullable = true)
>>>>>  |    |-- op_time: string (nullable = true)
>>>>>  |-- priceInfo: struct (nullable = true)
>>>>>  |    |-- key: string (nullable = true)
>>>>>  |    |-- ticker: string (nullable = true)
>>>>>  |    |-- timeissued: string (nullable = true)
>>>>>  |    |-- price: double (nullable = true)
>>>>>  |    |-- currency: string (nullable = true)
>>>>> // one example of mongo document from mongo collection
>>>>> {
>>>>>     "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
>>>>>     "priceInfo" : {
>>>>>         "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
>>>>>         "ticker" : "ORCL",
>>>>>         "timeissued" : "2019-04-10T21:20:57",
>>>>>         "price" : 41.13,
>>>>>         "currency" : "GBP"
>>>>>     },
>>>>>     "operation" : {
>>>>>         "op_type" : NumberInt(1),
>>>>>         "op_time" : "1554927506012"
>>>>>     }
>>>>> }
>>>>> */
>>>>> // Flatten the structs
>>>>> val df = dfrddMongoDB.
>>>>>                select(
>>>>>                         'priceInfo.getItem("key").as("key")
>>>>>                       , 'priceInfo.getItem("ticker").as("ticker")
>>>>>                       , 'priceInfo.getItem("timeissued").as("timeissued")
>>>>>                       , 'priceInfo.getItem("price").as("price")
>>>>>                       , 'priceInfo.getItem("currency").as("currency")
>>>>>                       , 'operation.getItem("op_type").as("op_type")
>>>>>                       , 'operation.getItem("op_time").as("op_time")
>>>>>                      )
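
The same flattening can be written without repeating getItem, using a star
expansion over each struct column; a sketch over the same document, assuming
the field names themselves need no renaming:

import org.apache.spark.sql.functions.col

// Select every field of both structs at once; the struct members become
// top-level columns under their own names.
val flattened = dfrddMongoDB.select(col("priceInfo.*"), col("operation.*"))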
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 19 Jul 2019 at 21:48, Richard <fifistorm123@gmail.com> wrote:
>>>>>
>>>>>> Let's say I use Spark to migrate some data from a Cassandra table to an
>>>>>> Oracle table.
>>>>>> Cassandra Table:
>>>>>> CREATE TABLE SOURCE(
>>>>>> id UUID PRIMARY KEY,
>>>>>> col1 text,
>>>>>> col2 text,
>>>>>> jsonCol text
>>>>>> );
>>>>>> example jsonCol value: {"foo": "val1", "bar": "val2"}
>>>>>>
>>>>>> I am trying to extract fields from the JSON column while importing into
>>>>>> the Oracle table:
>>>>>> Destination (
>>>>>> id varchar2(50),
>>>>>> col1 varchar2(128),
>>>>>> col2 varchar2(128),
>>>>>> raw_json clob,
>>>>>> foo varchar2(256),
>>>>>> bar varchar2(256)
>>>>>> );
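
For context, a sketch in Scala of the surrounding read/write pipeline,
assuming the DataStax spark-cassandra-connector and an Oracle JDBC driver are
on the classpath; every connection value below is a placeholder:

// Read the source table through the spark-cassandra-connector.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "my_keyspace")   // placeholder keyspace
  .option("table", "source")
  .load()

// ... extract foo and bar from jsonCol here, then write over JDBC ...

df.write
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")  // placeholder
  .option("dbtable", "DESTINATION")
  .option("user", "app_user")          // placeholder credentials
  .option("password", "app_password")
  .option("driver", "oracle.jdbc.OracleDriver")
  .mode("append")
  .save()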
>>>>>>
>>>>>> What I have done:
>>>>>> a separate UDF each for foo and bar.
>>>>>> This approach works, but it also means I need to deserialize the raw
>>>>>> JSON into a JSON object twice, and things get worse if I want to
>>>>>> extract many fields from the JSON.
>>>>>> example:
>>>>>> df = df.withColumn("foo", getFoo.apply(col("jsonCol")))
>>>>>>      .withColumn("bar", getBar.apply(col("jsonCol")));
>>>>>> // getFoo and getBar are UserDefinedFunctions
>>>>>>
>>>>>> How do I parse the raw JSON string only once and explode the fields I
>>>>>> need into multiple columns in Spark before writing to Oracle?
>>>>>>
>>>>>> Thanks,
