spark-user mailing list archives

From meetwes <>
Subject Best way to emit custom metrics to Prometheus in spark structured streaming
Date Mon, 02 Nov 2020 18:12:44 GMT
Hi, I am looking for the right approach to emit custom metrics from a Spark
Structured Streaming job.

*Actual Scenario:*
I have an aggregated dataframe, let's say with (id, key, value) columns. One
of the KPIs could be 'droppedRecords', and the corresponding value column holds
the number of dropped records. I need to filter all the KPIs with key
'droppedRecords' and compute the sum over the value column.
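
To make it concrete, the filter-and-sum I have in mind looks roughly like this
(a minimal sketch; `agg` is a hypothetical name for the aggregated streaming
DataFrame):

import org.apache.spark.sql.functions._

// Keep only the droppedRecords KPI rows and total their values.
val droppedRecords = agg
  .filter(col("key") === "droppedRecords")
  .agg(sum("value").as("droppedRecords"))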

1) Need to use only one streaming query so the metrics will be accurate (1
readStream and 1 writeStream). If the metrics are emitted in a separate
query, it can cause inconsistencies due to varying watermark times
between the query that does the aggregation and the one that computes only
the metrics.

*I evaluated a few approaches:*
1) _foreachBatch sink:_ This works for emitting metrics, but there are other
bugs, e.g. the numOutputRows emitted in the logs is always -1.
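
Roughly the shape of that approach (a sketch only; the gauge and the final
sink are hypothetical placeholders, and I am assuming the Prometheus Java
simpleclient for the gauge):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import io.prometheus.client.Gauge

// Hypothetical gauge, exposed to Prometheus from the driver.
val droppedGauge = Gauge.build("dropped_records", "Dropped records per batch").register()

def emitMetrics(batch: DataFrame, batchId: Long): Unit = {
  val dropped = batch
    .filter(col("key") === "droppedRecords")
    .agg(coalesce(sum("value"), lit(0L)))
    .first().getLong(0)
  droppedGauge.set(dropped.toDouble)
  // ... write `batch` to the real sink here ...
}

df.writeStream.foreachBatch(emitMetrics _).start()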

2) _Using accumulators:_

import org.apache.spark.util.LongAccumulator

val dropCounts: LongAccumulator = new LongAccumulator
spark.sparkContext.register(dropCounts, "Drop Counts Accumulator")
// `ds` stands for the aggregated Dataset; the add() runs on the executors.
ds.map { row => dropCounts.add(row.value); row }
This approach seems to hit a bug in Spark: the executors add to the
accumulator correctly, but the count read on the driver is always 0.
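
Concretely, the driver-side read that always comes back 0 is just:

println(s"Dropped so far: ${dropCounts.value}")  // stays 0 on the driver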

3) _Using mapGroupsWithState:_ This requires an action on the aggregated
dataframe to retrieve the metrics, and therefore creates another streaming
query.
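
A minimal sketch of what I mean (the Kpi case class and names are
hypothetical; encoders come from spark.implicits._):

import org.apache.spark.sql.streaming.GroupState
import spark.implicits._

case class Kpi(id: String, key: String, value: Long)

// Running total per KPI key, kept in state; reading it back needs its own
// action, i.e. a second streaming query.
val totals = ds.as[Kpi]
  .groupByKey(_.key)
  .mapGroupsWithState[Long, (String, Long)] { (key, rows, state: GroupState[Long]) =>
    val total = state.getOption.getOrElse(0L) + rows.map(_.value).sum
    state.update(total)
    (key, total)
  }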

I am using Spark 3.0.1. What would be the best way to implement custom
metrics in this scenario?