spark-user mailing list archives

From Kuttaiah Robin <kutta...@gmail.com>
Subject How to use Dataset<Row> forEachPartion and groupByKey together
Date Thu, 01 Nov 2018 06:15:44 GMT
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have a Spark streaming application that listens to a Kafka topic, applies
some transformations, and writes to an Oracle database using a JDBC client.


Step 1.
Read events from Kafka as shown below:
--------------------------------------
   Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
          .option("kafka.bootstrap.servers", strKafkaAddress)
          .option("assign", strSubscription)
          .option("maxOffsetsPerTrigger", "100000")
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", false)
          .load()
          .filter(strFilter)
          .select(functions.from_json(functions.col("value").cast("string"), oSchema)
                  .alias("events"))
          .select("events.*");

I then call groupByKey and, for each group, take the set of events in that
group, create a JDBC connection and PreparedStatement, insert the events, and
close the connection.
I am using Oracle JDBC together with flatMapGroupsWithState.


Step 2.
groupByKey and flatMapGroupsWithState
-------------------------------------
    Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
        .groupByKey(
            new MapFunction<Row, String>() {
              @Override public String call(Row event) {
                return event.getAs(m_InsightRawEvent.getPrimaryKey());
              }
            }, Encoders.STRING())
        .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
            Encoders.bean(InsightEventInfo.class),
            Encoders.bean(InsightEventUpdate.class),
            GroupStateTimeout.ProcessingTimeTimeout());


The drawback is that this creates a connection and inserts into the DB once
per group.

I need to do this once per partition instead, so that a single connection and
a single batch insert cover all the new events read from that partition.

Can somebody point me to how I can achieve this?

Basically, I am looking for the following:
1. Read from Kafka as described above.
2. kafkaEvents.foreachPartition - create one connection here.
3. groupByKey and flatMapGroupsWithState
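
To make the per-partition goal concrete, here is a minimal sketch of the
pattern in plain Java, with no Spark or Oracle dependency. It assumes the
open/process/close lifecycle that Spark's ForeachWriter exposes (open is
called once per partition, process once per record, close once per
partition). BatchSink, PartitionBatchWriter and RecordingSinkFactory are
hypothetical names invented for this illustration; BatchSink only stands in
for a JDBC Connection plus a PreparedStatement batch. It is a sketch under
those assumptions, not a tested solution for the job above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a JDBC Connection plus a PreparedStatement batch.
interface BatchSink {
    void addBatch(String event);   // analogous to PreparedStatement.addBatch()
    void executeBatch();           // analogous to PreparedStatement.executeBatch()
    void close();                  // analogous to Connection.close()
}

// Hypothetical writer with the same open/process/close shape as ForeachWriter.
final class PartitionBatchWriter {
    private final Supplier<BatchSink> sinkFactory;
    private BatchSink sink;

    PartitionBatchWriter(Supplier<BatchSink> sinkFactory) {
        this.sinkFactory = sinkFactory;
    }

    // Called once per partition: the only place a connection is created.
    boolean open(long partitionId, long version) {
        sink = sinkFactory.get();
        return true;
    }

    // Called once per record: only queues the row, no database round trip.
    void process(String event) {
        sink.addBatch(event);
    }

    // Called once per partition: flush one batch, then release the connection.
    void close(Throwable errorOrNull) {
        if (sink == null) {
            return;
        }
        try {
            if (errorOrNull == null) {
                sink.executeBatch();
            }
        } finally {
            sink.close();
            sink = null;
        }
    }
}

// In-memory factory so the sketch runs without a database; it counts calls.
final class RecordingSinkFactory implements Supplier<BatchSink> {
    int connectionsOpened = 0;
    int batchesExecuted = 0;
    int connectionsClosed = 0;
    final List<String> rowsWritten = new ArrayList<>();

    @Override
    public BatchSink get() {
        connectionsOpened++;
        final List<String> pending = new ArrayList<>();
        return new BatchSink() {
            @Override public void addBatch(String event) { pending.add(event); }
            @Override public void executeBatch() {
                batchesExecuted++;
                rowsWritten.addAll(pending);
                pending.clear();
            }
            @Override public void close() { connectionsClosed++; }
        };
    }
}

final class PartitionBatchDemo {
    public static void main(String[] args) {
        RecordingSinkFactory factory = new RecordingSinkFactory();
        PartitionBatchWriter writer = new PartitionBatchWriter(factory);
        writer.open(0L, 1L);
        for (String event : new String[] {"e1", "e2", "e3"}) {
            writer.process(event);
        }
        writer.close(null);
        System.out.println(factory.connectionsOpened + " connection(s), "
                + factory.batchesExecuted + " batch(es), "
                + factory.rowsWritten.size() + " row(s)");
    }
}
```

In a real job the same three methods would presumably live in a
ForeachWriter<InsightEventUpdate> passed to sessionUpdates.writeStream().foreach(...),
so the grouping and state update run first and the connection is opened
only when each output partition is written.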

thanks
Robin Kuttaiah
