samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Riccomini <>
Subject Thoughts and obesrvations on Samza
Date Wed, 01 Jul 2015 01:55:12 GMT
Hey all,

I have had some discussions with Samza engineers at LinkedIn and Confluent
and we came up with a few observations and would like to propose some

We've observed some things that I want to call out about Samza's design,
and I'd like to propose some changes.

* Samza is dependent upon a dynamic deployment system.
* Samza is too pluggable.
* Samza's SystemConsumer/SystemProducer and Kafka's consumer APIs are
trying to solve a lot of the same problems.

All three of these issues are related, but I'll address them in order.


Samza strongly depends on the use of a dynamic deployment scheduler such as
YARN, Mesos, etc. When we initially built Samza, we bet that there would be
one or two winners in this area, and we could support them, and the rest
would go away. In reality, there are many variations. Furthermore, many
people still prefer to just start their processors like normal Java
processes, and use traditional deployment scripts such as Fabric, Chef,
Ansible, etc. Forcing a deployment system on users makes the Samza start-up
process really painful for first time users.

Dynamic deployment as a requirement was also a bit of a mis-fire because of
a fundamental misunderstanding between the nature of batch jobs and stream
processing jobs. Early on, we made conscious effort to favor the Hadoop
(Map/Reduce) way of doing things, since it worked and was well understood.
One thing that we missed was that batch jobs have a definite beginning, and
end, and stream processing jobs don't (usually). This leads to a much
simpler scheduling problem for stream processors. You basically just need
to find a place to start the processor, and start it. The way we run grids,
at LinkedIn, there's no concept of a cluster being "full". We always add
more machines. The problem with coupling Samza with a scheduler is that
Samza (as a framework) now has to handle deployment. This pulls in a bunch
of things such as configuration distribution (config stream), shell scrips
(bin/, JobRunner), packaging (all the .tgz stuff), etc.

Another reason for requiring dynamic deployment was to support data
locality. If you want to have locality, you need to put your processors
close to the data they're processing. Upon further investigation, though,
this feature is not that beneficial. There is some good discussion about
some problems with it on SAMZA-335. Again, we took the Map/Reduce path, but
there are some fundamental differences between HDFS and Kafka. HDFS has
blocks, while Kafka has partitions. This leads to less optimization
potential with stream processors on top of Kafka.

This feature is also used as a crutch. Samza doesn't have any built in
fault-tolerance logic. Instead, it depends on the dynamic deployment
scheduling system to handle restarts when a processor dies. This has made
it very difficult to write a standalone Samza container (SAMZA-516).


In some cases pluggability is good, but I think that we've gone too far
with it. Currently, Samza has:

* Pluggable config.
* Pluggable metrics.
* Pluggable deployment systems.
* Pluggable streaming systems (SystemConsumer, SystemProducer, etc).
* Pluggable serdes.
* Pluggable storage engines.
* Pluggable strategies for just about every component (MessageChooser,
SystemStreamPartitionGrouper, ConfigRewriter, etc).

There's probably more that I've forgotten, as well. Some of these are
useful, but some have proven not to be. This all comes at a cost:
complexity. This complexity is making it harder for our users to pick up
and use Samza out of the box. It also makes it difficult for Samza
developers to reason about what the characteristics of the container (since
the characteristics change depending on which plugins are use).

The issues with pluggability are most visible in the System APIs. What
Samza really requires to be functional is Kafka as its transport layer. But
we've conflated two unrelated use cases into one API:

1. Get data into/out of Kafka.
2. Process the data in Kafka.

The current System API supports both of these use cases. The problem is, we
actually want different features for each use case. By papering over these
two use cases, and providing a single API, we've introduced a ton of leaky

For example, what we'd really like in (2) is to have monotonically
increasing longs for offsets (like Kafka). This would be at odds with (1),
though, since different systems have different SCNs/Offsets/UUIDs/vectors.
There was discussion both on the mailing list and the SQL JIRAs about the
need for this.

The same thing holds true for replayability. Kafka allows us to rewind when
we have a failure. Many other systems don't. In some cases, systems return
null for their offsets (e.g. WikipediaSystemConsumer) because they have no

Partitioning is another example. Kafka supports partitioning, but many
systems don't. We model this by having a single partition for those
systems. Still, other systems model partitioning differently (e.g. Kinesis).

The SystemAdmin interface is also a mess. Creating streams in a
system-agnostic way is almost impossible. As is modeling metadata for the
system (replication factor, partitions, location, etc). The list goes on.

Duplicate work

At the time that we began writing Samza, Kafka's consumer and producer APIs
had a relatively weak feature set. On the consumer-side, you had two
options: use the high level consumer, or the simple consumer. The problem
with the high-level consumer was that it controlled your offsets, partition
assignments, and the order in which you received messages. The problem with
the simple consumer is that it's not simple. It's basic. You end up having
to handle a lot of really low-level stuff that you shouldn't. We spent a
lot of time to make Samza's KafkaSystemConsumer very robust. It also allows
us to support some cool features:

* Per-partition message ordering and prioritization.
* Tight control over partition assignment to support joins, global state
(if we want to implement it :)), etc.
* Tight control over offset checkpointing.

What we didn't realize at the time is that these features should actually
be in Kafka. A lot of Kafka consumers (not just Samza stream processors)
end up wanting to do things like joins and partition assignment. The Kafka
community has come to the same conclusion. They're adding a ton of upgrades
into their new Kafka consumer implementation. To a large extent, it's
duplicate work to what we've already done in Samza.

On top of this, Kafka ended up taking a very similar approach to Samza's
KafkaCheckpointManager implementation for handling offset checkpointing.
Like Samza, Kafka's new offset management feature stores offset checkpoints
in a topic, and allows you to fetch them from the broker.

A lot of this seems like a waste, since we could have shared the work if it
had been done in Kafka from the get-go.


All of this leads me to a rather radical proposal. Samza is relatively
stable at this point. I'd venture to say that we're near a 1.0 release. I'd
like to propose that we take what we've learned, and begin thinking about
Samza beyond 1.0. What would we change if we were starting from scratch? My
proposal is to:

1. Make Samza standalone the *only* way to run Samza processors, and
eliminate all direct dependences on YARN, Mesos, etc.
2. Make a definitive call to support only Kafka as the stream processing
3. Eliminate Samza's metrics, logging, serialization, and config systems,
and simply use Kafka's instead.

This would fix all of the issues that I outlined above. It should also
shrink the Samza code base pretty dramatically. Supporting only a
standalone container will allow Samza to be executed on YARN (using
Slider), Mesos (using Marathon/Aurora), or most other in-house deployment
systems. This should make life a lot easier for new users. Imagine having
the hello-samza tutorial without YARN. The drop in mailing list traffic
will be pretty dramatic.

Coupling with Kafka seems long overdue to me. The reality is, everyone that
I'm aware of is using Samza with Kafka. We basically require it already in
order for most features to work. Those that are using other systems are
generally using it for ingest into Kafka (1), and then they do the
processing on top. There is already discussion (
in Kafka to make ingesting into Kafka extremely easy.

Once we make the call to couple with Kafka, we can leverage a ton of their
ecosystem. We no longer have to maintain our own config, metrics, etc. We
can all share the same libraries, and make them better. This will also
allow us to share the consumer/producer APIs, and will let us leverage
their offset management and partition management, rather than having our
own. All of the coordinator stream code would go away, as would most of the
YARN AppMaster code. We'd probably have to push some partition management
features into the Kafka broker, but they're already moving in that
direction with the new consumer API. The features we have for partition
assignment aren't unique to Samza, and seem like they should be in Kafka
anyway. There will always be some niche usages which will require extra
care and hence full control over partition assignments much like the Kafka
low level consumer api. These would continue to be supported.

These items will be good for the Samza community. They'll make Samza easier
to use, and make it easier for developers to add new features.

Obviously this is a fairly large (and somewhat backwards incompatible
change). If we choose to go this route, it's important that we openly
communicate how we're going to provide a migration path from the existing
APIs to the new ones (if we make incompatible changes). I think at a
minimum, we'd probably need to provide a wrapper to allow existing
StreamTask implementations to continue running on the new container. It's
also important that we openly communicate about timing, and stages of the

If you made it this far, I'm sure you have opinions. :) Please send your
thoughts and feedback.


  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message