[Structured Streaming] Avoiding multiple streaming queries


[Structured Streaming] Avoiding multiple streaming queries

Priyank Shrivastava

I have a Structured Streaming query which sinks to Kafka. This query has complex aggregation logic.


I would like to sink the output DF of this query to multiple Kafka topics, each partitioned on a different 'key' column. I don't want a separate Kafka sink for each topic, because that would mean running multiple streaming queries - one per topic - which is especially costly since my aggregation logic is complex.


Questions:

1.  Is there a way to output the results of a structured streaming query to multiple Kafka topics, each with a different key column, without having to execute multiple streaming queries?


2.  If not, would it be efficient to cascade the queries, so that the first query does the complex aggregation and writes its output to Kafka, and the other queries just read that output and write to their respective topics, avoiding the complex aggregation again?


Thanks in advance for any help.


Priyank




Re: [Structured Streaming] Avoiding multiple streaming queries

dcam
Hi Priyank

I have a similar structure, although I am reading from Kafka and sinking to
multiple MySQL tables. My input stream has multiple message types and each
is headed for a different MySQL table.

I've looked for a solution for a few months, and have only come up with two
alternatives:

1. Since I'm already using a ForeachSink, because there is no native MySQL
sink, I could sink each batch to the different tables in one sink. But,
having only one spark job doing all the sinking seems like it will be
confusing, and the sink itself will be fairly complex.

2. The same as your second option: have one job sort through the stream and
persist the sorted stream to HDFS, then read the sorted streams in individual
jobs and sink them into the appropriate tables.

I haven't implemented it yet, but it seems to me that the code for 2 will be
simpler, and operationally things will be clearer. If a job fails, I have a
better understanding of what state it is in.
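A rough sketch of what option 2 could look like in Structured Streaming. Everything here is an assumption, not something from the thread: `classified` is the first job's output with a `type` routing column, `ordersSchema` is the schema of one message type, the paths are placeholders, and `ordersMySqlWriter` stands for the MySQL ForeachWriter.

    // First job (sketch): route once and persist the intermediate stream to HDFS,
    // partitioned by message type.
    classified.writeStream
      .format("parquet")
      .partitionBy("type")
      .option("path", "hdfs:///intermediate/classified")        // placeholder path
      .option("checkpointLocation", "hdfs:///chk/classifier")   // placeholder path
      .start()

    // Downstream job for one message type (sketch): read only that partition and sink it.
    val orders = spark.readStream
      .schema(ordersSchema)                                      // assumed schema for this type
      .parquet("hdfs:///intermediate/classified/type=orders")

    orders.writeStream
      .foreach(ordersMySqlWriter)                                // hypothetical ForeachWriter[Row]
      .option("checkpointLocation", "hdfs:///chk/orders-sink")
      .start()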

Reading Manning's Big Data book from Nathan Marz and James Warren has been
influencing how I structure Spark jobs recently. They don't shy away from
persisting intermediate data sets, and I am embracing that right now in my
thinking.

Cheers!
Dave





Re: [Structured Streaming] Avoiding multiple streaming queries

Yogesh
I had a similar issue, and I think that's where the Structured Streaming design falls short.
Question #2 in your email seems like a viable workaround for you.

In my case, I have a custom Sink backed by an efficient in-memory column store suited for fast ingestion. 

I have a Kafka stream coming from one topic, and I need to classify the stream based on schema.
For example, a Kafka topic can have messages with three different schemas, and I would like to ingest them into three different column tables (each with a different schema) using my custom Sink implementation.

Right now the only(?) option I have is to create three streaming queries reading the same topic and ingesting into the respective column tables through their Sink implementations.
These three streaming queries create three underlying IncrementalExecutions and three KafkaSources - three queries reading the same data from the same Kafka topic.
Even with CachedKafkaConsumers at the partition level, this is not an efficient way to handle a simple streaming use case.

One workaround to overcome this limitation is to have the same schema for all the messages in a Kafka partition; unfortunately this is not in our control, and customers cannot change it due to their dependencies on other subsystems.




Re: [Structured Streaming] Avoiding multiple streaming queries

Tathagata Das
Of course, you can write to multiple Kafka topics from a single query. If the dataframe you want to write has a column named "topic" (along with "key" and "value" columns), each row will be written to the topic named in that row's "topic" column. This works automatically, so the only thing you need to figure out is how to generate the value of that column.
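For illustration, a rough sketch of that approach. Assumptions: `aggregated` is the streaming DataFrame produced by the complex aggregation, `category`/`id` are hypothetical columns used for routing and keying, and the broker and checkpoint settings are placeholders.

    import org.apache.spark.sql.functions._

    // Derive the "topic", "key" and "value" columns the Kafka sink expects.
    // `aggregated`, `category` and `id` are assumptions, not from the thread.
    val routed = aggregated.select(
      when(col("category") === "orders", lit("orders_topic"))
        .otherwise(lit("events_topic")).as("topic"),             // per-row target topic
      col("id").cast("string").as("key"),                        // per-topic key
      to_json(struct(aggregated.columns.map(col): _*)).as("value"))

    routed.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")            // placeholder broker
      .option("checkpointLocation", "/tmp/chk/multi-topic")      // placeholder path
      .start()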


Or am I misunderstanding the problem?

TD






Re: [Structured Streaming] Avoiding multiple streaming queries

chandan prakash
Hi,
Has anyone thought about writing a custom foreach sink writer that can decide which record should go to which sink (based on some marker in the record, which we could annotate during transformation) and then write to the specific sink accordingly?
This would mean that:
1. Every custom sink writer will hold connections to as many sinks as there are sink types records can go to.
2. Every record will be read once in the single query but can be written to multiple sinks (rough sketch below).
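A rough sketch of what such a writer could look like. `SinkClient` is a made-up stand-in for the real clients (e.g. JDBC connections or Kafka producers), and `sink_marker` is the annotation column assumed to be added during transformation.

    import org.apache.spark.sql.{ForeachWriter, Row}

    // Hypothetical client interface standing in for the real sinks.
    trait SinkClient extends Serializable {
      def write(row: Row): Unit
      def close(): Unit
    }

    // One writer holding a connection per target sink, dispatching each record
    // based on the `sink_marker` column added upstream during transformation.
    class RoutingWriter(mkSinkA: () => SinkClient,
                        mkSinkB: () => SinkClient) extends ForeachWriter[Row] {
      private var sinkA: SinkClient = _
      private var sinkB: SinkClient = _

      override def open(partitionId: Long, epochId: Long): Boolean = {
        sinkA = mkSinkA()             // one connection per sink, per partition/epoch
        sinkB = mkSinkB()
        true
      }

      override def process(row: Row): Unit =
        row.getAs[String]("sink_marker") match {
          case "A" => sinkA.write(row)
          case _   => sinkB.write(row)
        }

      override def close(errorOrNull: Throwable): Unit = {
        if (sinkA != null) sinkA.close()
        if (sinkB != null) sinkB.close()
      }
    }

    // Usage (sketch): df.writeStream.foreach(new RoutingWriter(mkA, mkB)).start()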

Do you see any drawbacks in this approach?
One drawback, of course, is that a sink is supposed to write the records as they are, but here we are adding some intelligence to the sink.
Apart from that, do you see any other issues with this approach?

Regards,
Chandan


--
Chandan Prakash


Re: [Structured Streaming] Avoiding multiple streaming queries

Arun Mahadevan
Yes, ForeachWriter [1] could be an option if you want to write to different sinks. You can put in your own logic to split the data across the different sinks.

The drawbacks are that you cannot plug in existing sinks like Kafka, you need to write the custom logic yourself, and you cannot scale the partitions for the sinks independently.




Re: [Structured Streaming] Avoiding multiple streaming queries

chandan prakash
Thanks a lot, Arun, for your response.
I got your point that existing sink plugins like Kafka cannot be used.
However, I could not follow the part: "you cannot scale the partitions for the sinks independently".
Can you please rephrase that part?

Also, my guess is:
using a ForeachWriter for multiple sinks will hurt performance, because writes happen to a sink on a per-record basis (after deciding which sink a record belongs to), whereas in the current implementation all the data in an RDD partition gets committed to the sink atomically in one go. Please correct me if I am wrong here.



Regards,
Chandan

--
Chandan Prakash


Re: [Structured Streaming] Avoiding multiple streaming queries

Arun Mahadevan
What I meant was that with a ForeachWriter the number of partitions cannot be varied per sink, versus writing to each sink with independent queries. Maybe this is obvious.

I am not sure about the performance difference you highlight. The commit happens once per micro-batch and "close(null)" is invoked. You can batch your writes in process and/or in close. I guess the writes can still be atomic, decided by whether "close" returns successfully or throws an exception.
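A small sketch of that batching idea, just to make the contract concrete. The `flush` function is a stand-in for whatever the real per-sink write would be; this is only the ForeachWriter contract, not how the built-in sinks work.

    import org.apache.spark.sql.{ForeachWriter, Row}
    import scala.collection.mutable.ArrayBuffer

    // Buffer rows per partition in process() and write them in one shot in close();
    // throwing from close() fails the task, so the micro-batch will be retried.
    class BufferedWriter(flush: Seq[Row] => Unit) extends ForeachWriter[Row] {
      private val buffer = ArrayBuffer.empty[Row]

      override def open(partitionId: Long, epochId: Long): Boolean = {
        buffer.clear()
        true
      }

      override def process(row: Row): Unit = buffer += row      // no per-record I/O

      override def close(errorOrNull: Throwable): Unit =
        if (errorOrNull == null) flush(buffer.toSeq)             // one batched write per partition
    }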

Thanks,
Arun



Re: [Structured Streaming] Avoiding multiple streaming queries

kant kodali
I understand each row has a topic column, but can we write one row to multiple topics?



Re: [Structured Streaming] Avoiding multiple streaming queries

Silvio Fiorito

Using the current Kafka sink, which supports routing based on a topic column, you could just duplicate the rows (e.g. explode rows with different topic and key values). That way you're only reading and processing the source once, without having to resort to custom sinks, foreachWriter, or multiple queries.
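For example, a rough sketch of the explode approach. Assumptions: `df` is the aggregated output, `col_a`/`col_b` are the two per-topic key columns, and the topic names, broker and checkpoint path are placeholders.

    import org.apache.spark.sql.functions._

    // Fan each result row out to two topics, each keyed on a different column,
    // and let the built-in Kafka sink route on the per-row `topic` column.
    val fannedOut = df.select(
        explode(array(
          struct(lit("topic_a").as("topic"), col("col_a").cast("string").as("key")),
          struct(lit("topic_b").as("topic"), col("col_b").cast("string").as("key"))
        )).as("route"),
        to_json(struct(df.columns.map(col): _*)).as("value"))
      .select(col("route.topic").as("topic"), col("route.key").as("key"), col("value"))

    fannedOut.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")            // placeholder broker
      .option("checkpointLocation", "/tmp/chk/fanout")           // placeholder path
      .start()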

 

In Spark 2.4 there will be a foreachBatch method that will give you a DataFrame and let you write to the sink as you wish.

 



Re: [Structured Streaming] Avoiding multiple streaming queries

kant kodali
@Silvio I thought about duplicating rows but dropped the idea because of the increased memory use. foreachBatch sounds interesting!


Re: [Structured Streaming] Avoiding multiple streaming queries

Silvio Fiorito

Any way you go you’ll increase something...

 

Even with foreachBatch you would have to cache the DataFrame before writing each batch to each topic, to avoid recomputing it (see https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch)
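For reference, a minimal foreachBatch sketch along those lines (Spark 2.4 API). The source DataFrame `aggregated`, the key/value columns, topic names, broker and checkpoint path are all placeholders.

    import org.apache.spark.sql.DataFrame

    // Cache each micro-batch once and reuse it for both Kafka writes, so the
    // upstream aggregation is not recomputed per topic.
    aggregated.writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch.persist()
        batch.selectExpr("CAST(key_a AS STRING) AS key", "CAST(value AS STRING) AS value")
          .write.format("kafka")
          .option("kafka.bootstrap.servers", "host:9092")        // placeholder broker
          .option("topic", "topic_a")
          .save()
        batch.selectExpr("CAST(key_b AS STRING) AS key", "CAST(value AS STRING) AS value")
          .write.format("kafka")
          .option("kafka.bootstrap.servers", "host:9092")
          .option("topic", "topic_b")
          .save()
        batch.unpersist()
      }
      .option("checkpointLocation", "/tmp/chk/foreach-batch")    // placeholder path
      .start()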

 

Nothing’s free! 😉

 

Since you're just pushing all messages to Kafka, it might be easier on you to just explode the rows and let Spark do the rest for you.

 



Re: [Structured Streaming] Avoiding multiple streaming queries

kant kodali

I am not seeing what else I would be increasing in this case if I were to write a set of rows to two different topics - for every row there will be two I/Os, one to each topic.

If foreachBatch caches, great! But again, I would try to cache as little as possible. If I duplicate rows using explode or something, I will be caching 2x.




On Tue, Jul 24, 2018 at 11:52 AM, Silvio Fiorito <[hidden email]> wrote:

Any way you go you’ll increase something...

 

Even with foreachBatch you would have to cache the DataFrame before submitting each batch to each topic to avoid recomputing it (see https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch)

 

Nothing’s free! 😉

 

Since you're just pushing all messages to Kafka, it might be easier to just explode the rows and let Spark do the rest for you.

 

From: kant kodali <[hidden email]>
Date: Tuesday, July 24, 2018 at 1:04 PM
To: Silvio Fiorito <[hidden email]>
Cc: Arun Mahadevan <[hidden email]>, chandan prakash <[hidden email]>, Tathagata Das <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "user @spark" <[hidden email]>


Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

 

@Silvio I thought about duplicating rows but dropped the idea because of the increased memory use. foreachBatch sounds interesting!

 

On Mon, Jul 23, 2018 at 6:51 AM, Silvio Fiorito <[hidden email]> wrote:

Using the current Kafka sink, which supports routing based on the topic column, you could just duplicate the rows (e.g. explode rows with different topic and key values). That way you're only reading and processing the source once and not having to resort to custom sinks, ForeachWriter, or multiple queries.
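
E.g. something like this (a rough sketch; the topic names are illustrative and the stream is assumed to already expose key and value columns):

    import org.apache.spark.sql.functions._

    // Each input row becomes one row per target topic; the Kafka sink routes on "topic".
    val fanOut = df
      .withColumn("topic", explode(array(lit("topicA"), lit("topicB"))))
      .selectExpr("topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

    fanOut.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("checkpointLocation", "/tmp/checkpoints/explode-fan-out")
      .start()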

 

In Spark 2.4 there will be a foreachBatch method that will give you a DataFrame and let you write to the sink as you wish.

 

From: kant kodali <[hidden email]>
Date: Monday, July 23, 2018 at 4:43 AM
To: Arun Mahadevan <[hidden email]>
Cc: chandan prakash <[hidden email]>, Tathagata Das <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "user @spark" <[hidden email]>


Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

 

I understand each row has a topic column, but can we write one row to multiple topics?

 

On Thu, Jul 12, 2018 at 11:00 AM, Arun Mahadevan <[hidden email]> wrote:

What I meant was that with ForeachWriter the number of partitions cannot be varied per sink, versus writing to each sink using independent queries. Maybe this is obvious.

 

I am not sure about the performance difference you highlight. The commit happens once per micro-batch, when "close(null)" is invoked. You can batch your writes in process and/or in close. I guess the writes can still be atomic, decided by whether "close" returns successfully or throws an exception.

 

Thanks,

Arun

 

From: chandan prakash <[hidden email]>
Date: Thursday, July 12, 2018 at 10:37 AM
To: Arun Iyer <[hidden email]>
Cc: Tathagata Das <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "user @spark" <[hidden email]>


Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

 

Thanks a lot Arun for your response.

I got your point that existing sink plugins like Kafka, etc. cannot be used.

However, I could not get the part: "you cannot scale the partitions for the sinks independently".

Can you please rephrase that part?

 

Also,

I guess:

using ForeachWriter for multiple sinks will affect performance, because writes will happen to a sink on a per-record basis (after deciding which particular sink a record belongs to), whereas in the current implementation all data under an RDD partition gets committed to the sink atomically in one go. Please correct me if I am wrong here.

 

 

 

Regards,

Chandan

 

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan <[hidden email]> wrote:

Yes, ForeachWriter [1] could be an option if you want to write to different sinks. You can put in your custom logic to split the data across the different sinks.

 

The drawback here is that you cannot plug in existing sinks like Kafka; you need to write the custom logic yourself, and you cannot scale the partitions for the sinks independently.
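
Still, a bare-bones sketch of that approach, in case it helps (the "messageType" marker column and the per-destination write calls are placeholders you would have to fill in yourself):

    import org.apache.spark.sql.{ForeachWriter, Row}
    import scala.collection.mutable

    // Routes each record to a destination picked from a marker column.
    // Rows are buffered per destination and flushed in close(), so writes
    // happen once per partition and epoch rather than per record.
    class RoutingWriter extends ForeachWriter[Row] {
      private val buffers = mutable.Map.empty[String, mutable.ArrayBuffer[Row]]

      override def open(partitionId: Long, epochId: Long): Boolean = {
        buffers.clear()
        true  // open connections to the individual sinks here if needed
      }

      override def process(row: Row): Unit = {
        val dest = row.getAs[String]("messageType")  // hypothetical marker column
        buffers.getOrElseUpdate(dest, mutable.ArrayBuffer.empty[Row]) += row
      }

      override def close(errorOrNull: Throwable): Unit = {
        if (errorOrNull == null) {
          buffers.foreach { case (dest, rows) =>
            // write `rows` to the sink identified by `dest` (Kafka topic, JDBC table, ...)
          }
        }
        // close connections here
      }
    }

    df.writeStream.foreach(new RoutingWriter).start()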

 

 

From: chandan prakash <[hidden email]>
Date: Thursday, July 12, 2018 at 2:38 AM
To: Tathagata Das <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "user @spark" <[hidden email]>
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

 

Hi,

Did any of you think about writing a custom foreach sink writer which can decide which record should go to which sink (based on some marker in the record, which we can possibly annotate during transformation) and then write to the specific sink accordingly?

This will mean that:

1. every custom sink writer will have connections to as many sinks as there are sink types where records can go.

2. every record will be read once in the single query but can be written to multiple sinks.

 

Do you guys see any drawback in this approach ?

One drawback, of course, is that the sink is supposed to write the records as they are, but we are introducing some intelligence here in the sink.

Apart from that, do you see any other issues with this approach?

 

Regards,

Chandan

 

 



Reply | Threaded
Open this post in threaded view
|

Re: [Structured Streaming] Avoiding multiple streaming queries

Karthik Reddy Vadde
In reply to this post by Arun Mahadevan

