spark-user mailing list archives

From M Singh <mans2si...@yahoo.com.INVALID>
Subject Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size
Date Thu, 04 Jan 2018 21:49:12 GMT
Thanks Tathagata for your answer.
The reason I was asking about controlling data size is that the javadoc indicates you can use
foreach or collect on the DataFrame. If the data is very large, then a collect may result
in an OOM.
From your answer it appears that the only way to control the size (in 2.2) would be to control
the trigger interval. However, in my case, I have to dedup the elements over a one-minute interval,
which I am using as the trigger interval and cannot reduce. Do you have any other
suggestion/recommendation?
Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3?
Thanks again. 

On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:

1. It is all the result data in that trigger. Note that it takes a DataFrame, which is a purely
logical representation of the data and has no association with partitions, etc., which are
physical representations.

2. If you want to limit the amount of data that is processed in a trigger, then you should
either control the trigger interval or use the rate limit options on sources that support
it (e.g. for Kafka, you can use the option "maxOffsetsPerTrigger"; see the guide).
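
For illustration, a minimal sketch (Spark 2.2, Scala) showing both knobs on a Kafka source; the
broker address, topic, limit, and interval below are placeholders, not recommendations:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.Trigger

  val spark = SparkSession.builder.appName("rate-limit-sketch").getOrCreate()

  // Rate limit: cap how many Kafka records are read in each micro-batch.
  val kafkaStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")  // placeholder brokers
    .option("subscribe", "events")                    // placeholder topic
    .option("maxOffsetsPerTrigger", "100000")         // cap records per trigger
    .load()

  // Trigger interval: control how often a micro-batch is started.
  val query = kafkaStream.writeStream
    .format("console")                                // stand-in for a real sink
    .trigger(Trigger.ProcessingTime("1 minute"))
    .start()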
On a related note, these APIs are subject to change. In fact, in the upcoming 2.3 release, we are
adding a DataSourceV2 API for batch, micro-batch streaming, and continuous streaming sources and
sinks.
On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2singh@yahoo.com.invalid> wrote:

Hi:
The documentation for Sink.addBatch is as follows:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
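
For reference, here is a minimal sketch of a custom sink written against this trait (Sink is an
internal API in 2.x and subject to change); the class name and the write logic are illustrative
only:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.streaming.Sink

  class PrintlnSink extends Sink {
    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      // Consume the batch with foreach, which runs on the executors, rather than
      // collect(), which pulls the entire micro-batch into driver memory.
      data.foreach { row =>
        // Replace this with the actual write to the external system.
        println(s"batch $batchId: $row")
      }
    }
  }

To plug such a sink into writeStream.format(...), it would also need a StreamSinkProvider; that
part is omitted here.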
A few questions about the data in each DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or is it all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure that we don't
run into an OOM exception on the executor while calling collect?
Thanks