spark-user mailing list archives

From "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Subject Re: Recreate Dataset<Row> from list of Row in spark streaming application.
Date Fri, 05 Oct 2018 17:42:26 GMT
oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is
a ForeachWriter, right? You cannot use SparkSession in its process method,
because that method runs on the executors and a SparkSession is only usable
on the driver.
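
To illustrate the failure mode, here is a minimal plain-Java sketch with no
Spark dependency. It assumes the NullPointerException arises because the
session's driver-side state sits in transient fields that are not shipped
when the writer is serialized to an executor. FakeSession, FakeWriter,
DriverState and roundTrip are hypothetical stand-ins invented for this
example, not Spark classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientSessionDemo {

    // Hypothetical stand-in for state that only exists on the driver.
    static class DriverState {
        String describe() {
            return "driver state";
        }
    }

    // Hypothetical stand-in for a session: serializable, but its state is transient,
    // so the field is null in any deserialized copy.
    static class FakeSession implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient DriverState state;

        FakeSession() {
            this.state = new DriverState();
        }

        String createSomething() {
            // Throws NullPointerException when state was dropped by serialization.
            return state.describe();
        }
    }

    // Hypothetical stand-in for a writer that captured the session on the driver.
    static class FakeWriter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeSession session;

        FakeWriter(FakeSession session) {
            this.session = session;
        }

        String process() {
            return session.createSomething();
        }
    }

    // Serializes and deserializes a value, mimicking shipping it to another JVM.
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        FakeWriter onDriver = new FakeWriter(new FakeSession());
        System.out.println(onDriver.process());

        FakeWriter onExecutor = roundTrip(onDriver);
        try {
            onExecutor.process();
        } catch (NullPointerException e) {
            System.out.println("NullPointerException after deserialization");
        }
    }
}
```

If that assumption holds, the conversion to Dataset<Row> has to be expressed
as a Dataset operation defined on the driver (for example a flatMap over
sessionUpdates with a Row encoder) rather than a createDataFrame call inside
process.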

Best Regards,
Ryan


On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin <kuttaiah@gmail.com> wrote:

> Hello,
>
> I have a spark streaming application which reads from Kafka based on the
> given schema.
>
> Dataset<Row> m_oKafkaEvents = getSparkSession().readStream().format("kafka")
>             .option("kafka.bootstrap.servers", strKafkaAddress)
>             .option("assign", strSubscription)
>             .option("maxOffsetsPerTrigger", "100000")
>             .option("startingOffsets", "latest")
>             .option("failOnDataLoss", false)
>             .load()
>             .filter(strFilter)
>             .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
>             .select("events.*");
>
>
> This dataset is then grouped by one of its columns (InstanceId), which is
> the key for us, and fed into the flatMapGroupsWithState function. This
> function does some correlation.
>
> Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
>     new MapFunction<Row, String>() {
>       @Override public String call(Row event) {
>         return event.getAs("InstanceId");
>       }
>     }, Encoders.STRING())
>   .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>     Encoders.bean(InsightEventInfo.class),
>     Encoders.bean(InsightEventUpdate.class),
>     GroupStateTimeout.ProcessingTimeTimeout());
>
>
> The output dataset is of type InsightEventUpdate, which contains a List of
> Spark Rows related to the InstanceId.
>
> Now I want to convert this back into a Dataset<Row>. Basically, I have a
> List of Rows.
>
> I tried
>
> sparkSession.createDataFrame(listOfRows, schema);
>
> this gives me
>
> java.lang.NullPointerException
>         at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
>         at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
>         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
>         at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
>         at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
>
> Can someone help me figure out how to proceed?
>
> thanks
> Robin Kuttaiah
