spark-user mailing list archives

From Tathagata Das
Subject Re: Streaming checkpoints and logic change
Date Wed, 08 Jul 2015 17:59:23 GMT
Hey Jong,

No, I did answer the right question. What I explained does not change the JVM
classes (that is, the function stays the same), but it still ensures that the
computation is different (the filters get updated over time). So you can
checkpoint this and recover from it. This is ONE possible way to do
dynamically changing logic within the constraints of checkpointing. Since
no further detail was provided on the kind of "dynamicity" in logic you
are interested in, I gave ONE possible way to do it. But I agree that I
should have tied up the loose ends by confirming that this works with
checkpointing, as the JVM class representing the function does not need to
change in order to change the logic.
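
To make the point concrete, here is a minimal plain-Scala sketch with no Spark
dependency. The names Filters, getLatest, update and LevelFilterStep are
hypothetical, as is the choice of "allowed log-level prefixes" as the filter
data; LevelFilterStep only stands in for the function that would be passed to
DStream.transform. The idea it illustrates is that the compiled function never
changes, only the data it looks up on each batch:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical holder for the current filter configuration. The filters are
// plain data (a set of allowed prefixes), so updating them does not alter
// any compiled class.
object Filters {
  private val allowed = new AtomicReference[Set[String]](Set.empty[String])

  // Returns whatever set of allowed prefixes is current right now.
  def getLatest(): Set[String] = allowed.get()

  // Swaps in a new set of allowed prefixes.
  def update(prefixes: Set[String]): Unit = allowed.set(prefixes)
}

// Stand-in for the body of the function passed to DStream.transform.
// The code is identical for every batch; it re-reads the filters each time.
object LevelFilterStep {
  def apply(batch: Seq[String]): Seq[String] = {
    val prefixes = Filters.getLatest()
    batch.filter(line => prefixes.exists(p => line.startsWith(p)))
  }
}
```

Calling Filters.update(Set("ERROR")) and later Filters.update(Set("ERROR",
"WARN")) changes what LevelFilterStep keeps without touching its class. In a
real job the filter data would have to come from somewhere the driver can
re-read on every batch, and that part is not shown here.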

Now let's address your case of a log transformer which extracts fields of logs
from a Kafka stream. Could you provide some pseudocode for the kind of change
you would want to see? I want to learn more about what kind of dynamicity
people want. I am aware of this limitation and want to address it in the
future, but for that I need to understand the requirements.

BTW, your workaround is a pretty good workaround.
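
For anyone following the thread, the shape of that workaround (quoted below)
can be sketched in plain Scala roughly as follows. This is only a model under
stated assumptions: OffsetRange here is a hypothetical case class loosely
mirroring the offset ranges one gets by casting to HasOffsetRanges,
OffsetStore and InMemoryOffsetStore are hypothetical stand-ins for the
ZooKeeper-backed storage, and OffsetTracking.runBatch is an illustrative
helper, not a Spark API.

```scala
import scala.collection.mutable

// Hypothetical model of one partition's offset range within a batch.
final case class OffsetRange(topic: String, partition: Int,
                             fromOffset: Long, untilOffset: Long)

// Hypothetical storage interface; the workaround in this thread uses ZooKeeper.
trait OffsetStore {
  // Offsets to resume from, keyed by (topic, partition).
  def load(): Map[(String, Int), Long]
  // Records the end offset of each processed range.
  def save(ranges: Seq[OffsetRange]): Unit
}

// In-memory stand-in so the sketch runs without ZooKeeper.
final class InMemoryOffsetStore extends OffsetStore {
  private val offsets = mutable.Map.empty[(String, Int), Long]

  def load(): Map[(String, Int), Long] = offsets.toMap

  def save(ranges: Seq[OffsetRange]): Unit =
    ranges.foreach(r => offsets.update((r.topic, r.partition), r.untilOffset))
}

object OffsetTracking {
  // Process the batch first and save offsets only afterwards. If processing
  // throws, the stored offsets stay where they were, so a restart replays the
  // batch: at-least-once rather than exactly-once.
  def runBatch(ranges: Seq[OffsetRange], store: OffsetStore)
              (process: Seq[OffsetRange] => Unit): Unit = {
    process(ranges)
    store.save(ranges)
  }
}
```

On restart, the map returned by load() plays the role of the fromOffsets
parameter mentioned in the quoted message. Because no serialized function is
involved, the processing logic can change freely between deployments.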


On Wed, Jul 8, 2015 at 10:38 AM, Jong Wook Kim wrote:

> Hi TD, you answered the wrong question. If you read the subject, mine was
> specifically about checkpointing. I'll elaborate.
> The checkpoint, which is a serialized DStream DAG, contains all the
> metadata and *logic*, like the function passed to e.g. DStream.transform().
> This is serialized as an anonymous inner class at the JVM level, and will
> not tolerate the slightest logic change, because the class signature will
> change and it cannot be deserialized from the checkpoint, which contains the
> serialized form of the previous version.
> Logic changes are extremely common in stream processing. Say I have a log
> transformer which extracts certain fields of logs from a Kafka stream and I
> want to add another field to extract. This involves DStream logic changes,
> and thus cannot be done using checkpoints; I can't even achieve an
> at-least-once guarantee.
> My current workaround is to read the current offsets by casting to
> HasOffsetRanges,
> saving them to ZooKeeper, and passing the fromOffsets parameter read from
> ZooKeeper when creating a directStream. I've settled on this approach
> for now, but I want to know what the makers of Spark Streaming think about
> this drawback of checkpointing.
> If anyone has had a similar experience, suggestions would be appreciated.
> Jong Wook
> On 9 July 2015 at 02:13, Tathagata Das wrote:
>> You can use DStream.transform for some stuff. Transform takes an RDD =>
>> RDD function that allows arbitrary RDD operations to be done on the RDDs of
>> a DStream. This function gets evaluated on the driver in every batch
>> interval. If you are smart about writing the function, it can do different
>> stuff at different intervals. For example, you can always use a
>> continuously updated set of filters:
>> dstream.transform { rdd =>
>>    val broadcastedFilters = Filters.getLatest()
>>    val newRDD  = rdd.filter { x => broadcastedFilters.get.filter(x) }
>>    newRDD
>> }
>> The function Filters.getLatest() will return the latest set of filters
>> that has been broadcast, and as the transform function is evaluated in
>> every batch interval, it will always use the latest filters.
>> HTH.
>> TD
>> On Wed, Jul 8, 2015 at 10:02 AM, Jong Wook Kim wrote:
>>> I just asked this question at the streaming webinar that just ended, but
>>> the speakers didn't answer, so I'm asking here:
>>> AFAIK checkpoints are the only recommended method for running Spark
>>> Streaming without data loss. But it involves serializing the entire DStream
>>> graph, which prohibits any logic changes. How should I update / fix the
>>> logic of a running streaming app without any data loss?
>>> Jong Wook
