Corrupt record handling in spark structured streaming and from_json function

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Corrupt record handling in spark structured streaming and from_json function

Colin Williams-2
Hi,

I'm trying to figure out how I can write records that don't match a
json read schema via spark structred streaming to an output sink /
parquet location. Previously I did this in batch via corrupt column
features of batch. But in this spark structured streaming I'm reading
from kafka a string and using from_json on the value of that string.
If it doesn't match my schema then I from_json returns null for all
the rows, and does not populate a corrupt record column. But I want to
somehow obtain the source kafka string in a dataframe, and an write to
a output sink / parquet location.

def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
  val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
  jsonDataFrame.select(from_json(col("value"),
schema)).select("jsontostructs(value).*")
}

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Corrupt record handling in spark structured streaming and from_json function

Colin Williams-2
From my initial impression it looks like I'd need to create my own
`from_json` using `jsonToStructs` as a reference but try to handle `
case : BadRecordException => null ` or similar to try to write the non
matching string to a corrupt records column

On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
<[hidden email]> wrote:

>
> Hi,
>
> I'm trying to figure out how I can write records that don't match a
> json read schema via spark structred streaming to an output sink /
> parquet location. Previously I did this in batch via corrupt column
> features of batch. But in this spark structured streaming I'm reading
> from kafka a string and using from_json on the value of that string.
> If it doesn't match my schema then I from_json returns null for all
> the rows, and does not populate a corrupt record column. But I want to
> somehow obtain the source kafka string in a dataframe, and an write to
> a output sink / parquet location.
>
> def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
>   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
>   jsonDataFrame.select(from_json(col("value"),
> schema)).select("jsontostructs(value).*")
> }

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Corrupt record handling in spark structured streaming and from_json function

Colin Williams-2
https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming

On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
<[hidden email]> wrote:

>
> From my initial impression it looks like I'd need to create my own
> `from_json` using `jsonToStructs` as a reference but try to handle `
> case : BadRecordException => null ` or similar to try to write the non
> matching string to a corrupt records column
>
> On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
> <[hidden email]> wrote:
> >
> > Hi,
> >
> > I'm trying to figure out how I can write records that don't match a
> > json read schema via spark structred streaming to an output sink /
> > parquet location. Previously I did this in batch via corrupt column
> > features of batch. But in this spark structured streaming I'm reading
> > from kafka a string and using from_json on the value of that string.
> > If it doesn't match my schema then I from_json returns null for all
> > the rows, and does not populate a corrupt record column. But I want to
> > somehow obtain the source kafka string in a dataframe, and an write to
> > a output sink / parquet location.
> >
> > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> >   jsonDataFrame.select(from_json(col("value"),
> > schema)).select("jsontostructs(value).*")
> > }

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Corrupt record handling in spark structured streaming and from_json function

Colin Williams-2
Dear spark user community,

I have recieved some insight regarding filtering seperate dataframes
in my spark-structured-streaming job. However I wish to write the
dataframes aforementioned above in the stack overflow question each
using a parquet writer to a separate location. My initial impression
is this requires multiple sinks, but I'm being pressured against that.
I think it might also be possible using the for each / for each batch
writers. But I'm not sure regarding parquet writer, and also the
caveats to this approach. Can some more advanced users or developers
suggest how to go about this, particularly without using multiple
streams?


On Wed, Dec 26, 2018 at 6:01 PM Colin Williams
<[hidden email]> wrote:

>
> https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming
>
> On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
> <[hidden email]> wrote:
> >
> > From my initial impression it looks like I'd need to create my own
> > `from_json` using `jsonToStructs` as a reference but try to handle `
> > case : BadRecordException => null ` or similar to try to write the non
> > matching string to a corrupt records column
> >
> > On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
> > <[hidden email]> wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to figure out how I can write records that don't match a
> > > json read schema via spark structred streaming to an output sink /
> > > parquet location. Previously I did this in batch via corrupt column
> > > features of batch. But in this spark structured streaming I'm reading
> > > from kafka a string and using from_json on the value of that string.
> > > If it doesn't match my schema then I from_json returns null for all
> > > the rows, and does not populate a corrupt record column. But I want to
> > > somehow obtain the source kafka string in a dataframe, and an write to
> > > a output sink / parquet location.
> > >
> > > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> > >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> > >   jsonDataFrame.select(from_json(col("value"),
> > > schema)).select("jsontostructs(value).*")
> > > }

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]