spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rachana Srivastava <rachanasrivas...@yahoo.com.INVALID>
Subject Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)
Date Wed, 17 Jun 2020 21:31:39 GMT
 Thanks so much TD.  Thanks for forwarding your datalake project but at this time we have
budget constraints we can only use open source project.  
I just want the Structured Streaming Application or Spark Streaming DStream Application to
run without and issue for a long time..  I do not want the size of metadata to grow too large
that we start getting these OOM issues.  The only way to resolve this OOM issues is by deleting
the checkpoint and metadata folders.  That means loosing customer data.   We have 60 seconds
batch where we are coleasing and returning only one file per partition.  So we do not have
small file issues also...
What do you suggest?  How should we resolve this issue?
We have very simple 5 line program that reads data from Kafka and output data to S3.  1.
Reading records from Kafka topic
  Dataset<Row> inputDf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       ....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarachical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

    On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim <kabhwan.opensource@gmail.com>
wrote:  
 
 Just in case if anyone prefers ASF projects then there are other alternative projects in
ASF as well, alphabetically, Apache Hudi [1] and Apache Iceberg [2]. Both are recently graduated
as top level projects. (DISCLAIMER: I'm not involved in both.)
BTW it would be nice if we make the metadata implementation on file stream source/sink be
pluggable - from what I've seen, plugin approach has been selected as the way to go whenever
some part is going to be complicated and it becomes arguable whether the part should be handled
in Spark project vs should be outside. e.g. checkpoint manager, state store provider, etc.
It would open up chances for the ecosystem to play with the challenge "without completely
re-writing the file stream source and sink", focusing on scalability for metadata in a long
run query. Alternative projects described above will still provide more higher-level features
and look attractive, but sometimes it may be just "using a sledgehammer to crack a nut".
1. https://hudi.apache.org/2. https://iceberg.apache.org/


On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das <tathagata.das1565@gmail.com> wrote:

Hello Rachana,
Getting exactly-once semantics on files and making it scale to a very large number of files
are very hard problems to solve. While Structured Streaming + built-in file sink solves the
exactly-once guarantee that DStreams could not, it is definitely limited in other ways (scaling
in terms of files, combining batch and streaming writes in the same place, etc). And solving
this problem requires a holistic solution that is arguably beyond the scope of the Spark project. 
There are other projects that are trying to solve this file management issue. For example,
Delta Lake (full disclosure, I am involved in it) was built to exactly solve this problem
- get exactly-once and ACID guarantees on files, but also scale to handling millions of files.
Please consider it as part of your solution. 



On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava <rachanasrivastav@yahoo.com.invalid>
wrote:

I have written a simple spark structured steaming app to move data from Kafka to S3. Found
that in order to support exactly-once guarantee spark creates _spark_metadata folder, which
ends up growing too large as the streaming app is SUPPOSE TO run FOREVER. But when the streaming
app runs for a long time the metadata folder grows so big that we start getting OOM errors.
Only way to resolve OOM is delete Checkpoint and Metadata folder and loose VALUABLE customer
data.

Spark open JIRAs SPARK-24295 and SPARK-29995, SPARK-30462, and SPARK-24295)
Since Spark Streaming was NOT broken like this. Is Spark Streaming a BETTER choice?

  
Mime
View raw message