spark-user mailing list archives

From Alex Ott <>
Subject Re: Spark Structured Streaming Continuous Trigger on multiple sinks
Date Sun, 12 Sep 2021 13:26:47 GMT
Just don't call .awaitTermination() immediately, because it blocks execution of
the next line of code. Assign the result of each .start() to a variable, or
collect the returned queries in a list/array.

Then, to block until one of the streams finishes, use
spark.streams.awaitAnyTermination().
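For illustration, a minimal Scala sketch of that pattern (the sink formats, options, checkpoint paths, and trigger interval here are assumptions for the example, not tested configuration):

```scala
import org.apache.spark.sql.streaming.Trigger

// Start both queries without blocking: .start() returns immediately
// with a StreamingQuery handle.
val kafkaQuery = dataset1.writeStream
  .format("kafka")                                         // assumed sink
  .option("kafka.bootstrap.servers", "broker:9092")        // assumed broker
  .option("checkpointLocation", "/tmp/ckpt/kafka")         // assumed path
  .trigger(Trigger.Continuous("1 second"))                 // assumed interval
  .start()

val mongoQuery = dataset1.writeStream
  .format("mongo")                                         // assumed sink
  .option("checkpointLocation", "/tmp/ckpt/mongo")         // assumed path
  .trigger(Trigger.Continuous("1 second"))
  .start()

// Block here instead: returns when any of the active queries terminates.
spark.streams.awaitAnyTermination()
```

Because neither .start() call blocks, both queries are running before the single awaitAnyTermination() call parks the driver thread.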

 S  at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:
 S> Hello,

 S> I have a structured streaming job that needs to be able to write to multiple sinks.
 S> We are using the Continuous trigger, not the Microbatch trigger.

 S> 1. When we use the foreach method:
 S> dataset1.writeStream.foreach(kafka ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S> dataset1.writeStream.foreach(mongo ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S> The first statement blocks the second one for obvious reasons, so this does not serve our purpose.
 S> 2. The next step would be to use foreachBatch, but that is not supported in Continuous mode.
 S> 3. The next step was to try something like this:
 S> dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
 S> dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
 S> for both sinks. This does not work either: only the first query runs; the second one never starts.

 S> Is there any solution to the problem of writing to multiple sinks in Continuous trigger mode using Structured Streaming?

With best wishes,                    Alex Ott
Twitter: alexott_en (English), alexott (Russian)
