spark-user mailing list archives

From Iain Cundy <>
Subject mapWithState not compacting removed state
Date Thu, 03 Mar 2016 14:45:20 GMT
Hi All

I'm aggregating data using mapWithState with a timeout set, in Spark 1.6.0. It broadly works well,
and because the callback has access to the key and the time, it allows a much more elegant
solution for time-based aggregation than the old updateStateByKey function.

However, there seems to be a problem: the size of the state and the time taken to iterate
over it for each micro-batch keep increasing, long after the number of 'current'
keys settles down. We start removing keys after just over an hour, but the size of the state
keeps increasing in runs of over 6 hours.

Essentially we start by just adding keys for our input tuples, reaching a peak of about 7
million keys. Then we start to output data and remove keys - the number of keys drops to about
5 million. We continue processing tuples, which adds keys, while removing the keys we no longer
need - the number of keys fluctuates up and down between 5 million and 8 million.

We know this, and are reasonably confident our removal of keys is correct, because we obtain
the state with JavaMapWithStateDStream.stateSnapshots and count the keys.

From my reading (I don't know scala!) of the code in org.apache.spark.streaming.util.StateMap.scala
it seems clear that the removed keys are only marked as deleted and are really destroyed subsequently
by compaction, based upon the length of the chain of delta maps. We'd expect the size of the
state RDDs and the time taken to iterate over all the state to stabilize once compaction is
run after we remove keys, but it just doesn't happen.
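
To make the behaviour described above concrete, here is a toy Java model of a state map kept as a chain of per-batch delta maps, where a removal only writes a "deleted" marker and the markers are dropped when the chain is compacted. It is a sketch of my reading of the idea, not Spark's actual StateMap code: the class name, the method names, and the rule "compact when the chain length reaches the threshold" are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model only (hypothetical names); this is NOT Spark's StateMap implementation. */
class DeltaChainModel {
    // One delta map per micro-batch; Optional.empty() marks a key as deleted.
    private final List<Map<String, Optional<Long>>> chain = new ArrayList<>();
    private final int compactionThreshold; // assumed rule: compact once the chain is this long

    DeltaChainModel(int compactionThreshold) {
        this.compactionThreshold = compactionThreshold;
        chain.add(new HashMap<>());
    }

    private Map<String, Optional<Long>> current() {
        return chain.get(chain.size() - 1);
    }

    void put(String key, long value) {
        current().put(key, Optional.of(value));
    }

    /** Removal only records a deletion marker in the newest delta. */
    void remove(String key) {
        current().put(key, Optional.empty());
    }

    /** Ends the current batch: compacts if the chain is long enough, then starts a new delta. */
    void nextBatch() {
        if (chain.size() >= compactionThreshold) {
            compact();
        }
        chain.add(new HashMap<>());
    }

    /** Merges all deltas (newer entries win) and drops the deletion markers. */
    private void compact() {
        Map<String, Optional<Long>> merged = mergeAll();
        merged.values().removeIf(v -> !v.isPresent());
        chain.clear();
        chain.add(merged);
    }

    private Map<String, Optional<Long>> mergeAll() {
        Map<String, Optional<Long>> merged = new HashMap<>();
        for (Map<String, Optional<Long>> delta : chain) {
            merged.putAll(delta); // chain is ordered oldest to newest
        }
        return merged;
    }

    /** Number of keys a snapshot would report. */
    long liveKeys() {
        return mergeAll().values().stream().filter(Optional::isPresent).count();
    }

    /** Number of entries physically held, including deletion markers and shadowed values. */
    int storedEntries() {
        return chain.stream().mapToInt(Map::size).sum();
    }

    /** Adds one key per batch and removes the previous one; returns {liveKeys, storedEntries}. */
    static int[] run(int threshold, int batches) {
        DeltaChainModel m = new DeltaChainModel(threshold);
        for (int b = 0; b < batches; b++) {
            m.put("key-" + b, b);
            if (b > 0) {
                m.remove("key-" + (b - 1));
            }
            m.nextBatch();
        }
        return new int[] {(int) m.liveKeys(), m.storedEntries()};
    }

    public static void main(String[] args) {
        int[] never = run(1000, 10); // threshold never reached in 10 batches
        int[] often = run(2, 10);    // compacts at the end of almost every batch
        System.out.println("no compaction:   live=" + never[0] + " stored=" + never[1]);
        System.out.println("with compaction: live=" + often[0] + " stored=" + often[1]);
    }
}
```

In this model the live key count is the same in both runs, but without compaction the stored entries keep growing with every batch. That is the same pattern as the symptom I see: a stable key count from stateSnapshots alongside ever-growing state.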

Is there some possible reason why compaction never gets run?

I tried to use the (undocumented?) config setting spark.streaming.sessionByKey.deltaChainThreshold
to try to control how often compaction is run with:
--conf spark.streaming.sessionByKey.deltaChainThreshold=2

I can see it in the Spark application UI Environment page, but it doesn't seem to make any
difference.

I have noticed that the timeout mechanism only gets invoked on every 10th micro-batch. I'm
almost sure it isn't a coincidence that the checkpoint interval is also 10 micro-batches.
I assume that is an intentional performance optimization. However because I have a lot of
keys, I have a large micro-batch duration, so it would make sense for me to reduce that factor
of 10. However, since I don't call checkpoint on the state stream I can't see how to change
it.

Can I change the checkpoint interval somewhere? [I tried calling JavaMapWithStateDStream.checkpoint
myself, but that evidently isn't the same thing!]
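
To show why the factor of 10 matters with a long micro-batch duration, here is a small Java sketch of the arithmetic. It assumes only what I observed, namely that the timeout scan runs on every 10th micro-batch. The class name, the method names, and the 30-second batch duration are hypothetical values chosen for illustration.

```java
/** Arithmetic sketch only; the names and the example batch duration are hypothetical. */
class TimeoutScanDelay {
    /** First batch index at or after `batch` on which a scan runs, if scans run every `scanEvery` batches. */
    static long nextScanBatch(long batch, long scanEvery) {
        return ((batch + scanEvery - 1) / scanEvery) * scanEvery;
    }

    /** Worst-case extra wait, in seconds, between a key becoming eligible for timeout and the next scan. */
    static long worstCaseExtraDelaySeconds(long batchSeconds, long scanEvery) {
        return (scanEvery - 1) * batchSeconds;
    }

    public static void main(String[] args) {
        long scanEvery = 10;    // observed: timeouts handled on every 10th micro-batch
        long batchSeconds = 30; // hypothetical micro-batch duration
        System.out.println("key eligible at batch 41 is scanned at batch "
                + nextScanBatch(41, scanEvery));
        System.out.println("worst-case extra delay: "
                + worstCaseExtraDelaySeconds(batchSeconds, scanEvery) + " s");
    }
}
```

Reducing the factor would shrink that delay proportionally, which is why I would like to be able to change it.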

My initial assumption was that there is a new deltaMap for each micro-batch, but having noticed
the timeout behavior I wonder if there is only a new deltaMap for each checkpoint? Or maybe
there are other criteria?

Perhaps compaction just hasn't run before my application falls over? Can anyone clarify exactly
when it should run?

Or maybe compaction doesn't delete old removed keys for some reason?

Thank you for your attention.

Iain Cundy
