samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Riccomini <criccom...@linkedin.com.INVALID>
Subject Re: Sharing heap memory in Samza
Date Mon, 27 Oct 2014 21:51:10 GMT
Hey Jordan,

Your question touches on a couple of things:

1. Shared objects between Samza tasks within one container.
2. Multi-threaded SamzaContainers.

For (1), there is some discussion on shared state here:

  https://issues.apache.org/jira/browse/SAMZA-402

The outcome of this ticket was that it's something we want, but aren't
implementing right now. The idea is to have a state shore that's shared
amongst all tasks in a container. The store would be immutable, and would
be restored on startup via a stream that had all required data.

An alternative to this is to just have a static variable that all tasks
use. This will allow all tasks within one container to use the object.
We've done this before, and it works reasonably well for immutable
objects, which you have.

For (2), we've actively tried to avoid adding threading to the
SamzaContainer. Having a single threaded container has worked out pretty
well for us, and greatly simplifies the mental model that people need to
have to use Samza. Our advice to people who ask about adding parallelism
is to tell them to add more containers.

That said, it is possible to run threads inside a StreamTask if you really
want to increase your parallelism. Again, I would advise against this. If
not implemented properly, doing so can lead to data loss. The proper way
to implement threading inside a StreamTask is to have an thread pool
execute, and give threads messages as process() is called. You must then
disable offset checkpointing by setting task.commit.ms to -1. Lastly, your
task must implement WindowableTask. In the window method, it must block on
all threads that are currently processing a message. When all threads have
finished processing, it's then safe to checkpoint offsets, and the window
method must call coordinator.commit().

We've written a task that does this as well, and it works, but you have to
know what you're doing to get it right.

So, I think the two state options are:

1. Wait for global state to be implemented (or implement it yourself :)).
This could take a while.
2. Use static objects to share state among StreamTasks in a given
SamzaContainer.

And for parallelism:

1. Increase partition/container count for your job.
2. Add threads to your StreamTasks.

Cheers,
Chris

On 10/27/14 12:52 PM, "Jordan Lewis" <jordan@knewton.com> wrote:

>Hi,
>
>My team is interested in trying out Samza to augment or replace our
>hand-rolled Kafka-based stream processing system. I have a question about
>sharing memory across task instances.
>
>Currently, our main stream processing application has some large,
>immutable
>objects that need to be loaded into JVM heap memory in order to process
>messages on any partition of certain topics. We use thread-based
>parallelism in our system, so that the Kafka consumer threads on each
>machine listening to these topics can use the same instance of these large
>heap objects. This is very convenient, as these objects are so large that
>storing multiple copies of them would be quite wasteful.
>
>To use Samza, it seems as though each JVM would have to store copies of
>these objects separately, even if we were to use LevelDB's off-heap
>storage
>- each JVM would eventually have to inflate the off-heap memory into
>regular Java objects to be usable. One solution to this problem could be
>using something like Google's Flatbuffers [0] for these large objects - so
>that we could use accessors on large, off-heap ByteBuffers without having
>to actually deserialize them. However, we think that doing this for all of
>the relevant data we have would be a lot of work.
>
>Have you guys considered implementing a thread-based parallelism model for
>Samza, whether as a replacement or alongside the current JVM-based
>parallelism approach? What obstacles are there to making this happen,
>assuming that decided not to do it? This approach would be invaluable for
>our use case, since we rely so heavily (perhaps unfortunately so) on these
>shared heap data structures.
>
>Thanks,
>Jordan Lewis
>
>[0]: http://google.github.io/flatbuffers/


Mime
View raw message