NullPointerException error when repartitioning


NullPointerException error when repartitioning

Junfeng Chen
I wrote a program to read some JSON data from Kafka, with the aim of saving it to Parquet files on HDFS.
Here is my code:
JavaInputDStream stream = ...
JavaDStream rdd = stream.map...
rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringJavaRdd -> {
    Dataset<Row> df = spark.read().json(stringJavaRdd);           // convert JSON to DataFrame
    JavaRDD<Row> rowJavaRDD = df.javaRDD().map...                 // add some new fields
    StructType type = df.schema()...;                             // construct a new schema for the added fields
    Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create a new DataFrame
    newdf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savepath); // save to Parquet
});


However, if I remove the repartition call on newdf in the Parquet-writing stage, the program always throws a NullPointerException at the JSON-conversion line:

java.lang.NullPointerException
 at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
...

This seems to make no sense, since the Parquet-writing operation should be in a different stage from the JSON-transforming operation.
How can I solve it? Thanks!

Regards,
Junfeng Chen

Re: NullPointerException error when repartitioning

Tathagata Das
It's not very surprising that doing this sort of RDD-to-DataFrame conversion inside DStream.foreachRDD hits weird corner cases like this. In fact, you are going to have additional problems with partial Parquet files (when there are failures) with this approach. I strongly suggest that you use Structured Streaming, which is designed for this sort of processing; it will take care of tracking the written Parquet files correctly.
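
In outline, the suggested approach looks something like this. This is only a minimal sketch, not code from this thread; the broker address, topic name, and HDFS paths are placeholder assumptions, and it needs the spark-sql-kafka-0-10 package on the classpath:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonToParquet {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("JsonToParquet").getOrCreate();

        // Read from Kafka as an unbounded streaming Dataset.
        Dataset<Row> kafka = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
                .option("subscribe", "events")                       // placeholder topic
                .load();

        // Kafka exposes the message payload as a binary "value" column.
        Dataset<Row> json = kafka.selectExpr("CAST(value AS STRING) AS value");

        // The checkpoint location is what lets Spark track which Parquet
        // files were written, avoiding partial files after a failure.
        json.writeStream()
                .format("parquet")
                .option("path", "hdfs:///data/events")                  // placeholder path
                .option("checkpointLocation", "hdfs:///checkpoints/events")
                .start()
                .awaitTermination();
    }
}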

TD



Re: NullPointerException error when repartitioning

Junfeng Chen
Hi, Tathagata

I have tried Structured Streaming, but the line

Dataset<Row> rowDataset = spark.read().json(jsondataset);

always throws

Queries with streaming sources must be executed with writeStream.start()

All I need to do in this step is transform the JSON string data into a Dataset<Row>. How can I fix it?

Thanks!


Regards,
Junfeng Chen



Re: NullPointerException error when repartitioning

Tathagata Das
Have you read through the documentation of Structured Streaming?

One of the basic mistakes you are making is defining the Dataset with `spark.read()`. You define a streaming Dataset with `spark.readStream()`.
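
The difference in a nutshell, as a minimal sketch (the input path is a placeholder):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class ReadVsReadStream {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("ReadVsReadStream").getOrCreate();

        // Batch: a bounded Dataset; actions such as show() run immediately.
        Dataset<Row> batch = spark.read().json("hdfs:///data/in");
        batch.show();

        // Streaming: an unbounded Dataset over the same directory. A file
        // source needs an explicit schema (borrowed here from the batch read).
        StructType schema = batch.schema();
        Dataset<Row> streaming = spark.readStream().schema(schema).json("hdfs:///data/in");

        // Any action other than starting a streaming query, e.g.
        // streaming.show(), fails with exactly the error above:
        // "Queries with streaming sources must be executed with writeStream.start()".
    }
}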



Re: NullPointerException error when repartitioning

Junfeng Chen
Hi,
I know that, but my purpose is to transform the JSON strings in a Dataset<String> into a Dataset<Row>, while spark.readStream only supports reading JSON files from a specified path.
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming gives a possible approach, but my JSON records do not all have the same format.
Also, the Spark Java API does not seem to support syntax like
.select(from_json($"value", colourSchema))
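
(Note that the `$"value"` syntax is only Scala shorthand for a column reference; the Java equivalent is `functions.col("value")`, and `from_json` has been available from `org.apache.spark.sql.functions` since Spark 2.1. A minimal sketch of the Java form, where `kafka` stands for the streaming Dataset read from Kafka and the payload fields are hypothetical:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ParseJsonColumn {
    // Adjust the schema fields to the actual JSON structure.
    static Dataset<Row> parse(Dataset<Row> kafka) {
        StructType payloadSchema = new StructType()
                .add("appname", DataTypes.StringType)
                .add("data", DataTypes.StringType);

        // Java equivalent of .select(from_json($"value", schema)):
        return kafka
                .select(from_json(col("value").cast("string"), payloadSchema).alias("json"))
                .select("json.*");
    }
}

This only works when all records share one schema, so it does not by itself solve the mixed-format problem described above.)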


Regards,
Junfeng Chen
