Redeploying spark streaming application aborts because of checkpoint issue


Robin Kuttaiah
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have a Spark streaming application which listens to a Kafka topic, does some transformations, and writes to an Oracle database using a JDBC client.

Events are read from Kafka as shown below:

        m_oKafkaEvents = getSparkSession().readStream().format("kafka")
          .option("kafka.bootstrap.servers", strKafkaAddress)
          .option("assign", strSubscription)
          .option("maxOffsetsPerTrigger", "100000")
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", false)
          .load()
          .filter(strFilter)
          .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
          .select("events.*");

The checkpoint is configured as shown below:

DataStreamWriter<Row> oMilestoneStream = oAggregation
      .writeStream()
      .queryName(strQueryName)
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
      .option("checkpointLocation", strCheckpointLocation)
      .foreach(oForeachWriter);

strCheckpointLocation is something like /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj. This is an HDFS location.


With this, when I redeploy the Spark application, I get the exception below. The only workaround I have currently is to delete the checkpoint location and recreate the topic.
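For context, the file the exception complains about follows the state store's on-disk layout of <checkpoint>/state/<operatorId>/<partitionId>/<version>.delta. A minimal sketch of how that path is composed (the helper and class names here are mine, not Spark API):

```java
// Sketch of the delta-file path that HDFSBackedStateStoreProvider tries to
// read: <checkpointLocation>/state/<operatorId>/<partitionId>/<version>.delta
public class StatePathSketch {
  static String deltaPath(String checkpointLocation, int operatorId,
                          int partitionId, long version) {
    return String.format("%s/state/%d/%d/%d.delta",
        checkpointLocation, operatorId, partitionId, version);
  }

  public static void main(String[] args) {
    String strCheckpoint =
        "/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj";
    // op=1, part=0, version=1 matches the missing file in the stack trace.
    System.out.println(deltaPath(strCheckpoint, 1, 0, 1L));
  }
}
```

So the query restarts expecting state for batch version 1 of operator 1, partition 0, but that delta file is no longer present under the checkpoint location.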

I also see a couple of JIRA tasks marked RESOLVED, but the problem is still seen.

Can someone help me with the best solution for this?

thanks,
Robin Kuttaiah



Exception
-------
18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalStateException: Error reading delta file hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta of HDFSStateStoreProvider[id = (op=1,part=0),dir = hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]: hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta does not exist
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
        at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
        at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
Caused by: java.io.FileNotFoundException: File does not exist: /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1836)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1808)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1723)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
        at java.security.AccessController.doPrivileged(Native Method)