How to map a Dataset row to a struct in Java?


anuragDada
When I try to use the code below, I get this error:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Generators are not supported when it's nested in expressions, but got: generatorouter(explode(generatorouter(explode(json))));

The code:

    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("AckedState", DataTypes.StringType, true),
        DataTypes.createStructField("ConfirmedState", DataTypes.StringType, true),
        DataTypes.createStructField("Time", DataTypes.DoubleType, true),
        DataTypes.createStructField("ActiveState", DataTypes.StringType, true),
        DataTypes.createStructField("SourceNode", DataTypes.StringType, true),
        DataTypes.createStructField("SourceName", DataTypes.StringType, true),
        DataTypes.createStructField("Message", DataTypes.StringType, true),
        DataTypes.createStructField("Severity", DataTypes.IntegerType, true)
    });

    Dataset df = sparkSession.readStream().format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("kafka.ssl.keystore.location", keystore)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.ssl.keystore.password", "abc")
        .option("kafka.ssl.truststore.location", truststore)
        .option("kafka.ssl.truststore.password", "abc")
        .option("kafka.sasl.jaas.config", jaasCfg)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("group.id", consumerGroupId)
        .option("value.serializer", serializer)
        .option("key.serializer", serializer)
        .option("session.timeout.ms", "30000")
        .option("subscribe", "alerts5")
        .load()
        .selectExpr("CAST(value AS STRING) as json");
    df.printSchema();

    Column col = new Column("json");
    Dataset data = df.select(functions.from_json(col, schema).as("json"));
    data.printSchema();

    Dataset results = data.select(functions.explode_outer(functions.explode_outer(new Column("json"))));
    results.printSchema();
    results.show();

    results.writeStream().format("console").option("truncate", false).start().awaitTermination();

I am using functions.from_json.
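For reference, the error comes from wrapping one explode_outer call inside another: Spark rejects a generator nested in another expression. In addition, from_json with a StructType produces a struct column, while the explode functions expect an array or a map. A minimal sketch of one possible way to turn the parsed struct into top-level columns is shown below. It runs in batch mode on a local session so it can be tried without Kafka; the class name FlattenJsonStruct, the helper flatten, the sample record, and the three-field subset of the schema are assumptions made for illustration and are not part of the original code.

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class FlattenJsonStruct {

    // Hypothetical helper: parses the string column "json" with the given
    // schema, then expands the resulting struct into one column per field.
    // No explode is involved, because the parsed value is a struct, not an array.
    static Dataset<Row> flatten(Dataset<Row> df, StructType schema) {
        return df
            .select(functions.from_json(functions.col("json"), schema).as("json"))
            .select("json.*");
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("flatten-json-struct")
            .getOrCreate();

        // Subset of the schema from the post, kept short for the sketch.
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("SourceName", DataTypes.StringType, true),
            DataTypes.createStructField("Message", DataTypes.StringType, true),
            DataTypes.createStructField("Severity", DataTypes.IntegerType, true)
        });

        // Made-up sample record standing in for the Kafka message value.
        Dataset<Row> df = spark.createDataset(
            Arrays.asList("{\"SourceName\":\"pump1\",\"Message\":\"offline\",\"Severity\":3}"),
            Encoders.STRING()).toDF("json");

        Dataset<Row> results = flatten(df, schema);
        results.printSchema();
        results.show(false);

        spark.stop();
    }
}
```

The same two select calls should apply to the streaming Dataset returned by readStream(). Note that show() is not available on a streaming Dataset, so the console sink via writeStream() would be the only output step there. If each Kafka message held a JSON array of such records instead of a single object, the usual approach would be an ArrayType schema and a single explode, placed in its own select.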

Sent from the Apache Spark User List mailing list archive at Nabble.com.