spark-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: Queries with streaming sources must be executed with writeStream.start()
Date Tue, 12 Sep 2017 21:26:45 GMT
I have about 100 fields in my dataset, and some of them contain "null".
Does to_json fail to convert in that case?

Thanks!

On Tue, Sep 12, 2017 at 12:32 PM, kant kodali <kanth909@gmail.com> wrote:

> Hi Michael,
>
> Interestingly, that doesn't quite work for me for some reason. Here
> is what I have.
>
> Dataset
>
> name | id | country
> -------------------------
> kant   | 1  | usa
> john   | 2  | usa
>
>
> And here is my code
>
> Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
> StreamingQuery query = ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query.awaitTermination();
>
> *This works completely fine and I can see the rows on my console.*
>
> Now if I change it to this.
>
> Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
> Dataset<String> jsonDS = ds.select(to_json(struct(ds.col("*")))).as(Encoders.STRING());
> StreamingQuery query2 = jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query2.awaitTermination();
>
> *I don't see any rows on my console, and I made sure I waited for a while.*
>
> *The moment I change it back to the code above and run it, it works again.*
>
> On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust <michael@databricks.com>
> wrote:
>
>> The following will convert the whole row to JSON.
>>
>> import static org.apache.spark.sql.functions.*;
>> df.select(to_json(struct(col("*"))))
>>
>> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali <kanth909@gmail.com> wrote:
>>
>>> Thanks Ryan! In this case, I will have Dataset<Row> so is there a way to
>>> convert Row to Json string?
>>>
>>> Thanks
>>>
>>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> It's because "toJSON" doesn't support Structured Streaming. The current
>>>> implementation will convert the Dataset to an RDD, which is not supported
>>>> by streaming queries.
>>>>
>>>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali <kanth909@gmail.com> wrote:
>>>>
>>>>> Yes, it is a streaming dataset. So what is the problem with the
>>>>> following code?
>>>>>
>>>>> Dataset<String> ds = dataset.toJSON().map(()->{some function that returns a string});
>>>>>  StreamingQuery query = ds.writeStream().start();
>>>>>  query.awaitTermination();
>>>>>
>>>>>
>>>>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung <
>>>>> felixcheung_m@hotmail.com> wrote:
>>>>>
>>>>>> What is newDS?
>>>>>> If it is a streaming Dataset/DataFrame (since you have writeStream
>>>>>> there), then there seems to be an issue preventing toJSON from working.
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* kant kodali <kanth909@gmail.com>
>>>>>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>>>>>> *To:* user @spark
>>>>>> *Subject:* Queries with streaming sources must be executed with
>>>>>> writeStream.start()
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following code and I am not sure what's wrong with it.
>>>>>> I cannot call dataset.toJSON() (which returns a Dataset)? I am using
>>>>>> Spark 2.2.0, so I am wondering if there is any workaround.
>>>>>>
>>>>>>  Dataset<String> ds = newDS.toJSON().map(()->{some function that returns a string});
>>>>>>  StreamingQuery query = ds.writeStream().start();
>>>>>>  query.awaitTermination();
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
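
The workaround discussed in this thread (replacing toJSON(), which goes through an RDD, with to_json(struct(col("*"))) so the query stays a streaming query) can be sketched in Java roughly as follows. This is only a sketch under assumptions: the class name RowsToJsonSketch, the method names, and the "value" column alias are hypothetical and not from the thread, and the "truncate" console option is added here just to make long JSON strings readable. On the null question, Spark's JSON writer by default appears to leave null-valued fields out of the output rather than failing, but that is worth verifying on the Spark version in use.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

// Hypothetical helper class; the names below are illustrative only.
public class RowsToJsonSketch {

    // Packs every column of each row into a struct and serializes it to a
    // JSON string. Unlike Dataset.toJSON(), this stays a column expression,
    // so it can be used on a streaming Dataset as well as a batch one.
    static Dataset<String> toJsonStrings(Dataset<Row> ds) {
        return ds
            .select(to_json(struct(col("*"))).alias("value")) // "value" is an assumed alias
            .as(Encoders.STRING());
    }

    // Starts a console query over the JSON strings, mirroring the trigger
    // and sink used in the thread. Declared "throws Exception" because the
    // checked exceptions on start() differ between Spark versions.
    static StreamingQuery startConsoleQuery(Dataset<Row> streamingRows) throws Exception {
        return toJsonStrings(streamingRows)
            .writeStream()
            .trigger(Trigger.ProcessingTime(1000))
            .format("console")
            .option("truncate", false) // print full JSON strings instead of truncated cells
            .start();
    }
}
```

With the thread's own getKafkaStream() as the source, usage would be along the lines of RowsToJsonSketch.startConsoleQuery(getKafkaStream()).awaitTermination(). The same toJsonStrings helper can be run on a small batch Dataset first to check what the JSON looks like for rows containing nulls.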
