java.io.NotSerializableException about SparkStreaming

java.io.NotSerializableException about SparkStreaming

Shengshan Zhang
Hello guys!
java.io.NotSerializableException troubles me a lot when I process data with Spark.
```
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job

    val hbase_conf = HBaseConfiguration.create()
    hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
    hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com")
    val newAPIJobConfiguration = Job.getInstance(hbase_conf);
    newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table");
    newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
    newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
    // TODO: this code is not very elegant; consider refactoring this output part
    mydata.foreachRDD( rdd => {
      val json_rdd = rdd.map(Json.parse _ ).map(_.validate[Scan]).map(Scan.transformScanRestult _).filter(_.nonEmpty)
      .map(_.get).map(Scan.convertForHbase _ )
      json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
    })
```

However, it fails because of java.io.NotSerializableException; the error info follows:
```
17/10/16 18:56:50 ERROR Utils: Exception encountered
        java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
```


So I changed my code as follows:
```
object mytest_config {
  val hbase_conf = HBaseConfiguration.create()
  hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
  hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")
  val newAPIJobConfiguration = Job.getInstance(hbase_conf)
  newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
  newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
  newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
}

mydata.foreachRDD( rdd => {
  val json_rdd = rdd.map(Json.parse _ ).map(_.validate[Scan]).map(Scan.transformScanRestult _).filter(_.nonEmpty)
    .map(_.get).map(Scan.convertForHbase _ )
  json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration)
})
```
And this works!
Does anybody have any ideas why this works, and what is the officially recommended way?








Re: java.io.NotSerializableException about SparkStreaming

高佳翔

Hi Shengshan,

In the first code, ‘newAPIJobConfiguration’ is shared across all RDDs, so it has to be serializable.

In the second code, each RDD creates a new ‘mytest_config’ object with its own ‘newAPIJobConfiguration’ instead of sharing the same object, so it can be non-serializable.
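Roughly, the difference is what ObjectOutputStream has to write when Spark Streaming serializes the foreachRDD function (for example when checkpointing is enabled). A toy sketch with made-up names (ClosureDemo, JobHolder, a dummy socket source), not your real classes, just to show which reference gets captured:

```
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ClosureDemo {

  // A Scala object is never written into the serialized closure; the closure
  // only keeps the object's name, and the object is initialized lazily in
  // whichever JVM first touches it.
  object JobHolder {
    val job: Job = Job.getInstance()
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("closure-demo").setMaster("local[2]"), Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source

    // This variant fails with NotSerializableException once Spark serializes
    // the foreachRDD function (e.g. for checkpointing): the local val `job`
    // becomes a field of the closure, and org.apache.hadoop.mapreduce.Job is
    // not Serializable.
    // val job = Job.getInstance()
    // lines.foreachRDD { rdd => println(rdd.count() + " " + job.getConfiguration.size()) }

    // This variant works: the closure only references JobHolder by name.
    lines.foreachRDD { rdd =>
      println(s"batch size=${rdd.count()}, conf entries=${JobHolder.job.getConfiguration.size()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```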

If it’s possible, maybe you can try to save the result of mydata.foreachRDD(…) as a whole instead of saving each RDD, like:

```
val result = mydata.foreachRDD( rdd => {
  val json_rdd = rdd.map(Json.parse _ ).map(_.validate[Scan]).map(Scan.transformScanRestult _).filter(_.nonEmpty)
    .map(_.get).map(Scan.convertForHbase _ )
})

result.write.save(...)
```
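Another option that avoids the serialization problem entirely: build the Job configuration inside foreachRDD itself, so the non-serializable Job is created on the driver for each batch and is never captured by the serialized closure. A minimal, untested sketch that reuses the table name, quorum and the Json/Scan helpers from your snippet as placeholders:

```
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

mydata.foreachRDD { rdd =>
  // Created on the driver once per batch; nothing non-serializable is captured.
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2")

  val job = Job.getInstance(hbaseConf)
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
  job.getConfiguration.set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

  val hbaseRdd = rdd.map(Json.parse _).map(_.validate[Scan])
    .map(Scan.transformScanRestult _).filter(_.nonEmpty)
    .map(_.get).map(Scan.convertForHbase _)
  hbaseRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
```

This keeps the HBase output settings next to the write and does not rely on when a singleton object happens to be initialized.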











--
Gao JiaXiang
Data Analyst, GCBI