spark-user mailing list archives

From: Rachana Srivastava
Subject: Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)
Date: Fri, 19 Jun 2020 11:25:16 GMT
Thanks so much for your response, I really appreciate it! I have added the following settings to
reduce the size of the metadata.
--conf spark.sql.streaming.fileSink.log.deletion=true \
--conf spark.sql.streaming.fileSink.log.compactInterval=3 \
--conf spark.sql.streaming.fileSink.log.cleanupDelay=0 \
I have started the system; I will let it run for some time and keep you posted.
I never got an explicit error from the system, such as an OOM from the driver. I have written
very simple code, which is pasted below. My code used to run fine with zero lag for 3 to 8 days.
Then suddenly the system hangs. There is no error; the only thing I observed is that the size of
the metadata grew to 13 GB. When I tried to restart the system manually, it did not start UNTIL
I cleaned up the metadata and checkpoint folders.
1. Reading records from Kafka topic
  Dataset<Row> inputDf = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .load();
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       ....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

    On Friday, June 19, 2020, 03:42:15 AM PDT, Bartosz Konieczny wrote:
A few weeks ago I took a closer look at the sink and its metadata, so I will try to help and
complete Jacek's point. Correct me if I'm wrong, but this is what I observed for the sink:
- there is no place where Apache Spark keeps all metadata files, or their content, in main
memory; at most, it keeps the objects representing the files of the currently
executed micro-batch
- the metadata files are compacted every spark.sql.streaming.fileSink.log.compactInterval
log files
- if spark.sql.streaming.fileSink.log.deletion is set to true (default), the metadata will
be cleaned up, with an extra delay of spark.sql.streaming.fileSink.log.cleanupDelay
milliseconds
- only spark.sql.streaming.minBatchesToRetain metadata files should be kept; this applies
globally: checkpoint metadata, state store and file sink; defaults to 100
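
To make the compaction point above concrete, here is a minimal plain-Java sketch (no Spark
dependency) of how compaction batches could be numbered and which log files a reader would
still need afterwards. The class and method names are hypothetical. The rule that batch N is
a compaction batch when (N + 1) is a multiple of the interval, and the ".compact" file suffix,
are assumptions based on a reading of the Spark source, not a copy of it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of compaction numbering in a file sink metadata log. */
class MetadataLogSketch {

    // Assumption: batch N is a compaction batch when (N + 1) is a multiple of the interval.
    static boolean isCompactionBatch(long batchId, int compactInterval) {
        return (batchId + 1) % compactInterval == 0;
    }

    // Names of the log files needed to reconstruct the full sink file list:
    // the most recent compact file (if any) plus the plain batch files written after it.
    static List<String> filesNeeded(long latestBatchId, int compactInterval) {
        long lastCompact = -1L;
        for (long id = latestBatchId; id >= 0; id--) {
            if (isCompactionBatch(id, compactInterval)) {
                lastCompact = id;
                break;
            }
        }
        List<String> files = new ArrayList<>();
        if (lastCompact >= 0) {
            files.add(lastCompact + ".compact"); // assumed suffix for compacted log files
        }
        for (long id = lastCompact + 1; id <= latestBatchId; id++) {
            files.add(Long.toString(id));
        }
        return files;
    }

    public static void main(String[] args) {
        // compactInterval=3 as in Rachana's settings, latest committed batch = 7
        System.out.println(filesNeeded(7, 3));
    }
}
```

With compactInterval=3 and latest batch 7, the sketch lists 5.compact, 6 and 7. Note that a
compact file still carries the entries of every earlier batch, so compaction bounds the number
of log files but not, by itself, the size of the newest compact file.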

The single place where I see a potential OOM from listing all metadata files is the
FileStreamSink class, here:
override def addBatch(batchId: Long, data: DataFrame): Unit = {
  if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    logInfo(s"Skipping already committed batch $batchId")
// ...
override def getLatest(): Option[(Long, T)] = {
  val batchIds = fileManager.list(metadataPath, batchFilesFilter)
    .map(f => pathToBatchId(f.getPath))
  for (batchId <- batchIds) {
    val batch = get(batchId)
    if (batch.isDefined) {
      return Some((batchId, batch.get))
}
But it's a list operation which doesn't read the data content, and when you talk about 13GB,
I understand that it's the content of the metadata files, right? And if the cleanup is enabled,
it should never grow so much.
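
The batch-id check in addBatch is what makes a replayed micro-batch harmless. A minimal
plain-Java sketch of that idea follows. IdempotentSinkSketch and its in-memory fields are
hypothetical stand-ins for the sink and its metadata log, not Spark classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: skip a micro-batch whose id was already committed. */
class IdempotentSinkSketch {
    // Stand-in for the latest batch id recorded in the metadata log; -1 means nothing committed.
    private long latestCommitted = -1L;
    // Stand-in for the data written to the output location.
    private final List<String> written = new ArrayList<>();

    /** Returns true if the batch was written, false if it was skipped as already committed. */
    boolean addBatch(long batchId, List<String> rows) {
        if (batchId <= latestCommitted) {
            return false; // replayed batch: writing again would duplicate output
        }
        written.addAll(rows);
        latestCommitted = batchId; // "commit" only after the write
        return true;
    }

    List<String> written() {
        return written;
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        sink.addBatch(0, Arrays.asList("a", "b"));
        sink.addBatch(0, Arrays.asList("a", "b")); // replay after a restart, skipped
        sink.addBatch(1, Arrays.asList("c"));
        System.out.println(sink.written());
    }
}
```

The sketch only needs the latest committed id to decide, which is the intuition behind the
question further down about looking up the batch file directly.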

From that I have some questions:
- @Rachana, did you change the configuration?
- @Rachana, do you have an exact stack trace for your error and what happened before? If it
has to be fixed, it should facilitate the work. Maybe the error is elsewhere, or something
else produces the OOM?
- for the snippet with addBatch, I'm wondering why we couldn't directly look for the compacted
file or for the batch file directly? The exactly-once semantic is about the micro-batch and
not the whole history, no?
💡 Well, I probably got the answer just after writing the question :P If the cleanup is
enabled, a direct match on the file number would break exactly-once because we
would rewrite the cleaned metadata file. But IMO, it would be really weird because to reprocess
a past batch (older than minBatchesToRetain), we would have to keep the checkpoint metadata (the
one with Kafka offsets), and since it follows the same cleanup policy as the sink metadata, it
should never happen. Or if it happens, it's only because of manual user changes.


On Fri, Jun 19, 2020 at 11:03 AM Jacek Laskowski wrote:

While we're at it, my basic understanding of the metadata directory is that only the two most
recent compact files and the non-compact files in between are really necessary. Is my
understanding correct?
Regards,
Jacek Laskowski
----
"The Internals Of" Online Books

On Fri, Jun 19, 2020 at 2:16 AM Jungtaek Lim wrote:

Shall we document the known issue with the file stream sink and provide a workaround? There
have been more than a couple of questions about this in the last couple of months, and there
are 5 related issues. The workaround Burak provided looks nice for those who don't need
end-to-end exactly-once semantics (and in many cases they are OK with the weaker semantics).

On Fri, Jun 19, 2020 at 8:05 AM Burak Yavuz wrote:

Hi Rachana,
If you don't need exactly once semantics, you can use foreachBatch to write your data.
df.writeStream.foreachBatch { case (df, batchId) =>
  df.write.mode("append").format(...).save(path)
}
However, I would highly recommend upgrading to some ACID data store project like Delta Lake
(which natively supports streaming), Iceberg or Hudi. 
On Thu, Jun 18, 2020 at 8:24 AM Rachana Srivastava wrote:

Thanks so much for your response. I agree that using Spark Streaming is not recommended. But
I want a stable system; we cannot have a system that crashes every 5 days. As seen in the
picture below, we have nearly 47 MB of data in the metadata folder. The issue is that when the
size of the data increases to nearly 13 GB and driver memory is 5 GB, we get an OOM. I am not
sure how to add a TTL to the metadata; if I delete the metadata then I have to delete the
checkpoint and hence lose the data.

    On Thursday, June 18, 2020, 03:23:32 AM PDT, Jacek Laskowski wrote:
 Hi Rachana,
> Should I go backward and use Spark Streaming DStream based.
No. Never. It's no longer supported (and should really be removed from the codebase once
and for all - dreaming...).
Spark focuses on Spark SQL and Spark Structured Streaming as user-facing modules for batch
and streaming queries, respectively.
Please note that I'm not a PMC member or even a committer so I'm speaking for myself only
(not representing the project in an official way).
Regards,
Jacek Laskowski
----
"The Internals Of" Online Books

On Thu, Jun 18, 2020 at 12:03 AM Rachana Srivastava wrote:

Structured Streaming vs. Spark Streaming (DStream)?
Which is recommended for system stability? Exactly-once is NOT the first priority. The first
priority is a STABLE system.
I need to make a decision soon. I need help. Here is the question again: should
I go backward and use DStream-based Spark Streaming, write our own checkpoint and go from
there? At least we never encountered these metadata issues there.
    On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim wrote:
In case anyone prefers ASF projects, there are alternative projects in the
ASF as well: alphabetically, Apache Hudi [1] and Apache Iceberg [2]. Both recently graduated
as top-level projects. (DISCLAIMER: I'm not involved in either.)
BTW it would be nice if we made the metadata implementation of the file stream source/sink
pluggable. From what I've seen, the plugin approach has been selected as the way to go whenever
some part gets complicated and it becomes arguable whether that part should be handled
in the Spark project or outside it, e.g. checkpoint manager, state store provider, etc.
It would open up chances for the ecosystem to play with the challenge "without completely
re-writing the file stream source and sink", focusing on scalability of metadata in a
long-running query. The alternative projects described above will still provide more high-level
features and look attractive, but sometimes it may be just "using a sledgehammer to crack a nut".

On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das wrote:

Hello Rachana,
Getting exactly-once semantics on files and making it scale to a very large number of files
are very hard problems to solve. While Structured Streaming + built-in file sink solves the
exactly-once guarantee that DStreams could not, it is definitely limited in other ways (scaling
in terms of files, combining batch and streaming writes in the same place, etc). And solving
this problem requires a holistic solution that is arguably beyond the scope of the Spark project. 
There are other projects that are trying to solve this file management issue. For example,
Delta Lake (full disclosure, I am involved in it) was built to exactly solve this problem
- get exactly-once and ACID guarantees on files, but also scale to handling millions of files.
Please consider it as part of your solution. 

On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava wrote:

I have written a simple Spark Structured Streaming app to move data from Kafka to S3. I found
that in order to support the exactly-once guarantee, Spark creates a _spark_metadata folder,
which ends up growing too large, as the streaming app is SUPPOSED TO run FOREVER. When the
streaming app runs for a long time, the metadata folder grows so big that we start getting OOM
errors. The only way to resolve the OOM is to delete the Checkpoint and Metadata folders and
lose VALUABLE customer

(Open Spark JIRAs: SPARK-24295, SPARK-29995, and SPARK-30462)
Spark Streaming was NOT broken like this. Is Spark Streaming a BETTER choice?


Bartosz Konieczny
data engineer
https://www.waitingforcode.com
