Converting RelationalGroupedDataSet to DataFrame


Soheil Pourbafrani
Hi,

In my problem, I need to group a DataFrame, apply business logic to each group, and finally emit a new DataFrame based on the result. In detail, there is a device_dataframe that records the timestamps at which each device was turned on (on) and turned off (off):
+---------+------+--------------------+
|device_id|state |   d_ts             |
+---------+------+--------------------+
|1        |off   |2020-09-10 16:14:58 |
|1        |on    |2020-09-19 16:14:58 |
|2        |on    |2020-09-20 16:14:58 |
|2        |off   |2020-10-03 16:14:58 |
|4        |on    |2020-09-20 16:14:58 |
|5        |off   |2020-09-20 16:14:58 |
+---------+------+--------------------+
On the other hand, there is a DataFrame of events, each with its timestamp and the device it belongs to:
+-----+---------+--------------------+
|e_id |device_id|       e_ts         |
+-----+---------+--------------------+
|1    |1        |2020-09-20 16:14:58 |
|2    |2        |2020-10-08 09:19:55 |
|3    |4        |2020-11-01 12:15:37 |
|4    |5        |2020-10-08 01:35:08 |
+-----+---------+--------------------+
The following is an example of joining the two DataFrames:
+---------+-----+--------------------+------+--------------------+
|device_id|e_id |       e_ts         |state |   d_ts             |
+---------+-----+--------------------+------+--------------------+
|1        |1    |2020-09-20 16:14:58 |off   |2020-09-10 16:14:58 |
|1        |1    |2020-09-20 16:14:58 |on    |2020-09-19 16:14:58 |
|2        |2    |2020-10-08 09:19:55 |on    |2020-09-20 16:14:58 |
|2        |2    |2020-10-08 09:19:55 |off   |2020-10-03 16:14:58 |
|4        |3    |2020-11-01 12:15:37 |on    |2020-09-20 16:14:58 |
|5        |4    |2020-10-08 01:35:08 |off   |2020-09-20 16:14:58 |
+---------+-----+--------------------+------+--------------------+
What I finally need to find is the events that happened while their corresponding device was on. For example, in the table above, event_id 1 is valid because it happened on 2020-09-20 16:14:58 and its device had been on since 2020-09-19 16:14:58. event_id 2 is not valid because its device was turned off on 2020-10-03 16:14:58 and never turned on again. Applying the same rule to the other events results in the following table:
+---------+-----+-------------------+
|device_id|e_id |       e_ts        |
+---------+-----+-------------------+
|1        |1    |2020-09-20 16:14:58|
|4        |3    |2020-11-01 12:15:37|
+---------+-----+-------------------+
I did the following to group the joined table by device:
val grouped = eventDF
      .join(deviceDF, "device_id")
      .groupBy("device_id")
which results in a RelationalGroupedDataset. Now I need to apply the logic to each group and emit the resulting DataFrame, but I didn't find a way to do that. I looked at UDAFs, but they did not work in my case. I know how to solve this using the RDD API, but I want to find a Column API approach. Any help or suggestions would be appreciated.
Thanks
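
For reference, the validity rule described in this post can be written down in plain Scala, without Spark, as a minimal sketch. The names OnOffRule, StateChange, Event and isValid are hypothetical and only mirror the columns shown above. The sketch assumes an event counts as valid when the most recent state change at or before the event is "on", and that timestamps keep the "yyyy-MM-dd HH:mm:ss" form so they sort correctly as strings.

```scala
object OnOffRule {
  // Hypothetical record types mirroring the columns in the post.
  final case class StateChange(deviceId: Int, state: String, ts: String)
  final case class Event(id: Int, deviceId: Int, ts: String)

  // An event is valid when the latest state change of its device
  // at or before the event timestamp is "on".
  // Timestamps in "yyyy-MM-dd HH:mm:ss" form compare correctly as strings.
  def isValid(event: Event, changes: Seq[StateChange]): Boolean =
    changes
      .filter(c => c.deviceId == event.deviceId && c.ts <= event.ts)
      .sortBy(_.ts)
      .lastOption
      .exists(_.state == "on")
}
```

An event whose device has no state change at or before it is treated as not valid here, which is an assumption the post does not spell out.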

Re: Converting RelationalGroupedDataSet to DataFrame

Stéphane Verlet-2
Once you have a RelationalGroupedDataset, you can use agg() to perform group-wide operations such as max, sum, etc., or even a custom aggregator.

df.groupBy(....).agg(sum(col(...))) 

That returns a DataFrame with your groupBy columns and the result of the aggregation.


Stephane
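
As one possible way to apply the agg() suggestion to the question above, here is a minimal sketch, not a confirmed solution from this thread. It assumes the column names from the question (device_id, state, d_ts, e_id, e_ts) and that an event is valid when the latest state change at or before it is "on". The object name ValidEvents and the function validEvents are hypothetical. The sketch groups by event rather than by device only, and takes max over a struct of (d_ts, state), which compares d_ts first, so the result carries the state of the most recent change.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, struct, to_timestamp}

object ValidEvents {
  // Keeps events whose most recent device state change at or before e_ts is "on".
  def validEvents(eventDF: DataFrame, deviceDF: DataFrame): DataFrame =
    eventDF
      .join(deviceDF, "device_id")
      // Ignore state changes that happened after the event.
      .where(col("d_ts") <= col("e_ts"))
      .groupBy("device_id", "e_id", "e_ts")
      // Structs are ordered field by field, so max picks the latest d_ts
      // together with the state recorded at that time.
      .agg(max(struct(col("d_ts"), col("state"))).as("last_change"))
      .where(col("last_change.state") === "on")
      .select("device_id", "e_id", "e_ts")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("valid-events")
      .getOrCreate()
    import spark.implicits._

    // Sample data copied from the question.
    val deviceDF = Seq(
      (1, "off", "2020-09-10 16:14:58"),
      (1, "on", "2020-09-19 16:14:58"),
      (2, "on", "2020-09-20 16:14:58"),
      (2, "off", "2020-10-03 16:14:58"),
      (4, "on", "2020-09-20 16:14:58"),
      (5, "off", "2020-09-20 16:14:58")
    ).toDF("device_id", "state", "d_ts")
      .withColumn("d_ts", to_timestamp(col("d_ts")))

    val eventDF = Seq(
      (1, 1, "2020-09-20 16:14:58"),
      (2, 2, "2020-10-08 09:19:55"),
      (3, 4, "2020-11-01 12:15:37"),
      (4, 5, "2020-10-08 01:35:08")
    ).toDF("e_id", "device_id", "e_ts")
      .withColumn("e_ts", to_timestamp(col("e_ts")))

    validEvents(eventDF, deviceDF).orderBy("e_id").show(false)
    spark.stop()
  }
}
```

On the sample data from the question this should keep events 1 and 3. Two caveats: if a device has an "on" and an "off" change with exactly the same d_ts, the struct comparison falls back to the state string and "on" wins, and events with no earlier state change are dropped by the inner join and filter.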



