Stream-static join : Refreshing subset of static data / Connection pooling

Stream-static join : Refreshing subset of static data / Connection pooling

Arti Pande
Hi

We intend to do a stream-static join where Kafka is the streaming source and an RDBMS is the static source.

e.g. user activity data comes in as a stream from a Kafka source, and we need to pull user personal details from PostgreSQL.

Because PostgreSQL is a static source, the entire "User-Personal-Details" table is reloaded into Spark memory for every micro-batch.

Is there a way to optimise this? For example, we should be able to pull the user IDs from every micro-batch and then issue a query such as the one below:

select * from user_personal_details where user_id in (<list-of-user-ids-from-current-microbatch>)

Since we could not find a clean way to do this, we chose to open a JDBC connection in every micro-batch and achieved the above optimisation. But that is still a suboptimal solution, because a new JDBC connection is created for every micro-batch. Is there a way to pool JDBC connections in Structured Streaming?

Thanks & regards,
Arti Pande 

Re: Stream-static join : Refreshing subset of static data / Connection pooling

German Schiavon Matteo
Hi!

I guess you could use foreachBatch and do something like 

// Sketch: names like streamingDF, jdbcUrl and outputPath, and the user_id column, are illustrative.
streamingDF.writeStream.foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
  // Collect the distinct user IDs present in this micro-batch (assumes numeric IDs).
  val ids = batch.select("user_id").distinct().collect().map(_.getLong(0)).mkString(",")
  // Push the IN filter down to PostgreSQL so only the matching rows are loaded.
  val users = spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", s"select * from user_personal_details where user_id in ($ids)")
    .load()
  batch.join(users, "user_id").write.format("parquet").save(outputPath)
}.start()

The only thing is that in order to get the IDs you would have to do a collect, no? Or how are you retrieving them right now?



Re: Stream-static join : Refreshing subset of static data / Connection pooling

fanxin
In reply to this post by Arti Pande
Hi,
If the static table is not particularly big and it changes infrequently, you can load the whole table as a DataFrame and persist it in memory. You may also need to repartition the DataFrame.
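For example, something along these lines (the connection settings, table and column names, partition count, and names like userActivityStream, dbUser and dbPassword are placeholders):

import org.apache.spark.sql.functions.col

// Load the static table once, spread it across the cluster, and keep it cached,
// so every micro-batch joins against the in-memory copy instead of re-reading PostgreSQL.
val users = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/appdb")   // placeholder URL
  .option("dbtable", "user_personal_details")
  .option("user", dbUser)
  .option("password", dbPassword)
  .load()
  .repartition(200, col("user_id"))   // partition count and key are placeholders
  .persist()

// Then the usual stream-static join against the cached DataFrame.
val joined = userActivityStream.join(users, "user_id")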



Re: Stream-static join : Refreshing subset of static data / Connection pooling

Arti Pande
Yes, we used collect to get all IDs in foreachBatch.

No, the static table is huge and is updated frequently by other systems.


Re: Stream-static join : Refreshing subset of static data / Connection pooling

German Schiavon Matteo
So that's it, no? You can push the IN filter down in the query with the IDs and only retrieve those rows.


Re: Stream-static join : Refreshing subset of static data / Connection pooling

Kevin Pis

Hi,

You can use Debezium to capture the row-level changes in PostgreSQL in real time, stream them to Kafka, and then ETL and write the data to HBase with Flink/Spark Streaming. That way you can join against the data in HBase directly. Given how big the table is, scan performance in HBase is much better than in PostgreSQL.
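For example, the Spark side of consuming the Debezium change topic might look roughly like this (the broker address, topic name and schema columns are placeholders, and the HBase write, e.g. via an HBase connector inside foreachBatch, is not shown):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Columns of the "after" image of a Debezium change event (placeholder columns).
val afterSchema = new StructType()
  .add("user_id", LongType)
  .add("name", StringType)
  .add("email", StringType)
val envelopeSchema = new StructType().add("after", afterSchema)

// Read the CDC topic that Debezium publishes for the user table.
val cdc = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "pgserver.public.user_personal_details")
  .load()
  .select(from_json(col("value").cast("string"), envelopeSchema).as("event"))
  .select("event.after.*")   // keep only the new row image; these rows feed the HBase sink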

Re: Stream-static join : Refreshing subset of static data / Connection pooling

Arti Pande
The real question is twofold:

1) We had to do a collect on each micro-batch. In high-velocity streams this could mean millions of records and cause memory issues. Also, it appears that we are manually doing the real join by selecting only the matching rows from the static source. Is there a better way to do this?

2) Can we avoid making a JDBC connection per micro-batch? Can we pool it?


Re: Stream-static join : Refreshing subset of static data / Connection pooling

Kevin Pis

1. I don't think it should cause memory issues if you configure Kafka, Spark/Flink and HBase properly.
   1.1. We use this method in our scenario, where the data reaches about 80~150 TB a day. Does your scenario generate more data than that? I think it is the best method for dealing with a particularly big table that has to be joined.
   1.2. I think frequent I/O actions like the per-batch select may cause memory or I/O issues.
2. You can use PostgreSQL connection pools to avoid making a connection so frequently; a sketch follows below.
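For example, a rough sketch with HikariCP, holding the pool in a lazy singleton so it is created once per JVM instead of once per micro-batch (connection settings, pool size and names like streamingDF are placeholders). Note that spark.read.jdbc manages its own connections internally, so a pool like this helps when you query PostgreSQL yourself with plain JDBC inside foreachBatch:

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

// One pool per JVM: a lazy val on an object is initialised only once,
// so successive micro-batches reuse the same pooled connections.
object PgPool {
  lazy val dataSource: HikariDataSource = {
    val cfg = new HikariConfig()
    cfg.setJdbcUrl("jdbc:postgresql://db-host:5432/appdb")   // placeholder URL
    cfg.setUsername(sys.env("PG_USER"))
    cfg.setPassword(sys.env("PG_PASSWORD"))
    cfg.setMaximumPoolSize(8)                                // pool size is a guess
    new HikariDataSource(cfg)
  }
}

// Inside foreachBatch, borrow a connection instead of opening a new one per batch.
streamingDF.writeStream.foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
  val conn = PgPool.dataSource.getConnection()
  try {
    // ... query user_personal_details for the IDs in this batch over conn ...
  } finally conn.close()   // close() just returns the connection to the pool
}.start()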

 

 

-- 

Best,

Kevin Chen