How to use Dataset<Row> foreachPartition and groupByKey together

Robin Kuttaiah
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have a Spark streaming application that listens to a Kafka topic, does some transformation, and writes to an Oracle database using a JDBC client.


Step 1.
Read events from Kafka as shown below:
--------------------------------------
   Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
          .option("kafka.bootstrap.servers", strKafkaAddress)
          .option("assign", strSubscription)
          .option("maxOffsetsPerTrigger", "100000")
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", false)
          .load()
          .filter(strFilter)
          .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
          .select("events.*");
          
I then do a groupByKey and, for each group, take the set of events in that group, create a JDBC connection and PreparedStatement, insert the events, and close the connection.
I am using Oracle JDBC along with flatMapGroupsWithState.


Step 2.
groupByKey and flatMapGroupsWithState
-------------------------------------
    Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
        .groupByKey(
            new MapFunction<Row, String>() {
              @Override
              public String call(Row event) {
                return event.getAs(m_InsightRawEvent.getPrimaryKey());
              }
            }, Encoders.STRING())
        .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
            Encoders.bean(InsightEventInfo.class), Encoders.bean(InsightEventUpdate.class),
            GroupStateTimeout.ProcessingTimeTimeout());


This has a drawback: it creates a connection and inserts into the DB once for every group.

I need to do this once per partition instead, so that a single connection and a single batch insert cover all the new events read from that partition.

Can somebody point me to how I can achieve this?

Basically, I am looking for the following:
1. Read from Kafka as described above.
2. kafkaEvents.foreachPartition - create one connection here.
3. groupByKey and flatMapGroupsWithState.
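
To make the goal concrete, here is a minimal sketch of the per-partition lifecycle I have in mind: open one connection, add every event to a batch, and flush when the partition is done. It is plain Java with no Spark or JDBC dependency so that it runs on its own. FakeBatchConnection and PartitionBatchWriter are hypothetical names invented for illustration, and the open/process/close method shapes are modeled on Spark's ForeachWriter as I understand it. It also assumes the insert may happen after the stateful step rather than inside the state update function.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JDBC Connection plus PreparedStatement batch.
// It only records what would be sent, so the sketch runs without a database.
class FakeBatchConnection {
    static int connectionsOpened = 0;                  // connections created so far
    final List<String> pending = new ArrayList<>();    // rows added, not yet executed
    final List<String> committed = new ArrayList<>();  // rows "written" to the DB
    int batchesExecuted = 0;

    FakeBatchConnection() {
        connectionsOpened++;
    }

    void addBatch(String row) {
        pending.add(row);
    }

    void executeBatch() {
        if (pending.isEmpty()) {
            return; // nothing to flush
        }
        committed.addAll(pending);
        pending.clear();
        batchesExecuted++;
    }
}

// Hypothetical writer: one open/process/close cycle handles one partition.
class PartitionBatchWriter {
    private final int batchSize;
    FakeBatchConnection conn;
    private int buffered = 0;

    PartitionBatchWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per partition: the only place a connection is created.
    boolean open(long partitionId, long version) {
        conn = new FakeBatchConnection();
        buffered = 0;
        return true;
    }

    // Called once per record: add to the batch, flush when it is full.
    void process(String update) {
        conn.addBatch(update);
        buffered++;
        if (buffered >= batchSize) {
            conn.executeBatch();
            buffered = 0;
        }
    }

    // Called once at the end: flush the remainder only if nothing failed.
    // A real writer would also close the statement and connection here.
    void close(Throwable errorOrNull) {
        if (errorOrNull == null) {
            conn.executeBatch();
        }
    }
}
```

In the real application I assume the equivalent would be a class extending ForeachWriter<InsightEventUpdate>, passed to sessionUpdates.writeStream().foreach(...), with the Oracle connection created in open and closed in close. I have not verified that this fits together with flatMapGroupsWithState.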

Thanks,
Robin Kuttaiah