Error Handling on calling saveAsHadoopDataset

Kanwaldeep
I'm using Spark Streaming 0.8, which reads data from Kafka, computes certain aggregates, and persists the data in HBase.

I'm trying to understand the best way to handle failure conditions when we have errors connecting to HBase. Currently the code reads the messages from Kafka, computes the aggregates, and then fails when writing the DStream to HBase. The data we received from Kafka does not get persisted in HBase and is lost. Once HBase is available again, new messages do get written to HBase, but the messages that arrived during the outage are lost.

What is the best way to handle this scenario when our target database is unavailable?
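One general pattern (a sketch, not something confirmed by this thread) is to retry the write with backoff inside the output operation, so a transient HBase outage blocks the micro-batch instead of silently dropping it. A minimal, Spark-free retry helper in Java; the class name `RetryingWriter` and the wrapped write shown in the comment are hypothetical:

```java
import java.util.concurrent.Callable;

public class RetryingWriter {
    // Retry `op` up to maxAttempts times, doubling the sleep between
    // attempts. Rethrows the last failure if every attempt fails.
    public static <T> T withRetry(int maxAttempts, long backoffMs, Callable<T> op)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                    backoffMs *= 2;   // exponential backoff
                }
            }
        }
        throw last;
    }

    // Usage sketch inside the streaming output operation (pseudocode):
    //   dstream.foreachRDD(rdd ->
    //       withRetry(5, 500, () -> { rdd.saveAsHadoopDataset(jobConf); return null; }));
}
```

Note the trade-off: retrying inside the output operation delays the batch rather than losing it, but a long outage will back up the stream, and once the retries are exhausted the batch is still lost unless the failure is surfaced and the offsets are replayed.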

Also, I'm running into an issue with setting up checkpointing on the context: it is unable to serialize the org.apache.hadoop.mapred.JobConf object.

Exception in thread "pool-6-thread-1" java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1359)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1155)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:422)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:152)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:950)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1482)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:112)
        at org.apache.spark.streaming.Scheduler.doCheckpoint(Scheduler.scala:127)
        at org.apache.spark.streaming.Scheduler.clearOldMetadata(Scheduler.scala:119)
        at org.apache.spark.streaming.JobManager.org$apache$spark$streaming$JobManager$$clearJob(JobManager.scala:79)
        at org.apache.spark.streaming.JobManager$JobHandler.run(JobManager.scala:41)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:695)
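The exception occurs because the `JobConf` ends up captured in the DStream graph that the checkpoint writer serializes, and `JobConf` does not implement `Serializable`. Two common workarounds (general JVM serialization techniques, not answers given in this thread) are to construct the `JobConf` inside the output closure so it never travels with the graph, or to mark the field holding it `transient` and rebuild it lazily after deserialization. A minimal, Spark-free Java sketch of the transient-field idiom; `FakeJobConf` and `WriterConfig` are hypothetical stand-ins:

```java
import java.io.*;

// Stand-in for org.apache.hadoop.mapred.JobConf: deliberately NOT Serializable.
class FakeJobConf {
    String outputTable;
    FakeJobConf(String table) { outputTable = table; }
}

// Serializable holder: the conf field is transient, so Java serialization
// skips it, and the getter rebuilds it on first access after deserialization.
public class WriterConfig implements Serializable {
    private final String table;
    private transient FakeJobConf conf;

    public WriterConfig(String table) { this.table = table; }

    public FakeJobConf conf() {
        if (conf == null) conf = new FakeJobConf(table);
        return conf;
    }

    // Round-trip through Java serialization, as the checkpoint writer does.
    // Without `transient` on the conf field this would throw
    // NotSerializableException, just like the stack trace above.
    public static WriterConfig roundTrip(WriterConfig c) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(c);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (WriterConfig) in.readObject();
        }
    }
}
```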

Re: Error Handling on calling saveAsHadoopDataset

Kanwaldeep
Any help on this would be great.