spark-user mailing list archives

From Bartosz Konieczny <bartkoniec...@gmail.com>
Subject Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)
Date Fri, 19 Jun 2020 11:40:24 GMT
Makes sense Jungtaek 👍  But for the compaction we always pick the batch
files according to the compact interval, right? So we never read all
batches from the beginning:

/**
 * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
 * need to do a new compaction.
 *
 * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should return
 * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
 */
def getValidBatchesBeforeCompactionBatch(
    compactionBatchId: Long,
    compactInterval: Int): Seq[Long] = {
  assert(isCompactionBatch(compactionBatchId, compactInterval),
    s"$compactionBatchId is not a compaction batch")
  (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
}

private def compact(batchId: Long, logs: Array[T]): Boolean = {
  val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
  val allLogs = validBatches.map { id =>
    super.get(id).getOrElse {
      throw new IllegalStateException(
        s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
          s"(compactInterval: $compactInterval)")
    }
  }.flatten ++ logs
  // Return false as there is another writer.
  super.add(batchId, compactLogs(allLogs).toArray)
}
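
To make the arithmetic concrete, here is a small standalone sketch - my own
illustration, not the Spark code itself - that copies the two helpers in
simplified form and assumes the default compact interval of 10:

object CompactionBatchesSketch {

  // Simplified copy of CompactibleFileStreamLog.isCompactionBatch, for illustration only.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Simplified copy of the method quoted above.
  def getValidBatchesBeforeCompactionBatch(
      compactionBatchId: Long,
      compactInterval: Int): Seq[Long] = {
    assert(isCompactionBatch(compactionBatchId, compactInterval),
      s"$compactionBatchId is not a compaction batch")
    math.max(0, compactionBatchId - compactInterval) until compactionBatchId
  }

  def main(args: Array[String]): Unit = {
    // With the default interval of 10, batches 9, 19, 29, ... are compaction batches.
    // Compacting batch 19 reads the previous compacted batch (9) plus the deltas 10..18,
    // not every batch file since batch 0 (the compacted file itself carries the
    // accumulated entries, which is what makes its size matter).
    println(getValidBatchesBeforeCompactionBatch(19, 10).mkString(", "))
    // prints: 9, 10, 11, 12, 13, 14, 15, 16, 17, 18
  }
}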


I don't see how the compaction process itself could lead to memory issues
🤔, assuming we are talking about the sink, we keep the defaults, and we
don't write nearly 1 GB per metadata file. And regarding Rachana's
screenshot, the compacted files are only 5 MB and we can see that the
compaction happens every 10 files, which is the default compact interval.
In theory I get the point, but for this specific use case it's hard to
imagine. Or maybe I misunderstood the problem and the point is that at some
point in time there are 10 consecutive micro-batches that produce 13 GB,
but that seems quite surprising too.

I suppose that the root cause may be located elsewhere but it's hard to
confirm since we don't have the logs or other debugging information.



On Fri, Jun 19, 2020 at 1:09 PM Jungtaek Lim <kabhwan.opensource@gmail.com>
wrote:

> To whoever would like to understand the details of the file stream source and
> sink: you'll want to look more closely at CompactibleFileStreamLog, and
> imagine how compact, get and allFiles will work with batch log files when a
> compact batch file is GBs in size. Take the memory issue for example: it
> materializes all entries into memory to run compact and to produce the result
> of get or allFiles. The memory footprint would definitely be smaller than the
> file size, but would still be huge in such a case.
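>
> To make that concrete, here is a rough standalone sketch (mine, not Spark's
> code; it only mimics the sink log layout of a version header followed by one
> serialized entry per line) of where that materialization bites:
>
> import scala.io.Source
>
> // Hypothetical path to a compacted sink log, e.g. <output>/_spark_metadata/19.compact.
> val compactFile = "/tmp/_spark_metadata/19.compact"
>
> val source = Source.fromFile(compactFile)
> try {
>   // This is the pattern described above: every entry of the compact file (millions
>   // of lines for a GB-sized file) is held on the driver heap at once before compact,
>   // get or allFiles can return. Even keeping only the raw lines, as here, the
>   // footprint is already on the order of the file size.
>   val entries: Array[String] = source.getLines().drop(1).toArray // drop the "v1" header
>   val rawBytes = entries.iterator.map(_.length.toLong).sum
>   println(s"materialized ${entries.length} entries, ~${rawBytes / (1024 * 1024)} MB of raw text")
> } finally {
>   source.close()
> }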
>
> On Fri, Jun 19, 2020 at 7:42 PM Bartosz Konieczny <bartkonieczny@gmail.com>
> wrote:
>
>> Hi,
>>
>> Since I took a closer look at the sink and its metadata a few weeks ago, I
>> will try to help and complete Jacek's point. Correct me if I'm wrong, but
>> this is what I observed for the sink:
>>
>> - there is no place where Apache Spark keeps all metadata files or their
>> content in the main memory - at most, it keeps the objects
>> representing the files of the currently executed micro-batch
>> - the metadata files are compacted every
>> spark.sql.streaming.fileSink.log.compactInterval log files
>> - if spark.sql.streaming.fileSink.log.deletion is set to true (default),
>> the metadata files will be cleaned up, with an extra delay of
>> spark.sql.streaming.fileSink.log.cleanupDelay milliseconds
>> - only spark.sql.streaming.minBatchesToRetain metadata files should be
>> kept; this applies globally: checkpoint metadata, state store and file sink;
>> it defaults to 100 (a minimal sketch of setting these options follows below)
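>>
>> A minimal sketch of setting those options before starting the query, assuming
>> `spark` is the active SparkSession (they are internal configuration entries, so
>> double-check the names and defaults against your Spark version):
>>
>> // Sketch only; the values shown here are the defaults.
>> spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10")
>> spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")
>> spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "600000") // milliseconds (10 min)
>> spark.conf.set("spark.sql.streaming.minBatchesToRetain", "100")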
>>
>> The only place where I see a potential OOM because of the listing of all
>> metadata files is the FileStreamSink class, here:
>>
>> override def addBatch(batchId: Long, data: DataFrame): Unit = {
>>   if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
>>     logInfo(s"Skipping already committed batch $batchId")
>>   }
>> // ...
>> override def getLatest(): Option[(Long, T)] = {
>>   val batchIds = fileManager.list(metadataPath, batchFilesFilter)
>>     .map(f => pathToBatchId(f.getPath))
>>     .sorted
>>     .reverse
>>   for (batchId <- batchIds) {
>>     val batch = get(batchId)
>>     if (batch.isDefined) {
>>       return Some((batchId, batch.get))
>>     }
>>   }
>>   None
>> }
>>
>> But it's a list operation which doesn't read the files' content, and when
>> you talk about 13 GB, I understand that it's the content of the metadata
>> files, right? And if the cleanup is enabled, it should never grow that much.
>>
>>
>> From that I have some questions:
>> - @Rachana, did you change the configuration?
>> - @Rachana, do you have an exact stack trace for your error and what
>> happened before? If this has to be fixed, that would facilitate the work.
>> Maybe the error is elsewhere, or something else produces the OOM?
>> - for the snippet with addBatch, I'm wondering why we couldn't look
>> directly for the compacted file or for the batch file? The
>> exactly-once semantics are about the micro-batch and not the whole history,
>> no?
>> 💡 Well, I probably got the answer just after writing the question :P If
>> the cleanup is enabled, matching directly on the file number would
>> break exactly-once because we would rewrite the cleaned-up metadata file.
>> But IMO that would be really weird, because to reprocess a past batch
>> (older than minBatchesToRetain) we would have to keep the checkpoint metadata
>> (the one with Kafka offsets), and since it follows the same cleanup policy as
>> the sink metadata, it should never happen. Or if it happens, it's only because
>> of manual user changes.
>>
>> Best,
>> Bartosz.
>>
>> On Fri, Jun 19, 2020 at 11:03 AM Jacek Laskowski <jacek@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> While we're at it, my basic understanding of the metadata directory is
>>> that only the two most recent compacted files and the non-compacted files
>>> in between are really necessary. Is my understanding correct?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>>
>>> On Fri, Jun 19, 2020 at 2:16 AM Jungtaek Lim <
>>> kabhwan.opensource@gmail.com> wrote:
>>>
>>>> Shall we document the known issue on the file stream sink and provide a
>>>> workaround? There have been more than a couple of questions about this in
>>>> the last couple of months, and there are 5 related issues. The workaround
>>>> Burak provided looks nice for those who don't need end-to-end exactly-once
>>>> semantics (and in many cases they are OK with that).
>>>>
>>>> On Fri, Jun 19, 2020 at 8:05 AM Burak Yavuz <brkyvz@gmail.com> wrote:
>>>>
>>>>> Hi Rachana,
>>>>>
>>>>> If you don't need exactly once semantics, you can use foreachBatch to
>>>>> write your data.
>>>>> df.writeStream.foreachBatch { case (df, batchId) =>
>>>>>   df.write.mode("append").format(...).save(path)
>>>>> }
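>>>>>
>>>>> A slightly fuller sketch of the same idea (the paths are placeholders, and
>>>>> the explicitly typed function just sidesteps any Scala 2.12 ambiguity between
>>>>> the Scala and Java foreachBatch overloads):
>>>>>
>>>>> import org.apache.spark.sql.DataFrame
>>>>>
>>>>> val outputPath = "s3://my-bucket/output"              // placeholder
>>>>> val checkpointPath = "s3://my-bucket/checkpoints/q1"  // placeholder
>>>>>
>>>>> // Each micro-batch is written as a plain batch job, so no _spark_metadata is produced.
>>>>> val writeBatch: (DataFrame, Long) => Unit = { (batchDF, batchId) =>
>>>>>   batchDF.write.mode("append").parquet(outputPath)
>>>>> }
>>>>>
>>>>> df.writeStream
>>>>>   .option("checkpointLocation", checkpointPath)
>>>>>   .foreachBatch(writeBatch)
>>>>>   .start()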
>>>>>
>>>>> However, I would highly recommend upgrading to some ACID data store
>>>>> project like Delta Lake (which natively supports streaming), Iceberg or
>>>>> Hudi.
>>>>>
>>>>> Best,
>>>>> Burak
>>>>>
>>>>> On Thu, Jun 18, 2020 at 8:24 AM Rachana Srivastava
>>>>> <rachanasrivastav@yahoo.com.invalid> wrote:
>>>>>
>>>>>> Thanks so much for your response.  I agree using Spark Streaming is
>>>>>> not recommended.  But I want a stable system; we cannot have a system that
>>>>>> crashes every 5 days.  As seen in the picture below, we have nearly 47 MB of
>>>>>> data in the metadata folder.  The issue is that when the size of the data
>>>>>> increases to nearly 13 GB and the driver memory is 5 GB, we get OOM.  Not
>>>>>> sure how to add a TTL to the metadata; if I delete the metadata then I have
>>>>>> to delete the checkpoint and hence lose the data.
>>>>>>
>>>>>> [image: Inline image]
>>>>>>
>>>>>>
>>>>>> On Thursday, June 18, 2020, 03:23:32 AM PDT, Jacek Laskowski <
>>>>>> jacek@japila.pl> wrote:
>>>>>>
>>>>>>
>>>>>> Hi Rachana,
>>>>>>
>>>>>> > Should I go backward and use Spark Streaming DStream based.
>>>>>>
>>>>>> No. Never. It's no longer supported (and should really be removed
>>>>>> from the codebase once and for all - dreaming...).
>>>>>>
>>>>>> Spark focuses on Spark SQL and Spark Structured Streaming as
>>>>>> user-facing modules for batch and streaming queries, respectively.
>>>>>>
>>>>>> Please note that I'm not a PMC member or even a committer so I'm
>>>>>> speaking for myself only (not representing the project in an official
way).
>>>>>>
>>>>>> Pozdrawiam,
>>>>>> Jacek Laskowski
>>>>>> ----
>>>>>> https://about.me/JacekLaskowski
>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>
>>>>>> <https://twitter.com/jaceklaskowski>
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 18, 2020 at 12:03 AM Rachana Srivastava
>>>>>> <rachanasrivastav@yahoo.com.invalid> wrote:
>>>>>>
>>>>>> *Structured Streaming vs Spark Streaming (DStream)?*
>>>>>>
>>>>>> Which is recommended for system stability?  Exactly-once is NOT the first
>>>>>> priority.  The first priority is a STABLE system.
>>>>>>
>>>>>> I need to make a decision soon.  I need help.  Here is the
>>>>>> question again.  Should I go backward and use Spark Streaming (DStream
>>>>>> based), write our own checkpointing, and go from there?  At least we never
>>>>>> encountered these metadata issues there.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Rachana
>>>>>>
>>>>>> On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim <
>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>
>>>>>>
>>>>>> Just in case anyone prefers ASF projects, there are other
>>>>>> alternative projects in the ASF as well, alphabetically, Apache Hudi [1] and
>>>>>> Apache Iceberg [2]. Both recently graduated to top-level projects.
>>>>>> (DISCLAIMER: I'm not involved in either.)
>>>>>>
>>>>>> BTW it would be nice if we made the metadata implementation of the file
>>>>>> stream source/sink pluggable - from what I've seen, the plugin approach has
>>>>>> been selected as the way to go whenever some part is going to be
>>>>>> complicated and it becomes arguable whether that part should be handled in
>>>>>> the Spark project or outside of it, e.g. the checkpoint manager, the state
>>>>>> store provider, etc. It would open up chances for the ecosystem to play
>>>>>> with the challenge "without completely re-writing the file stream source
>>>>>> and sink", focusing on scalability of the metadata in long-running queries.
>>>>>> The alternative projects described above will still provide higher-level
>>>>>> features and look attractive, but sometimes it may be just "using a
>>>>>> sledgehammer to crack a nut".
>>>>>>
>>>>>> 1. https://hudi.apache.org/
>>>>>> 2. https://iceberg.apache.org/
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das <
>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>
>>>>>> Hello Rachana,
>>>>>>
>>>>>> Getting exactly-once semantics on files and making it scale to a very
>>>>>> large number of files are very hard problems to solve. While Structured
>>>>>> Streaming + the built-in file sink solves the exactly-once guarantee that
>>>>>> DStreams could not, it is definitely limited in other ways (scaling in
>>>>>> terms of files, combining batch and streaming writes in the same place,
>>>>>> etc.). And solving this problem requires a holistic solution that is
>>>>>> arguably beyond the scope of the Spark project.
>>>>>>
>>>>>> There are other projects that are trying to solve this file
>>>>>> management issue. For example, Delta Lake <https://delta.io/> (full
>>>>>> disclosure, I am involved in it) was built to solve exactly this problem -
>>>>>> get exactly-once and ACID guarantees on files, but also scale to handling
>>>>>> millions of files. Please consider it as part of your solution.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava
>>>>>> <rachanasrivastav@yahoo.com.invalid> wrote:
>>>>>>
>>>>>> I have written a simple Spark Structured Streaming app to move data
>>>>>> from Kafka to S3. I found that in order to support the exactly-once
>>>>>> guarantee, Spark creates a _spark_metadata folder, which ends up growing
>>>>>> too large as the streaming app is SUPPOSED TO run FOREVER. But when the
>>>>>> streaming app runs for a long time, the metadata folder grows so big that
>>>>>> we start getting OOM errors. The only way to resolve the OOM is to delete
>>>>>> the Checkpoint and Metadata folders and hence lose VALUABLE customer data.
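>>>>>>
>>>>>> The app is basically the standard Kafka-to-file-sink pipeline, roughly like
>>>>>> this (brokers, topic and paths are placeholders here, not my real values):
>>>>>>
>>>>>> val df = spark.readStream
>>>>>>   .format("kafka")
>>>>>>   .option("kafka.bootstrap.servers", "broker:9092") // placeholder
>>>>>>   .option("subscribe", "events")                    // placeholder
>>>>>>   .load()
>>>>>>
>>>>>> df.writeStream
>>>>>>   .format("parquet")
>>>>>>   // the file sink keeps its commit log in <path>/_spark_metadata
>>>>>>   .option("path", "s3://my-bucket/output")
>>>>>>   .option("checkpointLocation", "s3://my-bucket/checkpoints/query1")
>>>>>>   .start()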
>>>>>>
>>>>>> (Open Spark JIRAs: SPARK-24295, SPARK-29995, and SPARK-30462.)
>>>>>> Spark Streaming was NOT broken like this. Is Spark Streaming a
>>>>>> BETTER choice?
>>>>>>
>>>>>>
>>>>>
>>>>>
>>
>> --
>> Bartosz Konieczny
>> data engineer
>> https://www.waitingforcode.com
>> https://github.com/bartosz25/
>> https://twitter.com/waitingforcode
>>
>>

-- 
Bartosz Konieczny
data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode
