Hi,
I need to group a DataFrame, apply some business logic to each group, and emit a new DataFrame from the results. In detail: there is a device DataFrame (deviceDF) that records the timestamps at which each device was turned on (on) or turned off (off).
+---------+-----+-------------------+
|device_id|state|d_ts               |
+---------+-----+-------------------+
|1        |off  |2020-09-10 16:14:58|
|1        |on   |2020-09-19 16:14:58|
|2        |on   |2020-09-20 16:14:58|
|2        |off  |2020-10-03 16:14:58|
|4        |on   |2020-09-20 16:14:58|
|5        |off  |2020-09-20 16:14:58|
+---------+-----+-------------------+
There is also an events DataFrame (eventDF) that holds each event's timestamp and the device it belongs to.
+----+---------+-------------------+
|e_id|device_id|e_ts               |
+----+---------+-------------------+
|1   |1        |2020-09-20 16:14:58|
|2   |2        |2020-10-08 09:19:55|
|3   |4        |2020-11-01 12:15:37|
|4   |5        |2020-10-08 01:35:08|
+----+---------+-------------------+
Joining the two DataFrames on device_id gives:
+---------+----+-------------------+-----+-------------------+
|device_id|e_id|e_ts               |state|d_ts               |
+---------+----+-------------------+-----+-------------------+
|1        |1   |2020-09-20 16:14:58|off  |2020-09-10 16:14:58|
|1        |1   |2020-09-20 16:14:58|on   |2020-09-19 16:14:58|
|2        |2   |2020-10-08 09:19:55|on   |2020-09-20 16:14:58|
|2        |2   |2020-10-08 09:19:55|off  |2020-10-03 16:14:58|
|4        |3   |2020-11-01 12:15:37|on   |2020-09-20 16:14:58|
|5        |4   |2020-10-08 01:35:08|off  |2020-09-20 16:14:58|
+---------+----+-------------------+-----+-------------------+
What I ultimately need are the events that happened while their corresponding device was on. In the table above, event 1 is valid because it happened at 2020-09-20 16:14:58 and its device had been on since 2020-09-19 16:14:58. Event 2 is not valid because its device was turned off at 2020-10-03 16:14:58 and never turned on again, and so on. The expected result is:
+---------+----+-------------------+
|device_id|e_id|e_ts               |
+---------+----+-------------------+
|1        |1   |2020-09-20 16:14:58|
|4        |3   |2020-11-01 12:15:37|
+---------+----+-------------------+
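To make the rule precise, here is a minimal plain-Scala sketch (no Spark involved) of the per-device logic I have in mind. The case classes, the ValidEvents object and the validEvents function are illustrative names only, and I am assuming that an event counts as valid when the most recent state change at or before the event's timestamp is "on".

```scala
// Illustrative types only; they mirror the columns of the two tables above.
final case class DeviceState(deviceId: Int, state: String, dTs: String)
final case class Event(eId: Int, deviceId: Int, eTs: String)

object ValidEvents {
  // Timestamps in "yyyy-MM-dd HH:mm:ss" form sort correctly as plain strings,
  // so they are compared lexicographically here to keep the sketch short.
  def validEvents(events: Seq[Event], states: Seq[DeviceState]): Seq[Event] = {
    val statesByDevice = states.groupBy(_.deviceId)
    events.filter { e =>
      // State changes of this event's device at or before the event time.
      val earlier = statesByDevice
        .getOrElse(e.deviceId, Seq.empty)
        .filter(_.dTs <= e.eTs)
      // Assumed rule: the latest such state change must be "on".
      earlier.nonEmpty && earlier.maxBy(_.dTs).state == "on"
    }
  }
}
```

With the sample data above, this should keep events 1 and 3 and drop events 2 and 4. What I am missing is how to express the same thing with DataFrame columns instead of Scala collections.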
I did the following to group the joined table by device:
val grouped = eventDF
  .join(deviceDF, "device_id")
  .groupBy("device_id")
which results in a RelationalGroupedDataset. Now I need to apply the logic to each group and emit the resulting DataFrame, but I haven't found a way to do that. I looked at UDAFs, but they don't seem to fit my case. I know how to solve this with the RDD API, but I would like to find the equivalent Column API approach. Any help or suggestions would be appreciated.
Thanks