spark-user mailing list archives

From Bartosz Konieczny <>
Subject Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)
Date Fri, 19 Jun 2020 11:40:24 GMT
Makes sense, Jungtaek 👍 But for the compaction we always take only the
batch files that fall within the compaction interval, right? So never all
batches from the beginning:

/**
 * Returns all valid batches before the specified `compactionBatchId`.
 * They contain all logs we need to do a new compaction.
 *
 * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this
 * method should return `Seq(2, 3, 4)` (Note: it includes the previous
 * compaction batch 2).
 */
def getValidBatchesBeforeCompactionBatch(
    compactionBatchId: Long,
    compactInterval: Int): Seq[Long] = {
  assert(isCompactionBatch(compactionBatchId, compactInterval),
    s"$compactionBatchId is not a compaction batch")
  (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
}

private def compact(batchId: Long, logs: Array[T]): Boolean = {
  val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
  val allLogs = validBatches.map { id =>
    super.get(id).getOrElse {
      throw new IllegalStateException(
        s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
          s"(compactInterval: $compactInterval)")
    }
  }.flatten ++ logs
  // Return false as there is another writer.
  super.add(batchId, compactLogs(allLogs).toArray)
}
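
To make the batch selection above concrete, here is a minimal standalone sketch of the arithmetic, not the Spark class itself. The names CompactionBatches and validBatchesBefore are made up for illustration, and the compaction-batch test `(batchId + 1) % compactInterval == 0` is assumed to match what isCompactionBatch does.

```scala
// Standalone sketch of the batch-selection arithmetic; names are hypothetical.
object CompactionBatches {
  // Assumption: a batch is a compaction batch when (batchId + 1) is a
  // multiple of the compact interval.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Batches read by a compaction: the previous compaction batch (if any)
  // plus the plain batches written since then.
  def validBatchesBefore(compactionBatchId: Long, compactInterval: Int): Seq[Long] = {
    require(isCompactionBatch(compactionBatchId, compactInterval),
      s"$compactionBatchId is not a compaction batch")
    math.max(0L, compactionBatchId - compactInterval) until compactionBatchId
  }

  def main(args: Array[String]): Unit = {
    // Interval 3, compaction batch 5 (the example from the Scaladoc above).
    println(validBatchesBefore(5L, 3).mkString(", "))
    // Interval 10, compaction batch 19.
    println(validBatchesBefore(19L, 10).mkString(", "))
  }
}
```

With an interval of 10, compacting batch 19 reads batches 9 to 18: the previous compact file plus the nine plain batch files written after it.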

Supposing we are talking about the sink, we keep the defaults, and we do
not write nearly 1GB per metadata file, I don't see how the compaction
process itself could lead to memory issues 🤔 And regarding Rachana's
screenshot, the compacted files are only 5MB and we can see that the
compaction happens every 10 files, which is the default compact interval.
In theory I get the point, but for this specific use case it's hard to
imagine. Or maybe I misunderstood the problem and the point is that at some
point in time there are 10 consecutive micro-batches that produce 13GB, but
that seems quite surprising too.
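
One thing to keep in mind here: as far as the compact method shows, the previous compaction batch that each compaction reads already carries every entry written before it. A rough toy model of that accumulation (not Spark code) is sketched below. It assumes every micro-batch adds the same number of entries and that compaction drops nothing; CompactGrowthModel, compactSizes and entriesPerBatch are hypothetical names.

```scala
// Toy model, not Spark code: counts the entries held by each compact file
// when every micro-batch adds the same number of entries and none are dropped.
object CompactGrowthModel {
  // Assumption: same compaction-batch rule as in the snippet above.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Returns (compactionBatchId, entriesInCompactFile) for batches 0 until numBatches.
  def compactSizes(
      numBatches: Int,
      compactInterval: Int,
      entriesPerBatch: Long): Seq[(Long, Long)] = {
    var lastCompactEntries = 0L // entries carried by the previous compact file
    var pendingEntries = 0L     // entries in plain batches since the last compaction
    val sizes = Seq.newBuilder[(Long, Long)]
    for (batchId <- 0L until numBatches.toLong) {
      if (isCompactionBatch(batchId, compactInterval)) {
        // A compaction merges the previous compact file, the plain batches
        // written since then, and the entries of the current batch.
        lastCompactEntries = lastCompactEntries + pendingEntries + entriesPerBatch
        pendingEntries = 0L
        sizes += ((batchId, lastCompactEntries))
      } else {
        pendingEntries += entriesPerBatch
      }
    }
    sizes.result()
  }
}
```

For example, compactSizes(30, 10, 1L) yields (9, 10), (19, 20), (29, 30): under these assumptions the entry count of each compact file tracks the total number of batches so far, not the compaction interval.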

I suppose that the root cause may be located elsewhere but it's hard to
confirm since we don't have the logs or other debugging information.

On Fri, Jun 19, 2020 at 1:09 PM Jungtaek Lim <> wrote:

> For those who would like to understand the details of the file stream source
> and sink, you'll want to look more closely at CompactibleFileStreamLog and
> imagine how compact, get, and allFiles work with batch log files when a
> compact batch file is GBs in size. Take the memory issue, for example: it
> materializes all entries into memory to run compact or to produce the result
> of get or allFiles. The memory footprint would definitely be smaller than the
> file size, but it would still be huge in such a case.
> On Fri, Jun 19, 2020 at 7:42 PM Bartosz Konieczny <>
> wrote:
>> Hi,
>> Since I took a closer look at the sink and its metadata a few weeks ago, I
>> will try to help and add to Jacek's point. Correct me if I'm wrong, but
>> this is what I observed for the sink:
>> - there is no place where Apache Spark keeps all metadata files, or their
>> content, in main memory - at most, it keeps the objects representing the
>> files of the currently executed micro-batch
>> - the metadata files are compacted every
>> spark.sql.streaming.fileSink.log.compactInterval log files
>> - if spark.sql.streaming.fileSink.log.deletion is set to true (default),
>> the metadata will be cleaned up with an extra delay of
>> spark.sql.streaming.fileSink.log.cleanupDelay milliseconds
>> - only spark.sql.streaming.minBatchesToRetain metadata files should be
>> kept; this applies globally (checkpoint metadata, state store and file
>> sink) and defaults to 100
>> The only place where I see a potential OOM caused by listing all
>> metadata files is the FileStreamSink class, here:
>> override def addBatch(batchId: Long, data: DataFrame): Unit = {
>>   if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
>>     logInfo(s"Skipping already committed batch $batchId")
>>   }
>> // ...
>> override def getLatest(): Option[(Long, T)] = {
>>   val batchIds = fileManager.list(metadataPath, batchFilesFilter)
>>     .map(f => pathToBatchId(f.getPath))
>>     .sorted
>>     .reverse
>>   for (batchId <- batchIds) {
>>     val batch = get(batchId)
>>     if (batch.isDefined) {
>>       return Some((batchId, batch.get))
>>     }
>>   }
>>   None
>> }
>> But it's a list operation which doesn't read the data content and when
>> you talk about 13GB, I understand that it's the content of the metadata
>> files, right? And if the cleanup is enabled, it should never grow so much.
>> From that I have some questions:
>> - @Rachana, did you change the configuration?
>> - @Rachana, do you have an exact stack trace for your error and what
>> happened before? If it has to be fixed, it should facilitate the work.
>> Maybe the error is elsewhere, or something else produces the OOM?
>> - for the snippet with addBatch, I'm wondering why we couldn't look
>> directly for the compacted file or for the batch file? The
>> exactly-once semantic is about the micro-batch and not the whole history,
>> no?
>> 💡 Well, I probably got the answer just after writing the question :P If
>> the cleanup is enabled, a direct match on the file number would
>> break exactly-once because we would rewrite the cleaned metadata file.
>> But IMO, that would be really weird because to reprocess a past batch
>> (older than minBatchesToRetain), we would have to keep the checkpoint
>> metadata (the one with Kafka offsets), and since it follows the same cleanup
>> policy as the sink metadata, it should never happen. Or if it happens, it's
>> only because of manual user changes.
>> Best,
>> Bartosz.
>> On Fri, Jun 19, 2020 at 11:03 AM Jacek Laskowski <> wrote:
>>> Hi,
>>> While we're at it, my basic understanding of the metadata directory is
>>> that only the two most recent compact files and the non-compact files in
>>> between are really necessary. Is my understanding correct?
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> "The Internals Of" Online Books <>
>>> On Fri, Jun 19, 2020 at 2:16 AM Jungtaek Lim <> wrote:
>>>> Shall we document the known issue with the file stream sink and provide a
>>>> workaround? There have been more than a couple of questions about this in
>>>> the last couple of months, and there have been 5 related issues. The
>>>> workaround Burak provided looks nice for those who don't need end-to-end
>>>> exactly-once semantics (and in many cases they are OK with the resulting
>>>> semantics).
>>>> On Fri, Jun 19, 2020 at 8:05 AM Burak Yavuz <> wrote:
>>>>> Hi Rachana,
>>>>> If you don't need exactly once semantics, you can use foreachBatch to
>>>>> write your data.
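>>>>> // Explicit parameter types help the compiler choose the Scala overload
>>>>> // of foreachBatch (needs: import org.apache.spark.sql.DataFrame)
>>>>> df.writeStream.foreachBatch { (batchDf: DataFrame, batchId: Long) =>
>>>>>   batchDf.write.mode("append").format(...).save(path)
>>>>> }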
>>>>> However, I would highly recommend upgrading to an ACID data store
>>>>> project like Delta Lake (which natively supports streaming), Iceberg,
>>>>> or Hudi.
>>>>> Best,
>>>>> Burak
>>>>> On Thu, Jun 18, 2020 at 8:24 AM Rachana Srivastava
>>>>> <> wrote:
>>>>>> Thanks so much for your response.  I agree that using Spark Streaming
>>>>>> is not recommended.  But I want a stable system; we cannot have a system
>>>>>> that crashes every 5 days.  As seen in the picture below, we have nearly
>>>>>> 47 MB of data in the metadata folder.  The issue is that when the size of
>>>>>> the data grows to nearly 13 GB and the driver memory is 5 GB, we get an
>>>>>> OOM.  Not sure how to add a TTL to the metadata; if I delete the metadata
>>>>>> then I have to delete the checkpoint and hence lose the data.
>>>>>> [image: Inline image]
>>>>>> On Thursday, June 18, 2020, 03:23:32 AM PDT, Jacek Laskowski <> wrote:
>>>>>> Hi Rachana,
>>>>>> > Should I go backward and use Spark Streaming DStream based.
>>>>>> No. Never. It's no longer supported (and should really be removed
>>>>>> from the codebase once and for all - dreaming...).
>>>>>> Spark focuses on Spark SQL and Spark Structured Streaming as
>>>>>> user-facing modules for batch and streaming queries, respectively.
>>>>>> Please note that I'm not a PMC member or even a committer so I'm
>>>>>> speaking for myself only (not representing the project in an official
>>>>>> Pozdrawiam,
>>>>>> Jacek Laskowski
>>>>>> ----
>>>>>> "The Internals Of" Online Books <>
>>>>>> On Thu, Jun 18, 2020 at 12:03 AM Rachana Srivastava
>>>>>> <> wrote:
>>>>>> *Structured Streaming vs. Spark Streaming (DStream)?*
>>>>>> Which is recommended for system stability?  Exactly-once is NOT the first
>>>>>> priority.  The first priority is a STABLE system.
>>>>>> I need to make a decision soon.  I need help.  Here is the
>>>>>> question again.  Should I go backward and use the DStream-based Spark
>>>>>> Streaming, write our own checkpoint, and go from there?  At least we
>>>>>> won't encounter these metadata issues there.
>>>>>> Thanks,
>>>>>> Rachana
>>>>>> On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim <> wrote:
>>>>>> Just in case anyone prefers ASF projects, there are other
>>>>>> alternative projects in the ASF as well: alphabetically, Apache Hudi [1]
>>>>>> and Apache Iceberg [2]. Both recently graduated as top-level projects.
>>>>>> (DISCLAIMER: I'm not involved in either.)
>>>>>> BTW it would be nice if we made the metadata implementation of the file
>>>>>> stream source/sink pluggable - from what I've seen, the plugin approach has
>>>>>> been selected as the way to go whenever some part is going to be
>>>>>> complicated and it becomes arguable whether the part should be handled in
>>>>>> the Spark project or outside it, e.g. checkpoint manager, state
>>>>>> store provider, etc. It would open up chances for the ecosystem to deal
>>>>>> with the challenge "without completely re-writing the file stream source
>>>>>> and sink", focusing on scalability of the metadata in a long-running query.
>>>>>> The alternative projects described above will still provide more higher-level
>>>>>> features and look attractive, but sometimes it may be just "using a
>>>>>> sledgehammer to crack a nut".
>>>>>> 1.
>>>>>> 2.
>>>>>> On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das <> wrote:
>>>>>> Hello Rachana,
>>>>>> Getting exactly-once semantics on files and making it scale to a
>>>>>> large number of files are very hard problems to solve. While Structured
>>>>>> Streaming + built-in file sink solves the exactly-once guarantee that
>>>>>> DStreams could not, it is definitely limited in other ways (scaling in
>>>>>> terms of files, combining batch and streaming writes in the same
>>>>>> etc). And solving this problem requires a holistic solution that is
>>>>>> arguably beyond the scope of the Spark project.
>>>>>> There are other projects that are trying to solve this file
>>>>>> management issue. For example, Delta Lake <> (full
>>>>>> disclosure, I am involved in it) was built to solve exactly this
>>>>>> problem - get exactly-once and ACID guarantees on files, but also scale
>>>>>> to millions of files. Please consider it as part of your solution.
>>>>>> On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava
>>>>>> <> wrote:
>>>>>> I have written a simple Spark Structured Streaming app to move data
>>>>>> from Kafka to S3. I found that, in order to support the exactly-once
>>>>>> guarantee, Spark creates a _spark_metadata folder, which ends up growing
>>>>>> too large as the streaming app is SUPPOSED TO run FOREVER. When the
>>>>>> streaming app runs for a long time, the metadata folder grows so big that
>>>>>> we start getting OOM errors. The only way to resolve the OOM is to delete
>>>>>> the checkpoint and metadata folders and lose VALUABLE customer data.
>>>>>> (Open Spark JIRAs: SPARK-24295, SPARK-29995, SPARK-30462, and
>>>>>> SPARK-24295)
>>>>>> Since Spark Streaming was NOT broken like this, is Spark Streaming the
>>>>>> BETTER choice?
>> --
>> Bartosz Konieczny
>> data engineer

Bartosz Konieczny
data engineer
