Dynamic Spark metrics creation

Yuri Oleynikov (יורי אולייניקוב)
Hi all, 
I have a Spark application that implements arbitrary stateful aggregation with FlatMapGroupsWithStateFunction.

I want to collect some statistics about the incoming events inside the FlatMapGroupsWithStateFunction.
The statistics are based on an event property whose values are dynamic, but which on the other hand come from a small, finite (though unknown in advance) set, e.g. the country name.

So I thought of registering metrics dynamically inside the FlatMapGroupsWithStateFunction, but as far as I understand, this requires accessing the MetricsSystem via SparkEnv.get(), which is unavailable from executors.

Any thoughts/suggestions? 

With best regards,
Yurii

Re: Dynamic Spark metrics creation

Jacek Laskowski
Hey Yurii,

> which is unavailable from executors.

Register it on the driver and use accumulators on executors to update the values (on the driver)?

(In reply to Yuri Oleynikov's message of Sat, Jan 16, 2021, 2:21 PM.)

Re: Dynamic Spark metrics creation

Yuri Oleynikov (יורי אולייניקוב)
Hey Jacek, let me clarify a bit.
The bottom line is that I need Structured Streaming to report the following metrics:
Country-USA: 7
Country-Poland: 23
Country-Brazil: 56

The country names come from the incoming events and are unknown at the very beginning, i.e. at application startup.

Thus, registering an accumulator and binding it to a metrics source on the driver at application startup is impossible, unless you register one for every possible country name. That wastes Spark memory, pollutes the metrics namespace with metrics of which 99% are zero, and wastes network bandwidth.

(In reply to Jacek Laskowski's message of Jan 17, 2021, 15:56.)

Re: Dynamic Spark metrics creation

Ivan Petrov
Would a custom accumulator work for you? It should be doable for Map[String,Long] too:
https://stackoverflow.com/questions/42293798/how-to-create-custom-set-accumulator-i-e-setstring
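A minimal plain-Java sketch of the map-valued accumulator idea, assuming a per-country event count is all that is needed. To keep it runnable without Spark on the classpath, the class below extends nothing; in a real job the same logic would go into the isZero, copy, reset, add, merge and value methods of a subclass of org.apache.spark.util.AccumulatorV2<String, Map<String, Long>>. The class name CountryCounts is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for an AccumulatorV2<String, Map<String, Long>>:
// each method mirrors the AccumulatorV2 method of the same name.
final class CountryCounts {
    private final Map<String, Long> counts = new HashMap<>();

    // Zero value: no country counted yet (AccumulatorV2.isZero).
    boolean isZero() {
        return counts.isEmpty();
    }

    // Independent copy of the current counts (AccumulatorV2.copy).
    CountryCounts copy() {
        CountryCounts c = new CountryCounts();
        c.counts.putAll(counts);
        return c;
    }

    // Back to the zero value (AccumulatorV2.reset).
    void reset() {
        counts.clear();
    }

    // Executor side: count one event; a previously unseen country gets a new key.
    void add(String country) {
        counts.merge(country, 1L, Long::sum);
    }

    // Driver side: fold another task's partial counts into this one (AccumulatorV2.merge).
    void merge(CountryCounts other) {
        other.counts.forEach((country, n) -> counts.merge(country, n, Long::sum));
    }

    // Snapshot of the current totals (AccumulatorV2.value).
    Map<String, Long> value() {
        return new HashMap<>(counts);
    }
}
```

In a Spark job such an accumulator would be registered once on the driver, e.g. with sparkContext.register(acc, "countryCounts"), updated with acc.add(country) inside the FlatMapGroupsWithStateFunction, and read with acc.value() on the driver. Only countries that actually occur get an entry, so nothing has to be pre-registered at startup. Note that Spark only guarantees that each task's accumulator update is applied once for updates performed inside actions; updates made inside transformations, as here, may be applied more than once if a task is re-executed. Publishing the totals through the metrics system is not covered by this sketch.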


(In reply to Yuri Oleynikov's message of Sun, Jan 17, 2021, 15:16.)