oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is a ForeachWriter, right? You cannot use SparkSession in its process method, because that method runs on the executors.
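
One way around it is to flatten the grouped output back into a Dataset<Row> on the query plan itself, before the stream reaches the ForeachWriter, so you never need a SparkSession inside process. A rough sketch (assuming InsightEventUpdate exposes its rows through a getter such as getRows(), and that schema is the same StructType you use with from_json):

Dataset<Row> flattenedEvents = sessionUpdates.flatMap(
    // getRows() is assumed here; replace with however InsightEventUpdate exposes its List<Row>
    (FlatMapFunction<InsightEventUpdate, Row>) update -> update.getRows().iterator(),
    RowEncoder.apply(schema));  // Row encoder built from the schema you already have

// flattenedEvents can then be written with a normal streaming sink,
// instead of rebuilding a DataFrame inside ForeachWriter.process().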

Best Regards,

Ryan


On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin <kuttaiah@gmail.com> wrote:
Hello,

I have a spark streaming application which reads from Kafka based on the given schema.

Dataset<Row>  m_oKafkaEvents = getSparkSession().readStream().format("kafka")
            .option("kafka.bootstrap.servers", strKafkaAddress)
            .option("assign", strSubscription)
            .option("maxOffsetsPerTrigger", "100000")
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", false)
            .load()
            .filter(strFilter)
            .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
            .select("events.*");


Now this dataset is grouped by one of the columns (InstanceId), which is the key for us, and then fed into the flatMapGroupsWithState function. This function does some correlation; a trimmed-down skeleton of it is shown after the snippet below.

Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
  new MapFunction<Row, String>() {
    @Override
    public String call(Row event) {
      return event.getAs("InstanceId");
    }
  }, Encoders.STRING())
  .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
    Encoders.bean(InsightEventInfo.class), Encoders.bean(InsightEventUpdate.class),
    GroupStateTimeout.ProcessingTimeTimeout());
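
For context, idstateUpdateFunction implements FlatMapGroupsWithStateFunction<String, Row, InsightEventInfo, InsightEventUpdate>; a trimmed-down skeleton, with the actual correlation logic omitted, looks roughly like this:

FlatMapGroupsWithStateFunction<String, Row, InsightEventInfo, InsightEventUpdate>
  idstateUpdateFunction = (instanceId, events, state) -> {
    // existing state for this InstanceId, or a fresh one on the first batch
    InsightEventInfo info = state.exists() ? state.get() : new InsightEventInfo();
    List<InsightEventUpdate> updates = new ArrayList<>();
    while (events.hasNext()) {
      Row event = events.next();
      // ... correlate event against info and collect any resulting updates ...
    }
    state.update(info);
    return updates.iterator();
  };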


The output dataset is of type Dataset<InsightEventUpdate>; each InsightEventUpdate contains a list of Spark Rows related to its InstanceId.

Now I want to convert this back into a Dataset<Row>. Basically, I have a List of Rows.

I tried 

sparkSession.createDataFrame(listOfRows, schema);

this gives me 

java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
        at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)

Can someone help me with the right way to go about this?

thanks
Robin Kuttaiah