mapGroupsWithState in Python


ayan guha
Hi

I want to write something in Structured Streaming:

1. I have a dataset which has 3 columns: id, last_update_timestamp, attribute
2. I am receiving the data through Kinesis

I want to deduplicate records based on last_updated. In batch, it looks like:

spark.sql("select * from (select *, row_number() over (partition by id order by last_updated desc) rank from table1) tmp where rank = 1")
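That window query keeps, for each id, the row with the greatest last_updated. As a plain-Python sanity check of the keep-latest semantics (the sample rows below are made up; no Spark needed):

```python
# Keep-latest-per-id dedup, mirroring the row_number() window query:
# for each id, retain only the row with the greatest last_updated.
rows = [
    {"id": 1, "last_updated": "12:00", "attribute": "A"},
    {"id": 1, "last_updated": "11:59", "attribute": "B"},
    {"id": 2, "last_updated": "12:01", "attribute": "C"},
    {"id": 1, "last_updated": "12:02", "attribute": "D"},
]

latest = {}
for row in rows:
    current = latest.get(row["id"])
    # Replace the stored row only if this one is newer (i.e. it would get rank 1).
    if current is None or row["last_updated"] > current["last_updated"]:
        latest[row["id"]] = row

deduped = sorted(latest.values(), key=lambda r: r["id"])
```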

But now I would like to do it in Structured Streaming. I need to maintain, for each id, the highest last_updated seen across triggers, for a certain period (24 hours).

Questions:

1. Should I use mapGroupsWithState or is there any other (SQL?) solution? Can anyone help me to write it?
2. Is mapGroupsWithState supported in Python?

Just to cover all bases: I have already tried dropDuplicates, but it keeps the first record encountered for an id rather than updating the state:

from pyspark.sql.functions import get_json_object

unpackedDF = kinesisDF.selectExpr("cast(data as string) jsonData")
dataDF = unpackedDF.select(
    get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
    get_json_object(unpackedDF.jsonData, '$.header.last_updated').cast('timestamp').alias('last_updated'),
    unpackedDF.jsonData)

dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated', '24 hours')

So it is not working. Any help is appreciated. 

--
Best Regards,
Ayan Guha

Re: mapGroupsWithState in Python

ayan guha
Any help would be much appreciated :)

On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <[hidden email]> wrote:
[quoted text elided]

--
Best Regards,
Ayan Guha

Re: mapGroupsWithState in Python

Tathagata Das
Hello Ayan,

From what I understand, mapGroupsWithState (or rather the more general flatMapGroupsWithState) is the best way forward, though it is not available in Python. However, you need to figure out your desired semantics of when you want to output the deduplicated data from the streaming query. For example, suppose there is the following sequence of events:

(id, last_update_timestamp, attribute)
1, 12:00, A      <---- do you want to output this immediately, or wait for some time to see if newer data arrives?
1, 11:59, B      <---- ignored as duplicate
1, 12:01, C     <---- do you want to output this?
1, 12:02, D

If you want to output something every time there is a newer last_update_timestamp, then that's not really strict "deduplication". It's more like an aggregation that keeps the latest record. In that case, you can try using UDAFs as well. However, with UDAFs you won't get any state cleanup, so flatMapGroupsWithState is the best solution: you can do whatever tracking you want, output whenever you want, and get state cleanup using timeouts.
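Since the API is Scala-only, the per-key logic it would implement can be sketched in plain Python: a dict stands in for the per-group state, the TTL check stands in for a state timeout, and the events are the hypothetical ones from the sequence above (using event time as processing time for simplicity):

```python
from datetime import datetime, timedelta

STATE_TTL = timedelta(hours=24)

# state: id -> (latest_timestamp, attribute); mimics per-key group state
state = {}

def process(event_id, ts, attribute, now):
    """Emit the event if its timestamp is newer than the stored state for the
    key; drop it as a duplicate/older record otherwise. Entries older than the
    TTL are evicted, which is what a state timeout would do for us."""
    # State cleanup: evict keys whose stored timestamp is past the TTL.
    for key in [k for k, (t, _) in state.items() if now - t > STATE_TTL]:
        del state[key]
    stored = state.get(event_id)
    if stored is None or ts > stored[0]:
        state[event_id] = (ts, attribute)
        return [(event_id, ts, attribute)]   # newer record: output it
    return []                                # duplicate or older: emit nothing

# Replay the example sequence; only A, C, D should be emitted.
t = lambda hh, mm: datetime(2018, 1, 29, hh, mm)
out = []
for args in [(1, t(12, 0), "A"), (1, t(11, 59), "B"),
             (1, t(12, 1), "C"), (1, t(12, 2), "D")]:
    out += process(*args, now=args[1])
```

This outputs on every newer timestamp (the "aggregation keeping the latest" behaviour); with flatMapGroupsWithState you could equally buffer and output only on timeout, which is the choice of semantics mentioned above.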

FYI: I have elaborated on flatMapGroupsWithState and timeouts in my talk: https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming
On Tue, Jan 30, 2018 at 5:14 AM, ayan guha <[hidden email]> wrote:
[quoted text elided]


Re: mapGroupsWithState in Python

ayan guha
Thanks a lot TD, that is exactly what I was looking for. And I have seen most of your talks; really great stuff you guys are doing :)

On Thu, Feb 1, 2018 at 10:38 AM, Tathagata Das <[hidden email]> wrote:
[quoted text elided]
--
Best Regards,
Ayan Guha