Recreate Dataset<Row> from a list of Row in a Spark streaming application.

Recreate Dataset<Row> from a list of Row in a Spark streaming application.

Robin Kuttaiah
Hello,

I have a Spark streaming application that reads events from Kafka and parses them against a given schema.

// Read the Kafka stream and parse each message's JSON value against the schema.
Dataset<Row>  m_oKafkaEvents = getSparkSession().readStream().format("kafka")
            .option("kafka.bootstrap.servers", strKafkaAddress)
            .option("assign", strSubscription)
            .option("maxOffsetsPerTrigger", "100000")
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", false)
            .load()
            .filter(strFilter)
            .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
            .select("events.*");


This dataset is then grouped by one of its columns, InstanceId, which is the key for us, and fed into a flatMapGroupsWithState function that does some correlation.

// Group events by InstanceId and run stateful correlation on each group.
Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
    new MapFunction<Row, String>() {
      @Override
      public String call(Row event) {
        return event.getAs("InstanceId");
      }
    }, Encoders.STRING())
    .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class), Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());


The output dataset is of type InsightEventUpdate, which contains a list of Spark Rows related to the InstanceId.

Now I want to convert this back into a Dataset<Row>. Basically, I have a list of Rows.
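
For context, the shape of the bean is roughly as below (a simplified sketch; the field and accessor names here are placeholders, not the real class):

import java.io.Serializable;
import java.util.List;
import org.apache.spark.sql.Row;

// Simplified sketch of the state output bean; the real class has more fields.
public class InsightEventUpdate implements Serializable {
  private String instanceId;   // the grouping key
  private List<Row> rows;      // correlated events for this InstanceId

  public String getInstanceId() { return instanceId; }
  public void setInstanceId(String id) { this.instanceId = id; }
  public List<Row> getRows() { return rows; }
  public void setRows(List<Row> rows) { this.rows = rows; }
}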

I tried 

sparkSession.createDataFrame(listOfRows, schema);

This gives me:

java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
        at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)

Can someone tell me the right way to go ahead?

thanks
Robin Kuttaiah




Re: Recreate Dataset<Row> from a list of Row in a Spark streaming application.

Shixiong(Ryan) Zhu
oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is a ForeachWriter, right? You cannot use SparkSession in its process method, as it runs on the executors.
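
To illustrate where this goes wrong (a sketch, assuming your writer is parameterized on InsightEventUpdate):

import org.apache.spark.sql.ForeachWriter;

// All three methods below run on the executors, once per partition,
// so none of them may touch driver-side objects like SparkSession.
public class ForeachFederatedEventProcessor extends ForeachWriter<InsightEventUpdate> {
  @Override
  public boolean open(long partitionId, long version) {
    return true; // set up per-partition resources (e.g. a connection) here
  }

  @Override
  public void process(InsightEventUpdate update) {
    // Calling sparkSession.createDataFrame(...) here is what throws the
    // NullPointerException: there is no SparkSession on the executors.
    // Write the rows to the sink directly instead.
  }

  @Override
  public void close(Throwable errorOrNull) {
    // tear down per-partition resources here
  }
}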

Best Regards,

Ryan


Re: Recreate Dataset<Row> from a list of Row in a Spark streaming application.

Robin Kuttaiah
Thanks, Ryan.

Yes, that is a ForeachWriter.

Can someone help me figure out how to solve this?

Basically, the output of the flatMapGroupsWithState function is Dataset<InsightEventUpdate> sessionUpdates;

The InsightEventUpdate class contains a list of Spark Rows, which I need to convert back to a Dataset<Row>.

Something like

Dataset<Row> correlatedEvents = <do something here with sessionUpdates>;

Please note this is a streaming application.
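
One approach I am considering is to flatten the lists inside the streaming query itself, with flatMap and a RowEncoder, instead of calling createDataFrame inside the ForeachWriter. An untested sketch (getRows() is my accessor for the embedded list, and schema is the StructType of the original events):

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

// Flatten each InsightEventUpdate's list of rows back into a Dataset<Row>,
// staying within the streaming plan (no SparkSession needed on executors).
Dataset<Row> correlatedEvents = sessionUpdates.flatMap(
    (FlatMapFunction<InsightEventUpdate, Row>) update -> update.getRows().iterator(),
    RowEncoder.apply(schema));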

regards,
Robin.



