StructuredStreaming : StreamingQueryException


StructuredStreaming : StreamingQueryException

aravias
hi,
I have one read stream consuming data from a Kafka topic, and based on an attribute value in each incoming message I have to write the data to one of 2 different locations in S3 (if value1, write to location1; otherwise to location2).
At a high level, below is what I have for doing that:


Dataset<Row> kafkaStreamSet = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrap)
            .option("subscribe", kafkaTopic)
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", false)
            .option("maxOffsetsPerTrigger", offsetsPerTrigger)
            .load();

        //raw message to ClickStream
        Dataset<ClickStream> ds1 = kafkaStreamSet.mapPartitions(processClickStreamMessages, Encoders.bean(ClickStream.class));  
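
processClickStreamMessages is roughly a MapPartitionsFunction like the following (a simplified sketch; parseClickStream is just a stand-in for the actual deserialization of the message payload):

        //simplified sketch of processClickStreamMessages; parseClickStream is a
        //placeholder for the real deserialization logic
        MapPartitionsFunction<Row, ClickStream> processClickStreamMessages = rows -> {
            List<ClickStream> parsed = new ArrayList<>();
            while (rows.hasNext()) {
                byte[] value = rows.next().getAs("value");   //raw Kafka message bytes
                parsed.add(parseClickStream(new String(value, StandardCharsets.UTF_8)));
            }
            return parsed.iterator();
        };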

ClickStream.java has 2 child objects within it, and only one of them is populated at a time, depending on whether the message attribute value is value1 or value2:

 1) BookingRequest.java if value1,
 2) PropertyPageView.java if value2,

which I then separate out of the ClickStream as below, to write to 2 different locations in S3:

        //fetch BookingRequests in the ClickStream
        Dataset<BookingRequest> ds2 = ds1.map(filterBookingRequests, Encoders.bean(BookingRequest.class));

        //fetch PropertyPageViews in the ClickStream
        Dataset<PropertyPageView> ds3 = ds1.map(filterPropertyPageViews, Encoders.bean(PropertyPageView.class));
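
For context, ClickStream is just a bean carrying the two child objects, and the map functions simply pull one of them out; roughly like this (simplified, field and accessor names illustrative):

        //simplified view of ClickStream.java: only one child is set per message
        public class ClickStream implements Serializable {
            private BookingRequest bookingRequest;       //set when the attribute is value1
            private PropertyPageView propertyPageView;   //set when the attribute is value2
            //bean getters and setters omitted
        }

        //pulls the BookingRequest child out of each ClickStream record
        MapFunction<ClickStream, BookingRequest> filterBookingRequests =
            clickStream -> clickStream.getBookingRequest();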


Finally, ds2 and ds3 are written to the 2 different locations:


        StreamingQuery bookingRequestsParquetStreamWriter = ds2.writeStream().outputMode("append")
            .format("parquet")
            .trigger(ProcessingTime.create(bookingRequestProcessingTime, TimeUnit.MILLISECONDS))
            .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/bookingRequests")
            .partitionBy("eventDate")
            .start("s3://" + s3Bucket + "/" + bookingRequestPath);

        StreamingQuery pageViewsParquetStreamWriter = ds3.writeStream().outputMode("append")
            .format("parquet")
            .trigger(ProcessingTime.create(pageViewProcessingTime, TimeUnit.MILLISECONDS))
            .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/PageViews")
            .partitionBy("eventDate")
            .start("s3://" + s3Bucket + "/" + pageViewPath);

        bookingRequestsParquetStreamWriter.awaitTermination();
        pageViewsParquetStreamWriter.awaitTermination();



It seems to work fine, and I see data written to the different paths when the app is deployed. But whenever the job is restarted, after a failure or a manual stop and start, it keeps failing with the exception below (where userSessionEventJoin.global is my topic name):


Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"userSessionEventJoin.global":{"92":154362528,"101
        org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74)
        org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:59)
        org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:134)
        org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:123)
        org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237)
        org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:138)
        org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:121)
        org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:216)
        org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:452)
        org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:448)



If I delete all the checkpoint information, it starts up again and creates new checkpoints in the given 2 locations, but that means I have to start processing from the latest offset again and lose all previously processed offsets.
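
(For reference, the Kafka source's startingOffsets option also accepts an explicit per-partition JSON in the same format the error message expects, so something like the sketch below, with placeholder offsets, could hand known offsets back to a query started after the checkpoint is wiped; startingOffsets is only honored when there is no existing checkpoint, which is the case here.)

        //sketch only: the offsets below are placeholders, not real values
        String startingOffsetsJson =
            "{\"userSessionEventJoin.global\":{\"0\":154362528,\"1\":154367284}}";

        Dataset<Row> resumedStream = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrap)
            .option("subscribe", kafkaTopic)
            .option("startingOffsets", startingOffsetsJson)
            .load();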
The Spark version is 2.1. Please suggest any resolutions, thanks.

Re: StructuredStreaming : StreamingQueryException

aravias
This is a bug in Spark 2.1.0; it appears to be fixed in Spark 2.1.1, as the job ran fine when rerun with that version.

Re: StructuredStreaming : StreamingQueryException

aravias
The bug is related to long checkpoint offset entries being truncated when dealing with topics that have a large number of partitions, in my case 120.
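
(To give a sense of the size involved, the committed offset JSON for a 120-partition topic runs to roughly 1,800 characters; a rough illustration with made-up offsets:)

        //rough illustration with made-up offsets: builds the expected offset JSON
        //for a 120-partition topic and prints its length
        StringBuilder json = new StringBuilder("{\"userSessionEventJoin.global\":{");
        for (int partition = 0; partition < 120; partition++) {
            if (partition > 0) json.append(',');
            json.append('"').append(partition).append("\":").append(150000000L + partition);
        }
        json.append("}}");
        System.out.println("offset JSON length: " + json.length());   //roughly 1,800 chars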