unable to stream kafka messages

kaniska
Hi,

I am currently encountering the following exception with the code snippet below:

> Please suggest the correct approach for reading the stream into a SQL schema.
> If I add 'tweetSchema' while reading the stream, it errors out with a message saying the static schema of the Kafka source cannot be changed.

-------------------------------------------------------------------------------------------

exception

Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`location`' given input columns: [topic, timestamp, key, offset, value, timestampType, partition];
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
--------------------------------------------------------------------------------------------------------

structured streaming code snippet

String bootstrapServers = "localhost:9092";
String subscribeType = "subscribe";
String topics = "events";

StructType tweetSchema = new StructType()
    .add("tweetId", "string")
    .add("tweetText", "string")
    .add("location", "string")
    .add("timestamp", "string");

SparkSession spark = SparkSession
    .builder()
    .appName("StreamProcessor")
    .config("spark.master", "local")
    .getOrCreate();

Dataset<Tweet> streams = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .as(Encoders.bean(Tweet.class));

streams.createOrReplaceTempView("streamsData");

String sql = "SELECT location, COUNT(*) AS count FROM streamsData GROUP BY location";
Dataset<Row> countsByLocation = spark.sql(sql);

StreamingQuery query = countsByLocation.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();
--------------------------------------------------------------------------------------------------
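One approach that may work here (a sketch only, assuming the Kafka message values are JSON-encoded tweets): the input columns listed in the exception are the Kafka source's fixed schema, so `tweetSchema` cannot be supplied to the reader or applied with a bean encoder. Instead, cast the binary `value` column to a string and parse it with `from_json` (available since Spark 2.1):

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

// Read the raw Kafka records; the source's schema is fixed to
// [key, value, topic, partition, offset, timestamp, timestampType].
Dataset<Row> raw = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load();

// The tweet payload lives in the binary `value` column: cast it to a
// string, parse it against tweetSchema, then flatten the resulting struct.
Dataset<Row> tweets = raw
    .select(from_json(col("value").cast("string"), tweetSchema).as("tweet"))
    .select("tweet.*");

// `location` is now a top-level column, so the GROUP BY query can resolve it.
tweets.createOrReplaceTempView("streamsData");
```

With this, the `SELECT location, COUNT(*) ... GROUP BY location` query runs against the parsed columns rather than the raw Kafka metadata.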

Tweet

Tweet.java has a public no-arg constructor and getter/setter methods:

public class Tweet implements Serializable {

    private String tweetId;
    private String tweetText;
    private String location;
    private String timestamp;

    public Tweet() {
    }
.............  

----------------------------------------------------------------------------------------

pom.xml


        <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>2.1.0</version>
        </dependency>
        <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>2.1.0</version>
        </dependency>
        <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
                <version>2.1.0</version>
        </dependency>
        <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>2.1.0</version>
        </dependency>
        <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
                <version>2.1.0</version>
        </dependency>
------------------------------------------------------------------------------------