spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream
Date Thu, 05 Jul 2018 23:00:50 GMT
Hey all,

In Spark 2.4.0, there will be a new feature called *foreachBatch* which
exposes the output rows of every micro-batch as a DataFrame, on which you
can apply a user-defined function. With that, you can reuse existing batch
data sources for writing results, as well as write results to multiple
locations.

*Reuse existing batch data sources*
For many storage systems, there may not be a streaming sink available yet,
but there may already exist a data writer for batch queries. Using
foreachBatch(), you can use those batch data writers on the output of each
micro-batch. For example, writing from a stream to Cassandra using the
Cassandra Spark Connector would look like

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.cassandraFormat(...).save(...)
}.start()

*Write to multiple locations*
If you want to write the output of a streaming query to multiple locations,
then you can simply write the output DataFrame/Dataset multiple times.
However, each attempt to write can cause the output data to be recomputed
(including possible re-reading of the input data). To avoid recomputations,
you should cache the output DataFrame/Dataset, write it to multiple
locations, and then uncache it. Here is an outline.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.cache()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}.start()

*Apply additional DataFrame operations*
Many DataFrame and Dataset operations are not supported on streaming
DataFrames because Spark does not support generating incremental plans in
those cases. Using foreachBatch(), you can apply some of these operations to
each micro-batch's output. However, you will have to reason about the
end-to-end semantics of doing so yourself.
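For example, sorting a non-aggregated streaming DataFrame is not supported,
but sorting each micro-batch inside foreachBatch() is. The semantics change,
though: each batch is ordered independently, so the stream's overall output is
not globally sorted. A plain-Scala simulation of that per-batch semantics (no
Spark needed; the data is illustrative):

```scala
// Two simulated micro-batches; a "sort inside foreachBatch" sorts each
// batch on its own, so the concatenated output is only sorted per batch.
val microBatches = Seq(Seq(3, 1, 2), Seq(0, 5, 4))
val written = microBatches.map(_.sorted)   // batch-only operation, per batch
println(written.flatten.mkString(","))     // 1,2,3,0,4,5
```

This is exactly the kind of end-to-end reasoning meant above: the operation
runs fine per batch, but its global meaning over the stream is different.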

*NOTE: *By default, foreachBatch() provides only at-least-once write
guarantees. However, you can use the batchId provided to the function to
deduplicate the output and get an exactly-once guarantee.
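The idea behind batchId-based deduplication can be sketched in plain Scala
(no Spark needed; the sink object and method names are hypothetical): with
at-least-once delivery, a failed micro-batch may be replayed with the same
batchId, and an idempotent sink commits each batchId only once.

```scala
import scala.collection.mutable

// Hypothetical idempotent sink: remembers committed batch ids, so a
// replayed batch (same batchId) is skipped instead of written twice.
object IdempotentSink {
  private val committed = mutable.Set[Long]()
  val rows = mutable.ArrayBuffer[String]()

  def writeBatch(batchId: Long, batch: Seq[String]): Unit =
    if (committed.add(batchId)) rows ++= batch  // first delivery: write
    // duplicate delivery: batchId already committed, do nothing
}

IdempotentSink.writeBatch(1L, Seq("a", "b"))
IdempotentSink.writeBatch(2L, Seq("c"))
IdempotentSink.writeBatch(1L, Seq("a", "b"))  // replay after failure: skipped
println(IdempotentSink.rows.mkString(","))    // a,b,c
```

In a real sink, `committed` would have to live in transactional storage
alongside the data, so the batchId check and the write commit atomically.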

TD

On Thu, Jul 5, 2018 at 12:33 AM, Amiya Mishra <
Amiya.Mishra@bitwiseglobal.com> wrote:

> Hi Chandan/Jürgen,
>
> I have tried this with native code, using a single input DataFrame with
> multiple sinks:
>
> Spark provides a method called awaitAnyTermination() in
> StreamingQueryManager.scala which provides what is needed to handle the
> queries processed by Spark. From the Spark documentation:
>   - Wait until any of the queries on the associated SQLContext has
> terminated since the creation of the context, or since `resetTerminated()`
> was called. If any query was terminated with an exception, then the
> exception will be thrown.
>   - If a query has terminated, then subsequent calls to
> `awaitAnyTermination()` will either return immediately (if the query was
> terminated by `query.stop()`), or throw the exception immediately (if the
> query was terminated with an exception). Use `resetTerminated()` to clear
> past terminations and wait for new terminations.
>   - In the case where multiple queries have terminated since
> `resetTerminated()` was called, if any query has terminated with an
> exception, then `awaitAnyTermination()` will throw any one of the
> exceptions. For correctly documenting exceptions across multiple queries,
> users need to stop all of them after any of them terminates with an
> exception, and then check `query.exception()` for each query.
>
>
> val inputdf: DataFrame = sparkSession.readStream.schema(schema)
>   .format("csv").option("delimiter", ",").csv("src/main/streamingInput")
> val query1 = inputdf.writeStream.option("path", "first_output")
>   .option("checkpointLocation", "checkpointloc").format("csv").start()
> val query2 = inputdf.writeStream.option("path", "second_output")
>   .option("checkpointLocation", "checkpoint2").format("csv").start()
> sparkSession.streams.awaitAnyTermination()
>
>
> Now both the "first_output" and "second_output" files are written
> successfully.
>
> Try it out on your side and let me know if you find any limitations, and
> post back if you find any other way.
>
> Please correct me if I made any grammatical mistakes.
>
> Thanks
> Amiya
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>
