Source.getBatch and schema vs qe.analyzed.schema?


Source.getBatch and schema vs qe.analyzed.schema?

Jacek Laskowski
Hi,

I've been developing a data source with a source and sink for Spark Structured Streaming.

I've got a question about Source.getBatch [1]:

    def getBatch(start: Option[Offset], end: Offset): DataFrame

getBatch returns a streaming DataFrame with the data between the two offsets, so the idiom (?) seems to be code like the following:

    val relation = new MyRelation(...)(sparkSession)
    val plan = LogicalRelation(relation, isStreaming = true)
    new Dataset[Row](sparkSession, plan, RowEncoder(schema))

Note the use of schema [2] that is another part of the Source abstraction:

    def schema: StructType
    
This is the "source" of my question. Is the above OK in a streaming source / Source.getBatch?

Since there are no interim operators that could change the attributes (the schema), I think it's OK.
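For context, here is a minimal sketch of a Source wired this way. MyRelation, its arguments, and the schema are all hypothetical, and it assumes the class lives in a subpackage of org.apache.spark.sql so that Spark's private[sql] Dataset constructor is accessible, as Spark's own built-in sources do:

```scala
package org.apache.spark.sql.mysource // hypothetical; grants private[sql] access

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

class MySource(sparkSession: SparkSession) extends Source {

  // The schema the source declares up front (hypothetical single column).
  override def schema: StructType =
    StructType(StructField("value", LongType) :: Nil)

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    // MyRelation is a hypothetical BaseRelation over the offset range.
    val relation = new MyRelation(start, end)(sparkSession)
    // isStreaming = true marks this leaf of the plan as streaming.
    val plan = LogicalRelation(relation, isStreaming = true)
    // Reuse the declared schema for the row encoder.
    new Dataset[Row](sparkSession, plan, RowEncoder(schema))
  }

  override def getOffset: Option[Offset] = None // offset tracking elided
  override def stop(): Unit = ()
}
```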

I've seen the following code, and it made me wonder whether it's better than the solution above:

    val relation = new MyRelation(...)(sparkSession)
    val plan = LogicalRelation(relation, isStreaming = true)

    // When would we have to execute plan?
    val qe = sparkSession.sessionState.executePlan(plan)
    new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))

When would we have to use qe.analyzed.schema instead of simply schema? Could qe.analyzed.schema help avoid some edge cases, and is it the preferred approach?
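For comparison, a sketch of the two variants side by side (MyRelation is hypothetical). As far as I can tell, schema is whatever the Source declares, while qe.analyzed.schema is whatever the analyzer derives after resolving the plan; for a bare LogicalRelation the two should coincide, and they could only diverge once the plan contains something the analyzer resolves or rewrites:

```scala
val relation = new MyRelation(start, end)(sparkSession) // hypothetical
val plan = LogicalRelation(relation, isStreaming = true)

// Variant 1: trust the schema declared by the Source.
val batch1 = new Dataset[Row](sparkSession, plan, RowEncoder(schema))

// Variant 2: take the schema the analyzer derives from the plan.
// executePlan only builds a QueryExecution; qe.analyzed is a lazy val,
// so analysis runs on first access and nothing is actually executed here.
val qe = sparkSession.sessionState.executePlan(plan)
val batch2 = new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))
```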

Thank you for any help you can offer. Much appreciated.


Re: Source.getBatch and schema vs qe.analyzed.schema?

Bartosz Konieczny
Hi Jacek,

An interesting question! I don't know the exact answer and will be happy to learn along the way :) Below is my understanding of these two things; I hope it helps a little.

For me, we can distinguish two different source categories. The first is a source with a fixed schema. A good example is Apache Kafka, which exposes the topic name, key, and value, and you can't change that; it's always the same whether you run the reader in Company A or in Company B. What changes is the logic that extracts data from the key, value, or headers, but that's business-specific, not data-store-specific. You can find the schema implementation here: Kafka

The second type is a source with a user-defined schema, like an RDBMS table or a schemaless NoSQL store. Here, the schema is not only business-specific but also data-store-specific; you can use any name for a primary-key column, and there is no rule like "key" or "value" as in Kafka. To avoid runtime errors (i.e., to favor a fail-fast approach), Spark can use the store's metadata to assert (analyze) the schema specified by the user, confirming it or failing fast before the data is read.
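As an illustration of that fail-fast idea (the format name and schema below are made up), a user-defined schema can be asserted up front on the reader, so a mismatch with the store's metadata can surface at analysis time rather than mid-stream:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// The user asserts the schema; "my-rdbms-source" is a hypothetical format.
val userSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType)
))

val stream = spark.readStream
  .format("my-rdbms-source")
  .schema(userSchema) // can be checked against the store's metadata, fail-fast
  .load()
```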

Best,
Bartosz.



On Mon, Mar 29, 2021 at 1:07 PM Jacek Laskowski <[hidden email]> wrote:

Re: Source.getBatch and schema vs qe.analyzed.schema?

Jacek Laskowski
Hi Bartosz,

This is not a question about whether the data source supports a fixed or a user-defined schema, but about what schema to use when a streaming batch is requested in Source.getBatch.

On Wed, Mar 31, 2021 at 7:44 PM Bartosz Konieczny <[hidden email]> wrote: