spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tathagata Das <>
Subject Re: [Structured Streaming] Application Updates in Production
Date Fri, 23 Mar 2018 03:00:31 GMT
Yes indeed, we dont directly support schema migration of state as of now.
However, depending on what stateful operator you are using, you can work
around it. For example, if you are using mapGroupsWithState /
flatMapGroupsWithState, you can save explicitly convert your state to
avro-encoded bytes and save bytes as state. You will be responsible for
encoding the state in avro such that when you can migrate schema yourself
(much like kafka + avro + schema registry).

On Wed, Mar 21, 2018 at 5:45 PM, Priyank Shrivastava <>

> TD,
> But what if the state schema does change?  My understanding is that if in
> the new code I change the state schema the application will not be able to
> use the old checkpoints.  Is that not correct?
> Applications running is parallel is to ensure there is no downtime in
> production i.e because the new app will not pick up from the old
> checkpoints, one would need to keep the old app and the new app running
> until new app catches up on data processing with the old app.
> ----- Original message -----
> From: Tathagata Das <>
> To: Priyank Shrivastava <>
> Cc: user <>
> Subject: Re: [Structured Streaming] Application Updates in Production
> Date: Wed, Mar 21, 2018 5:28 PM
> Why do you want to start the new code in parallel to the old one? Why not
> stop the old one, and then start the new one? Structured Streaming ensures
> that all checkpoint information (offsets and state) are future-compatible
> (as long as state schema is unchanged), hence new code should be able to
> pick exactly where the old code left off.
> TD
> On Wed, Mar 21, 2018 at 11:56 AM, Priyank Shrivastava <
>> wrote:
> I am using Structured Streaming with Spark 2.2.  We are using Kafka as our
> source and are using checkpoints for failure recovery and e2e exactly once
> guarantees.  I would like to get some more information on how to handle
> updates to the application when there is a change in stateful operations
> and/or output schema.
> As some of the sources suggest I can start the updated application
> parallelly with the old application until it catches up with the old
> application in terms of data, and then kill the old one.  But then the new
> application will have to re-read/re-process all the data in kafka which
> could take a long time.
> I want to AVOID this re-processing of the data in the newly deployed
> updated application.
> One way I can think of is for the application to keep writing the offsets
> into something in addition to the checkpoint directory, for example in
> zookeeper/hdfs.  And then, on an update of the application, I command Kafka
> readstream() to start reading from the offsets stored in this new location
> (zookeeper/hdfs) - since the updated application can't read from the
> checkpoint directory which is now deemed incompatible.
> So a couple of questions:
> 1.  Is the above-stated solution a valid solution?
> 2.  If yes, How can I automate the detection of whether the application is
> being restarted because of a failure/maintenance or because of code changes
> to stateful operations and/or output schema?
> Any guidance, example or information source is appreciated.
> Thanks,
> Priyank

View raw message