Hi Stig,

That's great! Thanks for all the info. Looking through the code, one small detail is the difference between storm-kafka-client's format and storm-kafka's. The former uses 'firstOffset' and 'lastOffset', and the latter uses 'offset' and 'nextOffset'.

So, can I map with

firstOffset = offset


lastOffset = nextOffset+1 ?

Looking through the code, it seems that nextOffset points just past the last consumed message, but I'm not sure.


- Nasron

On Thu, Jan 11, 2018 at 5:43 PM, Stig Rohde Døssing <srdo@apache.org> wrote:

Okay, migrating a Trident spout is a very different thing. Trident spouts store their state in Storm's zookeeper (unless you decide otherwise by setting transactional.zookeeper.servers in storm.yaml). This also applies to the storm-kafka-client Trident spout, so we won't need to move offsets into Kafka.

The idea of stopping all the producers and starting at LATEST (or UNCOMMITTED_LATEST) is decent, but as you note there's a (small) risk of skipping tuples. In order to get Trident to commit something, you have to deploy the new topology with LATEST and start the producers again, wait until at least one commit happens, and then take the topology back down and redeploy with whatever your first poll strategy normally is. If the worker crashes before the spout manages to commit something, you will skip tuples.

If you don't want to do that, here are my notes on storm-kafka -> storm-kafka-client for Trident:

The storage formats and zk paths for the two spouts are a little different. Both spouts store their state as JSON maps, but some of the keys are different. I use ${} below to indicate variable substitution.

The root path (in the following: zkRoot) for your spout's data is /${transactional.zookeeper.root from storm.yaml}/${txId you set with TridentTopology.newStream}/user.

For the storm-kafka spout the offsets are stored in one of the following two paths:
${zkRoot}/${topicName}partition_${partition} if you are using wildcard topic subscriptions
${zkRoot}/partition_${partition} otherwise
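
To make the path layout above concrete, here is a minimal sketch in Python that builds the storm-kafka offset path from its parts. The function name and parameters are made up for illustration and are not part of Storm; the layout is taken from the description above, so please check it against what zkCli actually shows before relying on it.

```python
def storm_kafka_offset_path(zk_root_setting, tx_id, partition, topic=None):
    """Build the zk path where the storm-kafka Trident spout keeps its offsets.

    Hypothetical helper: the names are illustrative, and the layout follows
    the description in this thread rather than anything verified here.
    """
    # zkRoot = /${transactional.zookeeper.root}/${txId}/user
    # Strip slashes so a setting like "/transactional" does not give "//transactional".
    zk_root = "/" + zk_root_setting.strip("/") + "/" + tx_id + "/user"
    if topic is not None:
        # Wildcard topic subscriptions: the topic name prefixes the node name.
        node = f"{topic}partition_{partition}"
    else:
        node = f"partition_{partition}"
    return f"{zk_root}/{node}"


print(storm_kafka_offset_path("/transactional", "my-stream", 3))
# prints /transactional/my-stream/user/partition_3
```

Passing topic="events" with the same arguments would give /transactional/my-stream/user/eventspartition_3.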

The storage format for storm-kafka is as follows:
{ "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2 } } if you are using wildcard topic subscriptions
{ "partition_${partition}": {"offset": 0, "nextOffset": 2 } } otherwise (I left out some irrelevant properties)

For storm-kafka-client the zk path is

and the storage format is
{ "${topicName}@${partition}": {"firstOffset": 0, "lastOffset": 2 } }

In order to migrate from storm-kafka to storm-kafka-client, we need to stop the topology and run a script that moves the offsets from the old location/format to the new location/format. There's no way to tell Trident to read from one path/format and write to another, so it has to be done offline. Once the offsets are migrated, the spout can be replaced in the topology and the topology can be redeployed.
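
The format conversion part of such a script could look roughly like the Python sketch below. It is only a sketch under stated assumptions: the function name is hypothetical, the new keys are the 'firstOffset'/'lastOffset' pair mentioned in this thread, and it assumes that nextOffset points one past the last consumed message while lastOffset is the offset of the last message itself. That last assumption should be checked against both emitters before use. Reading from and writing to Zookeeper is left out.

```python
import json


def convert_state(old_json, topic, partition):
    """Convert one storm-kafka state value to the storm-kafka-client shape.

    Hypothetical helper, not part of Storm. ASSUMPTION: nextOffset is one
    past the last consumed message and lastOffset is inclusive, so
    lastOffset = nextOffset - 1. Verify this before migrating real offsets.
    """
    old = json.loads(old_json)
    if len(old) != 1:
        raise ValueError("expected exactly one partition entry in the old state")
    (meta,) = old.values()
    first = meta["offset"]
    next_offset = meta["nextOffset"]
    if next_offset <= first:
        # An empty batch has no last message, so it cannot be expressed
        # as an inclusive range; handle that case by hand.
        raise ValueError("empty batch, no last offset to migrate")
    new = {
        f"{topic}@{partition}": {
            "firstOffset": first,
            "lastOffset": next_offset - 1,
        }
    }
    return json.dumps(new)


old_value = '{"partition_0": {"offset": 0, "nextOffset": 2}}'
print(convert_state(old_value, "events", 0))
```

Any properties of the old value other than offset and nextOffset are dropped, and the topic name has to be supplied by the caller because the non-wildcard key does not contain it.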

I might look at writing an application that can do this at some point, but it might take me a while. If you'd like to look at it yourself, here are some pointers on where to start:
* This is where the offsets are written to Zookeeper, assuming you use an opaque spout: https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L184. You might want to look at this class for a bit (particularly the emit function), because it's pretty useful for understanding how/where Trident stores metadata for spouts.
* The return value of https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L85 defines the format of what's being saved to Zookeeper for storm-kafka. It's being wrapped in a map, so the full written value is { "${topicName}partition_${partition}": ${theReturnValue} } (see the storage format note above; the key is different if you're not using wildcard subscriptions).
* Similarly for storm-kafka-client the return value of https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L106 defines the format of what that spout saves to Zookeeper (and expects to find).
* You should use zkCli (it's in your zookeeper/bin directory) to explore your Zookeeper filesystem. It should be pretty easy to find your offsets in there with that tool.

Sorry about the wall of text, this turned out to have a lot of detail to cover.

2018-01-10 21:40 GMT+01:00 Nasron Cheong <nasron@gmail.com>:
Thanks Stig,

So after some digging, I realized we are really migrating from the kafka trident emitter in storm-kafka to the trident emitter in storm-kafka-client.

As far as I can see, the offset information is still stored in zk, and the offset info for storm-kafka is built here: https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L140

I'm not sure under which zknode this information is stored - and if the zknode itself is different between the two implementations.

Looks like I need a tool to copy the stored values in zk from old storm-kafka to storm-kafka-client?

Another option I suppose is to:
- stop topic producers
- run the old code until it drains all topics
- start new code with FirstPollOffsetStrategy.LATEST

Although this seems risky.


- Nasron

On Thu, Dec 21, 2017 at 4:23 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
Hi Nasron,

I don't believe there's currently a tool to help you migrate. We did it manually by writing a small utility that looked up the commit offsets in Storm's Zookeeper, opened a KafkaConsumer with the new consumer group id and committed the offsets for the appropriate partitions. We stopped our topologies, used this utility and redeployed with the new spout.

Assuming there isn't already a tool for migration floating around somewhere, I think we could probably build some migration support into the storm-kafka-client spout. If the path to the old offsets in Storm's Zookeeper is given, we might be able to extract them and start up the new spout from there.

2017-12-19 21:59 GMT+01:00 Nasron Cheong <nasron@gmail.com>:

I'm trying to determine steps for migration to the storm-kafka-client in order to use the new kafka client.

It's not quite clear to me how offsets are migrated. Is there a specific set of steps to ensure offsets are moved from the ZK-based offsets into the Kafka-based offsets?

Or is the original configuration respected, and storm-kafka-client can mostly be a drop in replacement?

I want to avoid having spouts reset to the beginning of topics after deployment, due to this change.


- Nasron