Best way to emit custom metrics to Prometheus in spark structured streaming

Best way to emit custom metrics to Prometheus in spark structured streaming

meetwes
Hi, I am looking for the right approach to emit custom metrics from a Spark Structured Streaming job.

Actual scenario:
I have an aggregated dataframe, let's say with (id, key, value) columns. One of the KPIs could be 'droppedRecords', and the corresponding value column holds the number of dropped records. I need to filter all the KPIs for 'droppedRecords' and compute the sum of its value column.
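
Roughly, the aggregation I am after looks like this (just a sketch; the dataframe and column names are assumed from the schema above):

import org.apache.spark.sql.functions._

val droppedRecords = aggregatedDf
  .filter(col("key") === "droppedRecords")
  .agg(sum(col("value").cast("long")).as("droppedRecords"))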

Challenges:
1) Need to use only one streaming query so that the metrics are accurate (1 readStream and 1 writeStream). If the metrics are emitted in a separate query, it can cause inconsistencies due to differing watermark times between the query that does the aggregation and the one that only computes the metrics.

I evaluated some of the approaches:
1) foreachBatch sink: This works for emitting metrics, but there are other bugs, e.g. the numOutputRows emitted in the logs is always -1.
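
For reference, a rough sketch of the foreachBatch approach I tried (only the metric computation is shown; pushing to Prometheus and writing the batch to the real sink are left as comments):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

aggregatedDf.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // sum the dropped-record counts for this micro-batch
    val row = batchDf
      .filter(col("key") === "droppedRecords")
      .agg(sum(col("value").cast("long")))
      .head()
    val dropped = if (row.isNullAt(0)) 0L else row.getLong(0)
    // push `dropped` to Prometheus here (e.g. via a pushgateway client),
    // then write batchDf to the actual sink
  }
  .start()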

2) Using accumulators:
import org.apache.spark.util.LongAccumulator

val dropCounts: LongAccumulator = new LongAccumulator
spark.sparkContext.register(dropCounts, "Drop Counts Accumulator")
// assuming a case class Kpi(id: String, key: String, value: String) matching the schema
df.as[Kpi].map { row =>
  dropCounts.add(row.value.toLong)
  row
}
This approach seems to hit a bug in Spark: the executors add to the accumulator correctly, but the count on the driver is always 0.

3) Using mapGroupsWithState: this requires an action on the aggregated dataframe to retrieve the metrics, and therefore creates another streaming query.

I am using Spark 3.0.1. What would be the best way to implement custom metrics?


Re: Best way to emit custom metrics to Prometheus in spark structured streaming

Jungtaek Lim-2
You can try out "Dataset.observe", added in Spark 3, which enables arbitrary metrics to be logged and exposed to streaming query listeners.
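
A minimal sketch of that pattern (the observation name, column names, and listener wiring here are illustrative, based on the (id, key, value) schema from the original post):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// attach a named metric to the single streaming query
val observed = df.observe(
  "kpi_metrics",
  sum(when(col("key") === "droppedRecords", col("value").cast("long"))
    .otherwise(lit(0L))).as("droppedRecords"))
// `observed` is then written out with the single writeStream as usual

// the listener receives the observed metrics once per micro-batch
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val metrics = event.progress.observedMetrics
    if (metrics.containsKey("kpi_metrics")) {
      val dropped = metrics.get("kpi_metrics").getAs[Long]("droppedRecords")
      // push `dropped` to Prometheus here
    }
  }
})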


Re: Best way to emit custom metrics to Prometheus in spark structured streaming

meetwes
Hi, thanks for the reply. I tried it out today, but I am unable to get it to work in cluster mode: the aggregation result is always 0. It works fine standalone with spark-shell, but with Spark on Kubernetes in cluster mode it doesn't.





Re: Best way to emit custom metrics to Prometheus in spark structured streaming

meetwes
In reply to this post by Jungtaek Lim-2
So I tried it again in standalone mode (spark-shell) and the df.observe() functionality works. I tried sum, count, conditional aggregations using 'when', etc., and all of this works in spark-shell. But with spark-on-k8s in cluster mode, only lit() works as the aggregation column; no other aggregation (count, sum, etc.) works.
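
For illustration, roughly what I am calling (just a sketch; 'metrics' is only an observation name I picked, and the column names are as in my original post):

import org.apache.spark.sql.functions._

// works in both spark-shell and spark-on-k8s cluster mode:
df.observe("metrics", lit(1).as("constant"))

// works in spark-shell, but the results come back as 0 with spark-on-k8s in cluster mode:
df.observe("metrics",
  count(lit(1)).as("rows"),
  sum(when(col("key") === "droppedRecords", col("value").cast("long"))
    .otherwise(lit(0L))).as("droppedRecords"))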


