spark-user mailing list archives

From chandan prakash <>
Subject Re: Automatic Json Schema inference using Structured Streaming
Date Mon, 09 Jul 2018 06:36:51 GMT
Hi Swetha,
I also had the same requirement: reading JSON from Kafka and writing it
back in Parquet format.
I used a workaround:

   1. Inferred the schema using the batch api by reading first few rows
   2. started streaming using the inferred schema in step1

*Limitation*: This will not work if your schema changes on the fly for later
records; you will have to restart the streaming query.

*Sample Code:*
    //start the stream
    def start = {
        //check and get the latest kafka offset from the checkpoint, if it exists
        val startingOffset: String = getKafkaOffset(offsetDirHdfsPath)

        //batch: infer the schema from kafka one time during start
        val batchDf = spark.read.format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topic)
            .option("startingOffsets",
                """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") //-1:latest , -2:earliest
            .load()
        val batchJson = spark.read.json(
            batchDf.selectExpr("CAST(value AS STRING)").as[String])
        batchJson.printSchema() //print to see the inferred schema
        val schema = batchJson.schema

        //streaming: create a datastream from the Kafka topic
        val inputDf = spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topic)
            // .option("startingOffsets",
            //     """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") //-1:latest , -2:earliest
            .load()

        //convert the datastream into a dataset and expand each record into
        //multiple columns by applying the inferred schema
        val ds = inputDf.selectExpr("CAST(value AS STRING)")
        val dataSet = ds.select(from_json($"value", schema).as("data")).select("data.*")
        val uploadToS3 = dataSet.writeStream.format("parquet")
            .option("path", outputPath)
            .option("checkpointLocation", checkpointDir)
            .start()
    }
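The two-phase idea above (infer a schema from the first few records, then hold it fixed while parsing the rest of the stream) can be illustrated without a Spark cluster. This is a minimal stand-in sketch in plain Python, not Spark code: `infer_schema` and `apply_schema` are hypothetical helpers mimicking what Spark's batch `read.json` and `from_json` do, and it also shows the limitation noted above (fields that appear only in later records are dropped).

```python
import json

def infer_schema(sample_records):
    """Step 1 (batch inference): union of field names seen in the sample."""
    fields = set()
    for rec in sample_records:
        fields.update(json.loads(rec).keys())
    return sorted(fields)

def apply_schema(record, schema):
    """Step 2 (streaming parse): project a record onto the fixed schema.
    Fields that first appear in later records are silently dropped."""
    data = json.loads(record)
    return {f: data.get(f) for f in schema}

records = [
    '{"user": "a", "tweets": 600}',
    '{"user": "b", "tweets": -2, "lang": "en"}',  # "lang" arrives later
]
schema = infer_schema(records[:1])               # infer from the first row only
rows = [apply_schema(r, schema) for r in records]
print(schema)   # ['tweets', 'user']
print(rows[1])  # {'tweets': -2, 'user': 'b'} -- "lang" is lost
```

This makes the restart requirement concrete: once the schema is fixed at start, the only way to pick up new fields is to re-run the inference step.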

On Thu, Jul 5, 2018 at 12:38 PM SRK <> wrote:

> Hi,
> Is there a way that automatic JSON schema inference can be done using
> Structured Streaming? I do not want to supply a predefined schema and bind
> it.
> With Spark Kafka Direct I could do this, but I see that it is not
> supported in Structured Streaming.
> Thanks!

Chandan Prakash
