Custom aggregations: modular and lightweight solutions?


Custom aggregations: modular and lightweight solutions?

Andrew Leverentz
Hi All,

I'm attempting to clean up some Spark code that uses groupByKey / mapGroups to compute custom aggregations, and I could use some help understanding the Spark APIs needed to make my code more modular and maintainable.

In particular, my current approach is as follows:
  • Start with a Dataset[CaseClass1]
  • Apply groupByKey(f), where f is a function that extracts a tuple of keys
  • Apply mapGroups(g), where g computes multiple custom aggregations:
    • Iterate through the rows in each group, updating a large, mutable CustomState object.
    • At the end of each group, transform the current key and the CustomState into an instance of CaseClass2.
In other words, we start with a dataset of CaseClass1 objects and end up with a dataset of CaseClass2 objects, using instances of a complex CustomState class to store the intermediate state during the aggregation.
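For concreteness, here is a minimal sketch of that shape, with made-up field names (the real CaseClass1 / CaseClass2 / CustomState are considerably larger):

import org.apache.spark.sql.{Dataset, SparkSession}

// Made-up stand-ins for the real types; the actual classes are much larger.
case class CaseClass1(id: String, category: String, value: Double)
case class CaseClass2(id: String, category: String, total: Double, count: Long)

// Mutable accumulator updated row-by-row within a group.
final class CustomState {
  var total: Double = 0.0
  var count: Long = 0L
  def update(row: CaseClass1): Unit = { total += row.value; count += 1 }
}

object CurrentApproach {
  def aggregate(spark: SparkSession, ds: Dataset[CaseClass1]): Dataset[CaseClass2] = {
    import spark.implicits._
    ds.groupByKey(r => (r.id, r.category))   // f: extract a tuple of keys
      .mapGroups { (key, rows) =>            // g: fold the group into a CaseClass2
        val state = new CustomState
        rows.foreach(state.update)
        CaseClass2(key._1, key._2, state.total, state.count)
      }
  }
}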

We have dozens of custom aggregation calculations to perform on this data, and I'd like to be able to streamline the process of introducing new aggregations and comparing multiple parameterized variations of the same aggregations side by side.  The current approach requires us to touch several tightly coupled pieces of code in order to add even simple variations to existing aggregate functions.

The UDAF API seems to be designed for this use case, but I've found creating new UDAFs to be just as cumbersome as maintaining my current code.

To address this, I've tried a couple of approaches (described below), although I've run into problems with both of them.

At a high level, both of my approaches require a Dataset[T], a key extractor function (T => K), and a collection of instances of a custom class GroupingCalculation[T, S, R].  Here, T is the data type of each row in the dataset, K is the type of the key by which the rows should be grouped, S is the type of the intermediate state during aggregation, and R is the result type of each aggregation.  In this context, the data types T and K are fixed, but the state and result types (S and R) may vary among the GroupingCalculation instances.  The resulting DataFrame will contain Rows that are essentially concatenations of {K, R1, R2, ..., Rn}, where R1, ..., Rn are the result types of the GroupingCalculation instances.
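In code, the abstraction I have in mind looks roughly like this (method names are hypothetical; the actual class in the Gist linked under approach (2) below differs in details):

import org.apache.spark.sql.types.StructType

trait GroupingCalculation[T, S, R] {
  def initialState: S              // fresh intermediate state for each group
  def update(state: S, row: T): S  // fold one input row into the state
  def result(state: S): R          // finish the aggregation for a group
  def resultSchema: StructType     // columns this calculation contributes to the output
}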

(1) My first approach constructs a UserDefinedAggregateFunction by applying ScalaReflection.schemaFor[...] to T, S, and R.  After some digging and experimentation, I found a way to use CatalystTypeConverters and ExpressionEncoders to populate the MutableAggregationBuffer.  Unfortunately, once I finally got it running, this approach ran about 10x slower than the original approach described above.  I suspect that the extra encoding/decoding layer on top of the UDAF is what slowed it down.  Because of this, I'm setting this approach aside for now.
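The schema-derivation part of that approach looked roughly like the following sketch (ReflectiveUdaf is a made-up name, and ScalaReflection is an internal API, so the exact incantation is version-sensitive):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types.{DataType, StructType}
import scala.reflect.runtime.universe.TypeTag

abstract class ReflectiveUdaf[T: TypeTag, S: TypeTag, R: TypeTag]
    extends UserDefinedAggregateFunction {

  // Derive Catalyst schemas from the type parameters (assumes T and S are case classes).
  private def schemaOf[A: TypeTag]: StructType =
    ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]

  override def inputSchema: StructType  = schemaOf[T]
  override def bufferSchema: StructType = schemaOf[S]
  override def dataType: DataType       = ScalaReflection.schemaFor[R].dataType
  override def deterministic: Boolean   = true

  // initialize / update / merge / evaluate still have to shuttle values between the
  // external types (T, S, R) and the buffer, which is where the CatalystTypeConverters /
  // ExpressionEncoder plumbing (and, I suspect, the overhead) lives.
}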

(2) Using an API similar to (1), I replaced the implementation with one that uses groupByKey and mapGroups.  This bypasses the need to create a wrapper around a UDAF.  Also, the internal state, rather than being encoded in a DataFrame, is simply stored in one mutable ArrayBuffer[Any] per group.  An implementation of this approach is available here: https://gist.github.com/alev000/27d10a402ad250957b792091084932f4
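In rough outline, the driver has the following shape, using the GroupingCalculation trait sketched earlier (this is a simplification, not the Gist verbatim; the Product-based key flattening and the type erasure via asInstanceOf are assumptions of this sketch):

import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.types.{StructField, StructType}
import scala.collection.mutable.ArrayBuffer

object GroupAggregatorSketch {
  def aggregateGroups[T, K <: Product](
      ds: Dataset[T],
      key: T => K,
      keyFields: Seq[StructField],
      calcs: Seq[GroupingCalculation[T, _, _]])(
      implicit keyEnc: Encoder[K]): DataFrame = {

    // Erase the per-calculation state/result types to match the ArrayBuffer[Any] storage.
    val cs = calcs.map(_.asInstanceOf[GroupingCalculation[T, Any, Any]])
    val outSchema = StructType(keyFields ++ cs.flatMap(_.resultSchema.fields))
    implicit val outEnc: ExpressionEncoder[Row] = RowEncoder(outSchema)

    ds.groupByKey(key).mapGroups { (k, rows) =>
      // One slot per calculation holds its intermediate state for this group.
      val states = ArrayBuffer[Any](cs.map(_.initialState): _*)
      rows.foreach { row =>
        var i = 0
        while (i < cs.length) { states(i) = cs(i).update(states(i), row); i += 1 }
      }
      // Scalar results contribute one column; Row-valued results spread into their columns.
      val resultCols = cs.zip(states).flatMap {
        case (c, s) => c.result(s) match {
          case r: Row => r.toSeq
          case other  => Seq(other)
        }
      }
      // Output row layout: {K, R1, ..., Rn}.
      Row.fromSeq(k.productIterator.toSeq ++ resultCols)
    }
  }
}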
I feel that this implementation is promising, but I haven't been able to get some of my test cases in the above Gist to pass.  In particular, my test cases "Test grouping calculations with various combinations of case classes" and "Test firstAndOnly" fail with the following runtime error messages, respectively:
  • "TestCase3 is not a valid external type for schema of struct<a:int,b:double>"
  • "scala.Some is not a valid external type for schema of string"
Would anyone be able to help me diagnose the runtime errors with approach (2), or to suggest a better alternative?

Thanks,
~ Andrew

Re: Custom aggregations: modular and lightweight solutions?

Andrew Leverentz
Here's a simpler example that I think gets at the heart of what I'm trying to do: DynamicSchemaExample.scala.  Here, I dynamically create a sequence of Rows along with a corresponding schema (StructType), but the RowEncoder derived from that schema doesn't seem to handle the nested structure of the Rows.  This example fails with a similar error (in this case, "scala.Tuple2$mcDD$sp is not a valid external type for schema of struct<_1:double,_2:double>").
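For what it's worth, here is a stripped-down, hard-coded sketch of the same pattern (not the actual DynamicSchemaExample): the commented-out lines reproduce this kind of error, whereas passing the nested value as a Row encodes cleanly, which makes me suspect the RowEncoder wants nested struct values supplied as Rows rather than tuples, case classes, or Options.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.types._

object NestedRowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("nested-row-sketch").getOrCreate()

    // Schema with one nested struct column, mirroring the shape in the error message.
    val schema = StructType(Seq(
      StructField("key", StringType, nullable = false),
      StructField("pair", StructType(Seq(
        StructField("_1", DoubleType, nullable = false),
        StructField("_2", DoubleType, nullable = false))), nullable = false)))

    implicit val enc: ExpressionEncoder[Row] = RowEncoder(schema)

    // Nested struct values supplied as Rows encode fine.
    val ok = Seq(Row("a", Row(1.0, 2.0)))
    spark.createDataset(ok).show()

    // Supplying a Tuple2 (or a case class, or Some(...)) where the schema says
    // struct/string reproduces the "... is not a valid external type ..." errors:
    // val bad = Seq(Row("a", (1.0, 2.0)))
    // spark.createDataset(bad).show()

    spark.stop()
  }
}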

If I could find a way to get this example working (for arbitrary values of rowSize), I suspect that it would also give me a solution to the custom-aggregation issue I outlined in my previous email.  Any suggestions would be much appreciated.

Thanks,
~ Andrew


