From: Kevin Conaway
To: "user@storm.apache.org"
Date: Tue, 31 May 2016 08:53:24 -0400
Subject: Re: How to improve the intercommunication latency of spout/bolt
In-Reply-To: <9D24A94E-DC51-4C84-86C7-F634B4CB9663@gf.com.cn>
References: <9D24A94E-DC51-4C84-86C7-F634B4CB9663@gf.com.cn>

Try using localOrShuffle grouping. Storm will attempt to pass messages
directly to the next component within the same JVM, if possible.

On Tuesday, May 31, 2016, 林海涛 (Information Technology Department, Trading Cloud Technology R&D Group) <linhaitao@gf.com.cn> wrote:

> Hello.
> I ran a test with a simple topology to measure the communication latency
> between a spout and a bolt. The spout emits the current nanosecond
> timestamp, and the bolt prints the time difference when it receives the
> tuple.
> I deployed my Storm cluster on my own machine in Docker containers (one
> nimbus, one supervisor) and ran the topology in cluster mode.
> The code is below:
>
> public class RandomSpout extends BaseRichSpout {
>     SpoutOutputCollector _collector;
>
>     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>         _collector = collector;
>     }
>
>     public void nextTuple() {
>         Utils.sleep(1000);
>         // Emit the send time in nanoseconds.
>         long currentTime = System.nanoTime();
>         _collector.emit(new Values(currentTime));
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer arg0) {
>         arg0.declare(new Fields("value"));
>     }
> }
>
> public class PrintBolt extends BaseRichBolt {
>     private LogFileWriter _logFile;
>
>     public void execute(Tuple arg0) {
>         // Log the elapsed time between emit and receive, in nanoseconds.
>         long prevTime = arg0.getLong(0);
>         long currentTime = System.nanoTime();
>         _logFile.writeLog("cost: " + (currentTime - prevTime));
>     }
>
>     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
>         try {
>             _logFile = new LogFileWriter("StormTest", this.getClass().getSimpleName());
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer arg0) {
>         arg0.declare(new Fields("value"));
>     }
> }
>
> public class Topology {
>     public static void main(String[] args) {
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("spout", new RandomSpout(), 1);
>         builder.setBolt("bolt", new PrintBolt(), 1).shuffleGrouping("spout");
>
>         Config conf = new Config();
>         conf.setDebug(false);
>         if (args.length > 0) {
>             // Cluster submit.
>             conf.setNumWorkers(2);
>             conf.setNumAckers(0);
>             try {
>                 StormSubmitter.submitTopology("stormTest", conf, builder.createTopology());
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         } else {
>             new LocalCluster().submitTopology("stormTest", conf, builder.createTopology());
>         }
>     }
> }
>
> The output is below:
>
> [2016-05-31 09:13:53]cost: 1960336
> [2016-05-31 09:13:54]cost: 2600239
> [2016-05-31 09:13:55]cost: 3103449
> [2016-05-31 09:13:56]cost: 3206544
> [2016-05-31 09:13:57]cost: 3783647
> [2016-05-31 09:13:58]cost: 3635923
> [2016-05-31 09:13:59]cost: 3887787
> [2016-05-31 09:14:00]cost: 1623692
> [2016-05-31 09:14:01]cost: 2524674
> [2016-05-31 09:14:02]cost: 3383506
> [2016-05-31 09:14:03]cost: 3898478
> [2016-05-31 09:14:04]cost: 2120949
> [2016-05-31 09:14:05]cost: 3756272
> [2016-05-31 09:14:06]cost: 2877997
> [2016-05-31 09:14:07]cost: 3432532
> [2016-05-31 09:14:08]cost: 3638306
> [2016-05-31 09:14:09]cost: 2958907
> [2016-05-31 09:14:10]cost: 2742666
> [2016-05-31 09:14:11]cost: 3024576
> [2016-05-31 09:14:12]cost: 2822562
> [2016-05-31 09:14:13]cost: 2623060
> [2016-05-31 09:14:14]cost: 4045938
>
> The latency is roughly 2 to 4 ms, which seems too high for me.
> How can I reduce it?

-- 
Kevin Conaway
http://www.linkedin.com/pub/kevin-conaway/7/107/580/
https://github.com/kevinconaway
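
For the quoted topology, the suggested change amounts to calling `localOrShuffleGrouping("spout")` in place of `shuffleGrouping("spout")` on the bolt declaration. As documented for Storm, this grouping shuffles tuples only among target tasks in the same worker process when there are any, and otherwise behaves like a normal shuffle grouping. The sketch here is a toy model of that routing decision in plain Java, not Storm's implementation; the class name `LocalOrShuffleModel`, the method `chooseTask`, and the task ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Toy model of a local-or-shuffle routing choice (hypothetical, not Storm code). */
final class LocalOrShuffleModel {

    /**
     * Picks a target task id. If any target task runs in the sender's worker,
     * the choice is restricted to those tasks; otherwise it falls back to a
     * plain shuffle over all target tasks.
     */
    static int chooseTask(List<Integer> targetTasks, Set<Integer> tasksInThisWorker, Random random) {
        List<Integer> local = new ArrayList<>();
        for (int task : targetTasks) {
            if (tasksInThisWorker.contains(task)) {
                local.add(task);
            }
        }
        List<Integer> candidates = local.isEmpty() ? targetTasks : local;
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        Random random = new Random();
        List<Integer> boltTasks = Arrays.asList(3, 4, 5);

        // Bolt task 4 shares the worker with the sender, so it is always chosen.
        Set<Integer> workerWithBolt = new HashSet<>(Arrays.asList(1, 4));
        System.out.println(chooseTask(boltTasks, workerWithBolt, random)); // prints 4

        // No bolt task is local, so any of 3, 4, 5 may be chosen.
        Set<Integer> workerWithoutBolt = new HashSet<>(Arrays.asList(1, 2));
        System.out.println(chooseTask(boltTasks, workerWithoutBolt, random));
    }
}
```

With `setNumWorkers(2)` and a single executor each, the spout and bolt may well be scheduled into different workers, in which case the grouping would fall back to a plain shuffle. Running with a single worker is one way to keep both in the same JVM.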

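
To put a number on the quoted measurements, the short sketch here converts the 22 logged `cost` values from nanoseconds to milliseconds and reports their minimum, mean, and maximum. The class name `LatencySummary` and its helper methods are hypothetical; the samples are copied from the quoted output.

```java
import java.util.Arrays;
import java.util.Locale;

/** Summarises the "cost" samples from the quoted log (hypothetical helper). */
final class LatencySummary {

    // The 22 samples from the quoted output, in nanoseconds.
    static final long[] SAMPLES_NANOS = {
        1960336, 2600239, 3103449, 3206544, 3783647, 3635923, 3887787, 1623692,
        2524674, 3383506, 3898478, 2120949, 3756272, 2877997, 3432532, 3638306,
        2958907, 2742666, 3024576, 2822562, 2623060, 4045938
    };

    /** Converts nanoseconds to milliseconds. */
    static double toMillis(double nanos) {
        return nanos / 1_000_000.0;
    }

    /** Arithmetic mean of the samples, in nanoseconds (NaN for an empty array). */
    static double meanNanos(long[] samples) {
        return Arrays.stream(samples).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        long min = Arrays.stream(SAMPLES_NANOS).min().getAsLong();
        long max = Arrays.stream(SAMPLES_NANOS).max().getAsLong();
        double mean = meanNanos(SAMPLES_NANOS);

        // prints: min=1.62 ms, mean=3.08 ms, max=4.05 ms
        System.out.printf(Locale.ROOT, "min=%.2f ms, mean=%.2f ms, max=%.2f ms%n",
                toMillis(min), toMillis(mean), toMillis(max));
    }
}
```

For these samples the mean comes out near 3.1 ms. `System.nanoTime()` is only specified to be meaningful for differences taken within one JVM, so if the spout and bolt run in separate workers the absolute values should be read with caution.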