spark-user mailing list archives

From Kuttaiah Robin <kutta...@gmail.com>
Subject Recreate Dataset<Row> from list of Row in spark streaming application.
Date Fri, 05 Oct 2018 13:22:44 GMT
Hello,

I have a Spark Structured Streaming application that reads from Kafka using a
given schema.

Dataset<Row> m_oKafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "100000")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
    .select("events.*");


This dataset is then grouped by one of its columns (InstanceId), which serves
as our key, and fed into a flatMapGroupsWithState function that performs some
correlation.

Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents
    .groupByKey(new MapFunction<Row, String>() {
        @Override
        public String call(Row event) {
            return event.getAs("InstanceId");
        }
    }, Encoders.STRING())
    .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class),
        Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());


The output dataset is of type InsightEventUpdate, which contains a List of
Spark Rows related to the InstanceId.

Now I want to convert this back into a Dataset<Row>; essentially I have a
List<Row>.

I tried

sparkSession.createDataFrame(listOfRows, schema);

this gives me

java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
        at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
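For reference, the createDataFrame call itself works when invoked against a
fully initialized session; a minimal batch-mode sketch (assuming a locally
started SparkSession and a hypothetical two-column schema standing in for the
real Kafka event schema — the class and column names below are illustrative,
not from the application above):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RowsToDataset {
    public static void main(String[] args) {
        // Obtain (or create) the active session; the NPE in the trace above
        // is raised while lazily computing sessionState, which suggests the
        // session used in the process() method was not fully initialized.
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .appName("rows-to-dataset")
            .getOrCreate();

        // Hypothetical schema: one key column plus one payload column.
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("InstanceId", DataTypes.StringType, false),
            DataTypes.createStructField("value", DataTypes.StringType, true)
        });

        // A List<Row> analogous to the rows carried in InsightEventUpdate.
        List<Row> listOfRows = Arrays.asList(
            RowFactory.create("i-1", "a"),
            RowFactory.create("i-1", "b"));

        // Rebuild a Dataset<Row> from the list plus the schema.
        Dataset<Row> df = spark.createDataFrame(listOfRows, schema);
        System.out.println(df.count());

        spark.stop();
    }
}
```

The same call in the streaming process() method fails, so the difference seems
to lie in which SparkSession instance is in scope there, not in the API usage.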

Can someone tell me what the right way to do this is?

thanks
Robin Kuttaiah
