samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Thunder Stumpges <>
Subject Re: 0.14 to 1.x Low-Level application migration questions
Date Wed, 05 Jun 2019 18:20:46 GMT
Thanks, I think this all makes sense. I was able to get my application up
and running with the exception of the Output SystemStream not finding its

For a little more background, here's how we are set up:

We are moving from YARN to standalone (running in Kubernetes). I think we
have this part under control.

Previously, we had a config rewriter that would iterate over all of the
topics in the config "*task.inputs*" and "*task.outputs.**" config settings
and create serializers for each stream.
One issue we ran into was that the descriptor setup in code doesn't like
our old StreamId(s) that have dots in the name, so we went through some
work to use logical names for our stream IDs, and set the topic name using
the "withPhysicalName" method. We figured that with the outputs all being
wired up in code, neither of these should matter, and we expected the Serde
we specified when registering the OutputDescriptor to be found and used
when sending messages using that logical StreamId.

However what we see while debugging is that when sending a message, the
logical StreamId we registered with the OutputDescriptor cannot be found in
SystemProducer.send() which calls serdeManager.toBytes(...) and in there,
it cannot locate the serde using the logical StreamId. What is strange is
that there ARE Serde(s) in the "systemStreamMessageSerdes" map set up for
StreamId(s) that use the physical names of our topics (including the dots)
but NOT the logical names.

We seem to be stuck a little right now, and unsure why this new restriction
on the use of dot in StreamIDs from the Descriptor side, but not seemingly
aligning in the rest of the system.

Any ideas or support would be great. We'll continue to dig as well.

On Mon, Jun 3, 2019 at 10:57 AM Prateek Maheshwari <>

> Hi Thunder,
> I'm assuming you're talking about the low level (StreamTask) API here,
> since the High Level API has stronger requirements for I/O
> systems/streams.
> > How much IS picked up from config.
> All of the system, stream and store properties can still be specified
> in configuration. Properties specified in config will override those
> specified using descriptors (with a couple of exceptions like
> task.inputs).
> >  I don't see how to register [custom coordinator] system via the
> ApplicationDescriptor
> Re: dedicated coordinator system, you can continue to specify
> 'job.coordinator.system' and it's properties in configs. To keep the
> API simple, we only support specifying the job.default.system (which
> is the default system for intermediate, coordinator, changelog and
> checkpoint streams) using descriptors for now.
> > It seems that a KafkaSystem is only associated with the
> ApplicationDescriptor via its Input/Output/Table descriptors.
> Yeah, the ApplicationDescriptor is only aware of system descriptors
> transitively through the input / output streams or the default system.
> However see the response above for adding systems via configs.
> > we have dynamic output SystemStream(s) created based on other runtime
> state
> This will still work in Low Level API. It is recommended to, but
> there's no hard requirement to pre-specify your output systems and
> streams.
> In general, when migrating your Low Level TaskApplication to Samza
> 1.0, you should be able to do
> 'applicationDescriptor.withTaskFactory(() -> new MyTask)' in your
> TaskApplication#describe with no other code changes. Please give that
> a shot and let us know if you run into any issues.
> Apologies for the confusion, we'll update the upgrade docs.
> Thanks,
> Prateek
> On Sat, Jun 1, 2019 at 11:13 AM Thunder Stumpges
> <> wrote:
> >
> > Hey guys,
> >
> > I'm following the guide here:
> >
> >
> > In step 3 it says:
> > "In Samza 1.0, a Samza application’s input, output, and processing-task
> > should be specified in code, rather than in config. "
> >
> > How much IS picked up from config? Will all the configuration of the
> > systems (consumer and producer properties, buffering, etc) be picked up
> > from the config properties still? What about stream settings like offset
> > reset, offset default, etc?
> >
> > In some of my tasks, I have a dedicated coordinator system. I don't see
> how
> > to register that system via the ApplicationDescriptor, nor how to
> associate
> > it with the coordinator (config setting `*job.coordinator.system*`). It
> > seems that a KafkaSystem is only associated with the
> ApplicationDescriptor
> > via its Input/Output/Table descriptors. Is this correct?
> >
> > I would like to keep my config in config, not in code, but it feels like
> > this is forcing me to move some (or all?) of it into code. I had custom
> > config re-writers which made this very flexible, but I'm not seeing how
> to
> > adapt this to the "new way". The Application/ApplicationDescriptor seems
> to
> > have no connection to the Configuration / properties...
> >
> > One other thing, is that in a few of my jobs, we have dynamic output
> > SystemStream(s) created based on other runtime state. Is this not going
> to
> > be possible anymore?
> >
> > A little more guidance would be most helpful.
> >
> > Thanks!
> > Thunder Stumpges

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message