spark-issues mailing list archives

From François Garillot (JIRA) <>
Subject [jira] [Commented] (SPARK-5147) write ahead logs from streaming receiver are not purged because cleanupOldBlocks in WriteAheadLogBasedBlockHandler is never called
Date Wed, 14 Jan 2015 10:51:34 GMT


François Garillot commented on SPARK-5147:

While I agree this is a serious bug, why delete old batches at a configuration-specified time?
Since [SPARK-4671|], such a WAL deletion can (with a recent enough cutoff time) lower the number
of existing copies of a block in the system to just one, which means data loss if the receiver
fails. Worse, this would specifically happen in contexts where the WAL is activated, i.e.
contexts where fault tolerance is important.

As an alternative, how about reverting [SPARK-4671|]
(i.e. going back to replicating blocks twice when the WAL is active), AND making the {{BlockManager}}
react to {{markReady}} / replication events, so that it keeps track of the number of copies of
each block. The only value it really needs to keep current is the date of the oldest block for
which there aren't yet two copies in the system: all WAL info strictly before that date can
safely be deleted.

Then it can contact the {{ReceiverTracker}}, which, through the {{ReceivedBlockTracker}},
has access to the {{WriteAheadLogManager}}, and can call {{cleanupOldBlocks}} with a time
for which it is *known* that all blocks are replicated twice.
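To make the bookkeeping concrete, here is a minimal sketch of the tracking structure described above. This is hypothetical illustration code, not actual Spark source: the class name {{ReplicationTracker}} and its hooks are assumptions, standing in for whatever {{BlockManager}} callbacks would fire on {{markReady}} / replication.

```scala
import scala.collection.mutable

// Hypothetical sketch: tracks, per block, how many copies exist, and
// exposes the oldest time for which some block is still under-replicated.
// All WAL data strictly before that time is safe to purge.
class ReplicationTracker {
  // blockId -> (insertion time in ms, replica count observed so far)
  private val pending = mutable.Map[String, (Long, Int)]()

  // Called when a block is first written (one copy: the receiver's).
  def onBlockAdded(blockId: String, timeMs: Long): Unit = synchronized {
    pending(blockId) = (timeMs, 1)
  }

  // Called on each replication / markReady event for the block.
  def onReplicated(blockId: String): Unit = synchronized {
    pending.get(blockId).foreach { case (t, n) =>
      if (n + 1 >= 2) pending.remove(blockId) // two copies: WAL entry purgeable
      else pending(blockId) = (t, n + 1)
    }
  }

  // Oldest time for which some block still has fewer than two copies.
  // This is the threshold that could be handed (via the ReceiverTracker)
  // to cleanupOldBlocks: everything strictly before it is safe to delete.
  def safeCleanupThreshold(nowMs: Long): Long = synchronized {
    if (pending.isEmpty) nowMs else pending.values.map(_._1).min
  }
}
```

The point of keeping only the minimum is that the tracker never has to scan the WAL itself; the threshold advances monotonically as replication events arrive.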

A further advantage is that, IIUC, with two copies of a block in the system, the locality of
the computation on whatever RDD the block eventually becomes part of is less predictable; in
particular, it is less likely to land on the {{Executor}} on which the {{Receiver}} of that
block ran, avoiding some contention for resources. That is in and of itself an argument for
reverting [SPARK-4671|].

> write ahead logs from streaming receiver are not purged because cleanupOldBlocks in WriteAheadLogBasedBlockHandler
is never called
> ----------------------------------------------------------------------------------------------------------------------------------
>                 Key: SPARK-5147
>                 URL:
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>            Priority: Blocker
> Hi all,
> We are running a Spark streaming application with ReliableKafkaReceiver. We have "spark.streaming.receiver.writeAheadLog.enable"
set to true so write ahead logs (WALs) for received data are created under receivedData/streamId
folder in the checkpoint directory. 
> However, old WALs are never purged by time; receivedBlockMetadata and checkpoint files
are purged correctly, though. I went through the code: the WriteAheadLogBasedBlockHandler class
in ReceivedBlockHandler.scala is responsible for cleaning up the old blocks. It has a method,
cleanupOldBlocks, which is never called by any class. The ReceiverSupervisorImpl class holds a
WriteAheadLogBasedBlockHandler instance; however, it only calls the storeBlock method to create
WALs and never calls cleanupOldBlocks to purge old ones.
> The size of the WAL folder grows constantly on HDFS, which prevents us from running
the ReliableKafkaReceiver 24x7. Can somebody please take a look?
> Thanks,
> Max
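For illustration, the gap the report describes amounts to the following shape. The class and method names (WriteAheadLogBasedBlockHandler, cleanupOldBlocks, storeBlock, ReceiverSupervisorImpl) come from the report itself; the bodies and counters below are a hypothetical stub, not actual Spark source, just to show that the cleanup path is never exercised.

```scala
// Hypothetical stub of the reported gap, not actual Spark source code.
// The handler exposes cleanupOldBlocks, but the supervisor only ever
// calls storeBlock, so nothing ever purges old WAL files.
class WriteAheadLogBasedBlockHandler {
  var stored = 0  // stand-in for WAL files written
  var cleaned = 0 // stand-in for cleanup invocations

  def storeBlock(blockId: String): Unit = { stored += 1 }
  def cleanupOldBlocks(threshTimeMs: Long): Unit = { cleaned += 1 }
}

class ReceiverSupervisorImpl(handler: WriteAheadLogBasedBlockHandler) {
  def pushBlock(blockId: String): Unit = {
    handler.storeBlock(blockId)
    // Missing: a call to handler.cleanupOldBlocks(...) on some schedule,
    // so the receivedData/<streamId> folder grows without bound on HDFS.
  }
}
```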

This message was sent by Atlassian JIRA
