samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: Samza job throughput much lower than Kafka throughput
Date Thu, 21 May 2015 04:30:15 GMT
Hi George,

Is there any reason you need to set the following configs?

systems.kafka.consumer.fetch.wait.max.ms= 1

This setting will basically disable long pooling of the consumer which will
then busy fetching data from broker, which has a large impact on network
latency especially when the consumer is already caught up with the Kafka
broker.

Also when you say it is "slower than a program reading directly from
Kafka." which consumer did your program use to read data from Kafka?

Guozhang


On Wed, May 20, 2015 at 5:01 PM, George Li <gli@ca.ibm.com> wrote:

> Hi Yi,
>
> Thanks for the reply. Below is my job config and code.
>
> When we run this job inside our dev docker container, which has zookeeper,
> broker, and yarn installed locally,  its throughput is at least 50% higher
> than our cluster run's.
>
> Thanks,
>
> George
>
> Configuration:
>
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=container-performance
>
> # YARN
> yarn.container.count=1
> yarn.container.memory.mb=2548
> yarn.package.path={my package on hdfs}
> yarn.container.retry.count=0
> yarn.am.container.memory.mb=2048
> yarn.am.jmx.enabled=false
>
> # Task
> task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
> -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> -XX:+DisableExplicitGC -Djava.awt.headless=true
>
> task.class=samza.TestPerformanceTask
> task.inputs=kafka.throughput-test2
> task.log.interval=1000000
> task.checkpoint.factory =
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> task.checkpoint.replication.factor=1
>
> # Kafka System (only used for coordinator stream in this test)
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.fetch.threshold=50000
>
> systems.kafka.consumer.zookeeper.connect= {zookeeper}
> systems.kafka.producer.bootstrap.servers={broker node}
> systems.kafka.consumer.auto.offset.reset=smallest
> systems.kafka.consumer.socket.receive.buffer.bytes= 2200000
> systems.kafka.consumer.fetch.message.max.bytes= 1100000
> systems.kafka.consumer.fetch.min.bytes= 1
> systems.kafka.consumer.fetch.wait.max.ms= 1
>
> #define coordinator system
> job.coordinator.system=kafka
> job.coordinator.replication.factor=1
>
> systems.kafka.streams.throughput-test2.samza.reset.offset=true
> systems.kafka.streams.throughput-test2.samza.offset.default=oldest
> ~
>
> Job's code. This is mostly a copy-paste of the one in the repository
>
> object TestPerformanceTask {
>   // No thread safety is needed for these variables because they're
> mutated in
>   //   // the process method, which is single threaded.
>   var messagesProcessed = 0
>   var startTime = 0L
> }
>
> class TestPerformanceTask extends StreamTask with InitableTask with
> Logging {
>   import TestPerformanceTask._
>
>   /**
>    *    * How many messages to process before a log message is printed.
>    *       */
>   var logInterval = 10000
>
>   /**
>    *    * How many messages to process before shutting down.
>    *       */
>   var maxMessages = 10000000
>
>
>   var outputSystemStream: Option[SystemStream] = None
>
>   def init(config: Config, context: TaskContext) {
>     logInterval = config.getInt("task.log.interval", 10000)
>     maxMessages = config.getInt("task.max.messages", 10000000)
>     outputSystemStream = Option(config.get("task.outputs",
> null)).map(Util.getSystemStreamFromNames(_))
>     println("init!!")
>   }
>
>   def process(envelope: IncomingMessageEnvelope, collector:
> MessageCollector, coordinator: TaskCoordinator) {
>     if (startTime == 0) {
>       startTime = System.currentTimeMillis
>     }
>
>     if (outputSystemStream.isDefined) {
>       collector.send(new OutgoingMessageEnvelope(outputSystemStream.get,
> envelope.getKey, envelope.getMessage))
>     }
>
>     messagesProcessed += 1
>
>     if (messagesProcessed % logInterval == 0) {
>       val seconds = (System.currentTimeMillis - startTime) / 1000
>       println("Processed %s messages in %s seconds." format
> (messagesProcessed, seconds))
>     }
>
>
>     if (messagesProcessed >= maxMessages) {
>       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
>     }
>   }
> }
>
>
>
> From:   Yi Pan <nickpan47@gmail.com>
> To:     dev@samza.apache.org,
> Date:   20/05/2015 05:03 PM
> Subject:        Re: Samza job throughput much lower than Kafka throughput
>
>
>
> Hi, George,
>
> Could you share w/ us the code and configuration of your sample test job?
> Thanks!
>
> -Yi
>
> On Wed, May 20, 2015 at 1:19 PM, George Li <gli@ca.ibm.com> wrote:
>
> > Hi,
> >
> > We are evaluating Samza's performance, and our sample job with
> > TestPerformanceTask is much slower than a program reading directly from
> > Kafka.
> >
> > Scenario:
> >
> > * Cluster:
> > 1 master node for Zookeeper and yarn.
> > 3 Kafka broker nodes
> > 3 yarn worker nodes
> >
> > * Kafka:
> > Topic has only 1 partition. Average message size is around 100 byte.
> > On a yarn worker node, run the performance test program from Kafka
> > repository to read the topic. The throughput is about 400k messages/sec
> >
> > *Samza
> > Run TestPerformanceTask from Samza repository with no output stream
> > defined, and the throughput no more than 130k messages/sec
> >
> >
> > How can I explain/fix this performance difference?
> >
> > What I have done so far:
> >
> > 1. Monitor yarn worker node resource usage.
> > When the job is running, cpu and memory usage are never more than 5%
> > except at the beginning of the run. No significant network and disk IO
> > either
> >
> > 2. Monitor worker node network traffic
> > Tcpdump shows an interesting pattern. The yarn worker node will fetch a
> > block of data from the kafka broker, and after that, it will handshake
> > with the same kafka broker once every 100 ms for 300 ms before fetching
> > the next block.
> >
> > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e., 10x the
> > default settings, this handshake lasts about 3 seconds. If I set
> > fetch.threshold to 250k, this idle period then becomes 1.5 sec. It seems
> > kafka consumer greatly outpaced process() call
> >
> > 3. Check Samza metrics
> > I do not see any excessive network calls to master or kafka broker,
> i.e.,
> > no more than 30 calls/sec. However, process-ms and choose-ms are
> > approaching 2ms
> >
> > Any input would be greatly appreciated.
> >
> > Thanks,
> >
> > George
> >
> >
> >
> >
> >
>
>


-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message