storm-user mailing list archives

From Nasron Cheong <>
Subject Re: Migrating from storm-kafka to storm-kafka-client
Date Fri, 12 Jan 2018 02:43:37 GMT
Hi Stig,

That's great! Thanks for all the info. Looking through the code, one small
detail is the difference between storm-kafka-client's format and
storm-kafka. The former uses 'firstOffset' and 'lastOffset' and the latter
uses 'offset' and 'nextOffset'.

So, can I map with

firstOffset = offset
lastOffset = nextOffset + 1 ?

Looking through the code, it seems that nextOffset is placed after the
last consumed message, but I'm not sure.
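
A minimal sketch of the key mapping asked about above, not a confirmed answer. It assumes two things the thread does not settle: that storm-kafka's nextOffset is exclusive (one past the last consumed message, as suspected above), and that storm-kafka-client's lastOffset is inclusive (the last message in the batch). Under those assumptions the mapping would be nextOffset - 1 rather than nextOffset + 1. The function name and the flag are hypothetical.

```python
import json


def to_client_metadata(old_meta, next_offset_is_exclusive=True):
    """Map a storm-kafka batch metadata dict to storm-kafka-client keys.

    ASSUMPTION: 'nextOffset' points one past the last consumed message and
    'lastOffset' is the last consumed message itself. Set the flag to False
    if reading the spout code shows that 'nextOffset' is already inclusive.
    """
    first = old_meta["offset"]
    next_offset = old_meta["nextOffset"]
    last = next_offset - 1 if next_offset_is_exclusive else next_offset
    return {"firstOffset": first, "lastOffset": last}


old = json.loads('{"offset": 0, "nextOffset": 2}')
print(json.dumps(to_client_metadata(old)))
# prints {"firstOffset": 0, "lastOffset": 1}
```

An empty batch (offset equal to nextOffset) would yield a lastOffset below firstOffset under these assumptions, so such entries probably need a manual check before migrating.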


- Nasron

On Thu, Jan 11, 2018 at 5:43 PM, Stig Rohde Døssing <> wrote:

> Nasron,
> Okay, migrating a Trident spout is a very different thing. Trident spouts
> store their state in Storm's zookeeper (unless you decide otherwise by
> setting transactional.zookeeper.servers in storm.yaml). This also applies
> to the storm-kafka-client Trident spout, so we won't need to move offsets
> into Kafka.
> The idea of stopping all the producers and starting at LATEST (or
> UNCOMMITTED_LATEST) is decent, but as you note there's a (small) risk of
> skipping tuples. In order to get Trident to commit something, you have to
> deploy the new topology with LATEST and start the producers again, wait
> until at least one commit happens, and then take the topology back down and
> redeploy with whatever your first poll strategy normally is. If the worker
> crashes before the spout manages to commit something, you will skip tuples.
> If you don't want to do that, here are my notes on migrating from
> storm-kafka to storm-kafka-client for Trident:
> The storage formats and zk paths for the two spouts are a little
> different. Both spouts store their state as JSON maps, but some of the keys
> are different. I use ${} below to indicate variable substitution.
> The root path (in the following: zkRoot) for your spout's data is
> /${transactional.zookeeper.root from storm.yaml}/${txId you set with
> TridentTopology.newStream}/user.
> For the storm-kafka spout the offsets are stored in one of the following
> two paths:
> ${zkRoot}/${topicName}partition_${partition} if you are using wildcard
> topic subscriptions
> ${zkRoot}/partition_${partition} otherwise
> The storage format for storm-kafka is as follows:
> { "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2 } }
> if you are using wildcard topic subscriptions
> { "partition_${partition}": {"offset": 0, "nextOffset": 2 } }
> otherwise (I left out some irrelevant properties)
> For storm-kafka-client the zk path is
> ${zkRoot}/${topicName}@${partition}
> and the storage format is
> { "${topicName}@${partition}": {"firstOffset": 0, "lastOffset": 2 } }
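
The path and key rules above can be sketched as a few pure functions. This is only an illustration of the naming scheme described in the notes, with no ZooKeeper access. The function names, the txId `my-spout` and the topic `events` are hypothetical, and it assumes the new spout is deployed with the same txId as the old one, so both node names live under the same zkRoot.

```python
def zk_root(transactional_root, tx_id):
    """Build zkRoot from transactional.zookeeper.root and the stream's txId."""
    return "/" + transactional_root.strip("/") + "/" + tx_id + "/user"


def old_node_name(topic, partition, wildcard):
    """storm-kafka name: topic-prefixed only for wildcard subscriptions."""
    prefix = topic if wildcard else ""
    return f"{prefix}partition_{partition}"


def new_node_name(topic, partition):
    """storm-kafka-client name: ${topicName}@${partition}."""
    return f"{topic}@{partition}"


def migration_paths(root, topic, partition, wildcard):
    """Return (source path, destination path) for one partition."""
    return (
        f"{root}/{old_node_name(topic, partition, wildcard)}",
        f"{root}/{new_node_name(topic, partition)}",
    )


root = zk_root("transactional", "my-spout")  # hypothetical values
src, dst = migration_paths(root, "events", 3, wildcard=False)
print(src)  # /transactional/my-spout/user/partition_3
print(dst)  # /transactional/my-spout/user/events@3
```

Per the notes above, the same node names are also the top-level keys of the JSON map stored at each path, so a migration script would rewrite both the path and the wrapping key.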
> In order to migrate from storm-kafka to storm-kafka-client, we need to
> stop the topology and run a script that moves the offsets from the old
> location/format to the new location/format. There's no way to tell Trident
> to read from one path/format and write to another, so it has to be done
> offline. Once the offsets are migrated, the spout can be replaced in the
> topology and the topology can be redeployed.
> I might look at writing an application that can do this at some point, but
> it might take me a while. If you'd like to look at it yourself, here are
> some pointers on where to start:
> * This is where the offsets are written to Zookeeper, assuming you use an
> opaque spout
> src/jvm/org/apache/storm/trident/spout/OpaquePartitione
> You might want to look at this class for
> a bit (particularly the emit function), because it's pretty useful for
> understanding how/where Trident stores metadata for spouts.
> * The return value of
> m/blob/master/external/storm-kafka/src/jvm/org/apache/
> storm/kafka/trident/ defines the format of
> what's being saved to Zookeeper for storm-kafka. It's being wrapped in a
> map so the full written value is { "${topicName}partition_${partition}":
> ${theReturnValue} } (see the storage format note above, it's different if
> you're not using wildcard subscriptions)
> * Similarly for storm-kafka-client the return value of
> kafka-client/src/main/java/org/apache/storm/kafka/spout/
> trident/ defines the format of what
> that spout saves to Zookeeper (and expects to find).
> * You should use zkCli (it's in your zookeeper/bin directory) to explore
> your Zookeeper filesystem. It should be pretty easy to find your offsets in
> there with that tool.
> Sorry about the wall of text, this turned out to have a lot of detail to
> cover.
> 2018-01-10 21:40 GMT+01:00 Nasron Cheong <>:
>> Thanks Stig,
>> So after some digging, I realized we are really migrating from the kafka
>> trident emitter in storm-kafka, to the trident emitter in
>> storm-kafka-client.
>> As far as I can see, the offset information is still stored in zk, and
>> the offset info for storm-kafka is (
>> rm/blob/master/external/storm-kafka/src/jvm/org/apache/
>> storm/kafka/trident/
>> However this seems quite different from storm-kafka-client, which uses
>> storm-kafka-client/src/main/java/org/apache/storm/kafka/
>> spout/trident/
>> I'm not sure under which zknode this information is stored - and if the
>> zknode itself is different between the two implementations.
>> Looks like I need a tool to copy the stored values in zk from old
>> storm-kafka to storm-kafka-client?
>> Another option I suppose is to:
>> - stop topic producers
>> - run the old code until it drains all topics
>> - start new code with FirstPollOffsetStrategy.LATEST
>> Although this seems risky.
>> Thanks!
>> - Nasron
>> On Thu, Dec 21, 2017 at 4:23 PM, Stig Rohde Døssing <>
>> wrote:
>>> Hi Nasron,
>>> I don't believe there's currently a tool to help you migrate. We did it
>>> manually by writing a small utility that looked up the commit offsets in
>>> Storm's Zookeeper, opened a KafkaConsumer with the new consumer group id
>>> and committed the offsets for the appropriate partitions. We stopped our
>>> topologies, used this utility and redeployed with the new spout.
>>> Assuming there isn't already a tool for migration floating around
>>> somewhere, I think we could probably build some migration support into the
>>> storm-kafka-client spout. If the path to the old offsets in Storm's
>>> Zookeeper is given, we might be able to extract them and start up the new
>>> spout from there.
>>> 2017-12-19 21:59 GMT+01:00 Nasron Cheong <>:
>>>> Hi,
>>>> I'm trying to determine steps for migration to the storm-kafka-client
>>>> in order to use the new kafka client.
>>>> It's not quite clear to me how offsets are migrated - is there a
>>>> specific set of steps to ensure offsets are moved from the ZK based offsets
>>>> into the kafka based offsets?
>>>> Or is the original configuration respected, and storm-kafka-client can
>>>> mostly be a drop in replacement?
>>>> I want to avoid having spouts reset to the beginning of topics after
>>>> deployment, due to this change.
>>>> Thanks.
>>>> - Nasron
