samza-dev mailing list archives

From Yi Pan <>
Subject Re: Production deployment
Date Thu, 14 Jan 2016 06:18:24 GMT
Hi, Alex,

I apologize for the late reply. Let me try to give some feedback/comments.

On Thu, Jan 7, 2016 at 3:59 PM, Alexander Filipchik <> wrote:

> 1) What is the best way to handle partial outages? Let's say my YARN
> cluster is deployed on Amazon across 3 availability zones. Is there a way to
> guarantee operability if I lose a whole availability zone (one third of
> capacity)? Will Samza just restart failed containers on available nodes
> (which means some downtime), or is there a way to have passive task
> instances that can take over? What will happen if the master dies?

When a node dies in YARN, there are the following cases:
a. The ResourceManager (RM) dies. Without RM HA, the whole cluster is
unavailable, and ops has to restart the YARN cluster.
b. A NodeManager (NM) dies. There are two sub-cases:
   b.1 The NM only runs SamzaContainers. The SamzaAppMaster re-requests a
   new container from the RM and starts the SamzaContainer in it.
   b.2 The NM runs the SamzaAppMaster. The whole Samza job fails, and the
   YARN RM restarts the job.
For now, there are no "passive" standby task instances.
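To make cases b.1 and b.2 concrete for the availability-zone scenario, here is a minimal Java sketch that computes which containers have to start again when a set of NodeManager hosts is lost. It assumes the RM itself survives (case a is not modeled), and ZoneOutageModel, Impact and lose() are hypothetical names, not Samza or YARN APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of cases b.1 and b.2; not a Samza or YARN API.
// Assumes the ResourceManager itself stays up (case a is not modeled).
final class ZoneOutageModel {

    /** What the job goes through after a set of NodeManager hosts dies. */
    static final class Impact {
        final boolean wholeJobRestarts;          // b.2: the SamzaAppMaster's host was lost
        final List<String> containersToRestart;  // containers that must start again elsewhere

        Impact(boolean wholeJobRestarts, List<String> containersToRestart) {
            this.wholeJobRestarts = wholeJobRestarts;
            this.containersToRestart = containersToRestart;
        }
    }

    /**
     * @param containerHosts container id -> NodeManager host currently running it
     * @param appMasterHost  host running the SamzaAppMaster
     * @param lostHosts      hosts that died, e.g. every host in one availability zone
     */
    static Impact lose(Map<String, String> containerHosts, String appMasterHost, Set<String> lostHosts) {
        List<String> restart = new ArrayList<>();
        if (lostHosts.contains(appMasterHost)) {
            // b.2: the job fails and the RM restarts it, so every container starts again.
            restart.addAll(containerHosts.keySet());
            Collections.sort(restart);
            return new Impact(true, restart);
        }
        // b.1: the AppMaster re-requests a container for each container that ran on a lost host.
        for (Map.Entry<String, String> e : containerHosts.entrySet()) {
            if (lostHosts.contains(e.getValue())) {
                restart.add(e.getKey());
            }
        }
        Collections.sort(restart);
        return new Impact(false, restart);
    }
}
```

For example, losing only the host of container c1 restarts just c1, while losing the AppMaster's host restarts every container. Since there are no standby tasks, the tasks in a restarted container are unavailable until it comes back up.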

> 2) What is the best way of deploying new code? I'm especially interested in
> how to deploy new tasks that maintain pretty big state without interrupting
> streaming?

Since a Samza job's configuration is still immutable, the way to deploy new
Samza code right now is to re-push the binary and restart the job. This used
to take a long time for jobs with large state. With the host-affinity feature
in Samza 0.10, a restarted Samza job tries its best to run each container on
the host it used before and reuse the local state stores. At LinkedIn, we
have tested this feature with large stateful jobs, and it successfully cut
down the re-bootstrap time.
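The idea behind host affinity can be modeled roughly as follows. This is a conceptual Java sketch, not Samza's implementation: on restart each container prefers its previous host, and only containers that land on their old host can reuse their local stores; the rest have to restore state (e.g. from the changelog). HostAffinitySketch and its methods are hypothetical. In Samza 0.10 the real feature is switched on with yarn.samza.host-affinity.enabled=true (please check the configuration table for your version).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Conceptual model of host affinity; names are hypothetical and this is not Samza's code.
final class HostAffinitySketch {

    /**
     * Chooses a host for each container: its previous host if that host is still
     * available, otherwise the available hosts are used round-robin.
     *
     * @param previousHosts  container id -> host it ran on before the restart
     * @param availableHosts hosts that can run containers now (must be non-empty)
     * @return container id -> chosen host, ordered by container id
     */
    static Map<String, String> place(Map<String, String> previousHosts, Set<String> availableHosts) {
        if (availableHosts.isEmpty()) {
            throw new IllegalArgumentException("no hosts available");
        }
        List<String> fallback = new ArrayList<>(new TreeSet<>(availableHosts));
        Map<String, String> placement = new TreeMap<>();
        int next = 0;
        for (Map.Entry<String, String> e : new TreeMap<>(previousHosts).entrySet()) {
            String previous = e.getValue();
            if (availableHosts.contains(previous)) {
                placement.put(e.getKey(), previous);  // local stores can be reused
            } else {
                // Previous host is gone: any host works, but state must be restored.
                placement.put(e.getKey(), fallback.get(next++ % fallback.size()));
            }
        }
        return placement;
    }

    /** Containers whose new host equals their old host, i.e. that can reuse local state. */
    static Set<String> reusingLocalState(Map<String, String> previousHosts, Map<String, String> placement) {
        Set<String> reused = new TreeSet<>();
        for (Map.Entry<String, String> e : placement.entrySet()) {
            if (e.getValue().equals(previousHosts.get(e.getKey()))) {
                reused.add(e.getKey());
            }
        }
        return reused;
    }
}
```

The re-bootstrap time saved is roughly the state restore work for every container in reusingLocalState().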

> 3) What is a good naming and versioning strategy for things like Kafka
> topics, RocksDB stores, etc.?

Samza does not restrict how an application names its Kafka topics and
RocksDB stores. Any scheme that uniquely identifies the state stores and the
application's input/output streams within the deployment environment is good
enough.
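As one possible convention (hypothetical, not something Samza requires), names can encode environment, application, purpose and a version, so an incompatible change can move to a new topic or store instead of reusing old data. The Java sketch below also checks the Kafka topic-name rules (ASCII letters, digits, '.', '_' and '-', at most 249 characters); StreamNames and its methods are illustrative names only.

```java
import java.util.regex.Pattern;

// One hypothetical naming convention; Samza accepts any names.
final class StreamNames {
    // Kafka topic names: ASCII letters, digits, '.', '_' and '-', at most 249 characters.
    private static final Pattern LEGAL_TOPIC = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_TOPIC_LENGTH = 249;

    /** e.g. topic("prod", "pageviews", "enriched", 2) -> "prod.pageviews.enriched.v2" */
    static String topic(String env, String app, String purpose, int version) {
        String name = String.join(".", env, app, purpose, "v" + version);
        if (name.length() > MAX_TOPIC_LENGTH || !LEGAL_TOPIC.matcher(name).matches()) {
            throw new IllegalArgumentException("not a legal Kafka topic name: " + name);
        }
        return name;
    }

    /**
     * Store name with a version suffix, e.g. store("pageviews", "counts", 3) -> "pageviews-counts-v3".
     * Bumping the version when the stored format changes lets a release start from a
     * fresh store rather than read incompatible data (a convention, not a Samza rule).
     */
    static String store(String app, String purpose, int version) {
        return app + "-" + purpose + "-v" + version;
    }
}
```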

> 4) What is the best way of configuring jobs? The hello-samza example bundles
> configs with the tasks, so all the URLs are hidden inside a tar file. Is
> there a better way to pass properties based on region, environment (prod,
> test, dev), etc.?

How configuration is passed in can be very different depending on the
deployment system. The property-file-based configuration in hello-samza is
just one simple example, and it may not make sense for a complex deployment
environment. At LinkedIn, we package the binary and the configuration as two
separate packages; LinkedIn's deployment system identifies the binary bundle
and the configuration bundle separately and deploys them to specific
locations on the target host. The start script then uses the configuration
location chosen by the deployment system as the config path when starting
the Samza job.
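If the deployment system can place an environment-specific file next to a shared one, the start script can merge them before launching the job. A minimal Java sketch, assuming a simple two-layer scheme where the override wins; the layout, the LayeredConfig class and the example values are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical two-layer config: shared base properties plus per-environment overrides.
final class LayeredConfig {

    /** Parses properties text; in practice it would come from files placed by the deployment system. */
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** Returns base overlaid with override: keys present in override win. */
    static Properties merge(Properties base, Properties override) {
        Properties merged = new Properties();
        merged.putAll(base);
        merged.putAll(override);
        return merged;
    }
}
```

The merged properties could then be written to a file and passed to run-job.sh via --config-path together with --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory, which is how hello-samza starts its jobs.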

> 5) I faced a weird problem with Kafka partitioning. I created 2 Kafka queues
> and 2 Samza jobs that were communicating like:
> topic1 -> samza1 -> topic2 -> samza2
> samza2 had state in RocksDB (let's say it was just storing strings it saw).
> Kafka topics had 20 partitions. I found that messages that were sent by
> samza1 and manually using org.apache.kafka.clients.producer.KafkaProducer
> were landing on different samza2 instances even though they had the same
> partition key (of type string).
> Example:
> samza1 sends a message with key "key1" to samza2 via topic2, and it is
> stored in task1 of samza2.
> I send messages manually to topic2 with key "key1", and it is stored in task
> 10 of samza2. Code that I was using to send messages from samza1:
> Config:
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.key.serde=string
> systems.kafka.samza.msg.serde=string
> Code:
> private static final SystemStream OUTPUT_STREAM =
>         new SystemStream("kafka", "topic2");
> messageCollector.send(new OutgoingMessageEnvelope(
>         OUTPUT_STREAM, "key1", message));
> manually:
> configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s"$broker:9092")
> configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> classOf[StringSerializer].getName)
> configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> classOf[StringSerializer].getName)
> val kafkaProducer = new KafkaProducer[String, String](configs)
> val record = new ProducerRecord[String, String]("topic2", "key1", message)
> kafkaProducer.send(record).get()
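> What could be wrong?

This might be related to SAMZA-839. The symptom suggests that Samza's Kafka
producer and the plain KafkaProducer map the same key to different partitions.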
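To illustrate how the same key can end up in different partitions, the Java sketch below compares two partitioning functions for 20 partitions. It assumes one producer partitions by the key object's hashCode() and the other hashes the serialized key bytes with murmur2, as the Kafka Java client's DefaultPartitioner does; which scheme your Samza version actually uses is an assumption to verify against SAMZA-839 and the KafkaSystemProducer source. The non-negative masking step also differs between Kafka client versions, so treat the exact partition numbers as illustrative. PartitionCompare is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: two ways a producer might map a key to one of numPartitions partitions.
final class PartitionCompare {

    /** Object-hash scheme (assumed for illustration): non-negative key.hashCode() mod n. */
    static int byHashCode(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Byte-hash scheme like the Kafka Java client's DefaultPartitioner: murmur2 of the serialized key. */
    static int byMurmur2(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer's default encoding
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    /** 32-bit MurmurHash2 with seed 0x9747b28c, following Kafka's Utils.murmur2. */
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;

        // Mix four bytes at a time (little-endian).
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) | ((data[i4 + 1] & 0xff) << 8)
                    | ((data[i4 + 2] & 0xff) << 16) | ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1-3 bytes; the cases intentionally fall through.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }
}
```

If the two functions disagree for a key, messages with that key are consumed by different samza2 tasks, which matches the symptom above. Making every producer of topic2 use the same partitioning function would avoid the mismatch.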

> and one crazy question:
> has anyone thought about combining Samza and Spark? Like allowing Spark to
> use Samza's RocksDB/LevelDB storage as a state holder for micro-batching?

I think a better question would be: can we implement micro-batching (i.e.
windowing) in Samza and provide RDDs so that Spark Streaming programs can
run on top of Samza? That's an interesting thought, since it would allow a
unified programming model in both the online and offline worlds. However,
micro-batching as in the Spark Streaming APIs also introduces issues such as
session windows broken up by batch boundaries, out-of-order arrivals across
those boundaries, etc. We can certainly pound on it more.
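A toy Java illustration of the session-window issue: sessionizing each fixed-size micro-batch independently can split one session in two when it straddles a batch boundary. MicroBatchSessions is a hypothetical helper, not a Spark or Samza API, and it ignores out-of-order arrivals by assuming sorted timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a session is a run of events whose consecutive gaps are <= maxGap.
final class MicroBatchSessions {

    /** Counts sessions in a list of event timestamps sorted ascending. */
    static int countSessions(List<Long> sortedTimestamps, long maxGap) {
        int sessions = 0;
        Long prev = null;
        for (long t : sortedTimestamps) {
            if (prev == null || t - prev > maxGap) {
                sessions++;  // gap too large (or first event): a new session starts
            }
            prev = t;
        }
        return sessions;
    }

    /** Counts sessions when each fixed-size batch [k*batchMillis, (k+1)*batchMillis) is sessionized on its own. */
    static int countSessionsPerBatch(List<Long> sortedTimestamps, long maxGap, long batchMillis) {
        int total = 0;
        List<Long> batch = new ArrayList<>();
        long currentBatch = Long.MIN_VALUE;
        for (long t : sortedTimestamps) {
            long b = Math.floorDiv(t, batchMillis);
            if (b != currentBatch && !batch.isEmpty()) {
                total += countSessions(batch, maxGap);  // close the previous batch
                batch.clear();
            }
            currentBatch = b;
            batch.add(t);
        }
        if (!batch.isEmpty()) {
            total += countSessions(batch, maxGap);
        }
        return total;
    }
}
```

With events at t = 0, 3 and 6 ms, a maximum gap of 5 ms and 5 ms batches, countSessions returns 1 but countSessionsPerBatch returns 2, because the event at 6 ms falls into the next batch.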

> Thank you,
> Alex
