activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1948 dotnet example with high performant load
Date Fri, 22 Jun 2018 14:36:41 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 3534b23e3 -> faf99cd68


ARTEMIS-1948 dotnet example with high performant load


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d805074b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d805074b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d805074b

Branch: refs/heads/master
Commit: d805074b94ce58e83c0f8950a525e44050dfa280
Parents: 3534b23
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Jun 20 18:40:11 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Jun 21 11:23:58 2018 -0400

----------------------------------------------------------------------
 .../amqp/dotnet/HighPerformanceLoad/App.cs      |  73 ++++++++++++
 .../HighPerformanceLoad/HighPerformance.csproj  |  29 +++++
 .../amqp/dotnet/HighPerformanceLoad/Producer.cs |  96 ++++++++++++++++
 .../dotnet/HighPerformanceLoad/ReceiverPool.cs  | 110 +++++++++++++++++++
 .../HighPerformanceLoad/TokenBucketLimiter.cs   |  81 ++++++++++++++
 .../amqp/dotnet/HighPerformanceLoad/readme.md   |  62 +++++++++++
 .../dotnet/HighPerformanceLoad/start-server.sh  |  29 +++++
 7 files changed, 480 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d805074b/examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs
----------------------------------------------------------------------
diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs
new file mode 100644
index 0000000..c1a94b1
--- /dev/null
+++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using Amqp.Framing;
+using Amqp;
+using System.Threading.Tasks;
+
+namespace Artemis.Perf
+{
+   /// <summary>
+   /// Entry point of the high-performance load example: for every broker URL it starts
+   /// a receiver pool and a rate-limited producer, then reports throughput once per second.
+   /// </summary>
+   class App
+    {
+        // Number of messages accepted so far by all receiver pools (incremented from worker threads).
+        static long ReceivedMessages = 0;
+
+        /// <summary>
+        /// Callback handed to <see cref="ReceiverPool"/>: counts the message and accepts it
+        /// on the link it arrived on.
+        /// </summary>
+        static void TheCallback(int id, Session session, ReceiverLink link, Message message)
+        {
+            Interlocked.Increment(ref ReceivedMessages);
+            link.Accept(message);
+        }
+
+        /// <summary>
+        /// Starts one consumer pool and one producer per broker URL given on the command
+        /// line (amqp://127.0.0.1:5672 when none is given), then prints the running totals
+        /// and the per-second rates forever.
+        /// </summary>
+        /// <param name="args">Broker URLs, one per server to put under load.</param>
+        static void Main(string[] args) {
+
+            if (args.Length == 0) {
+                args = new string[] { "amqp://127.0.0.1:5672" };
+            }
+
+            // it will start one client towards each server
+            for (int i = 0; i < args.Length; i++) {
+                // args[i], not args[0]: each iteration has to target its own server
+                processOn(args[i], "orders", 100000000, 25000, "p1");
+            }
+
+            while (true) {
+                // Sample the counters one second apart; the difference is the per-second rate.
+                long previousRead = Interlocked.Read(ref ReceivedMessages);
+                long previousSent = Interlocked.Read(ref Producer.totalSent);
+                Thread.Sleep(1000);
+                long currentRead = Interlocked.Read(ref ReceivedMessages);
+                long currentSent = Interlocked.Read(ref Producer.totalSent);
+                Console.WriteLine("Received: " + currentRead + " TotalSent: " +
+                      currentSent);
+                Console.WriteLine("Rate reading: " + (currentRead - previousRead) + ", Rate sending: " + (currentSent - previousSent));
+            }
+
+        }
+
+        /// <summary>
+        /// Starts the load against a single broker: a receiver pool with 1 worker and
+        /// 200 credits on <paramref name="queue"/>, plus an asynchronous producer.
+        /// </summary>
+        /// <param name="addr">AMQP URL of the broker.</param>
+        /// <param name="queue">Queue to send to and receive from.</param>
+        /// <param name="totalSend">Number of messages the producer sends.</param>
+        /// <param name="maxRateSend">Maximum number of messages sent per second.</param>
+        /// <param name="processName">Name given to the producer.</param>
+        static void processOn(string addr, string queue, int totalSend, int maxRateSend, String processName) {
+            Address address = new Address(addr);
+
+            Connection connection = new Connection(address);
+
+            ReceiverPool pool = new ReceiverPool(connection, 1, queue, 200, TheCallback);
+            pool.start();
+
+            Producer producer = new Producer(processName, addr, queue, totalSend, maxRateSend);
+            producer.produce(); // this will start an asynchronous producer
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d805074b/examples/protocols/amqp/dotnet/HighPerformanceLoad/HighPerformance.csproj
----------------------------------------------------------------------
diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/HighPerformance.csproj b/examples/protocols/amqp/dotnet/HighPerformanceLoad/HighPerformance.csproj
new file mode 100644
index 0000000..99a2d1a
--- /dev/null
+++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/HighPerformance.csproj
@@ -0,0 +1,29 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- SDK-style project file for the HighPerformanceLoad example. -->
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <!-- Build an executable (the example has a Main entry point) rather than a library. -->
+    <OutputType>Exe</OutputType>
+    <TargetFramework>netcoreapp2.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <!-- AMQP client library downloaded by "dotnet restore"; presumably the source of the Amqp namespace used by the example. -->
+    <PackageReference Include="AMQPNetLite" Version="2.1.3" />
+  </ItemGroup>
+
+</Project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d805074b/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs
----------------------------------------------------------------------
diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs
new file mode 100644
index 0000000..8e9cefe
--- /dev/null
+++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Amqp.Framing;
+using Amqp;
+using System.Threading.Tasks;
+
+namespace Artemis.Perf
+{
+   /// <summary>
+   /// Sends a fixed number of durable messages to a queue from a background task,
+   /// throttled to a maximum number of messages per second.
+   /// </summary>
+   public class Producer
+   {
+       string name;
+
+        string addr;
+        string queue;
+        int numberOfMessages;
+        int messagesPerSecond;
+        // Messages sent by this producer instance (incremented from the send callback).
+        long messagesSent;
+
+        // Messages sent by all producers in the process; read by App to print statistics.
+        public static long totalSent;
+
+        /// <summary>Stores the settings; nothing is connected or sent until <see cref="produce"/> is called.</summary>
+        /// <param name="name">Name of this producer.</param>
+        /// <param name="addr">AMQP URL of the broker.</param>
+        /// <param name="queue">Queue the messages are sent to.</param>
+        /// <param name="numberOfMessages">Total number of messages to send.</param>
+        /// <param name="messagesPerSecond">Upper bound on the sending rate.</param>
+        public Producer(string name, string addr, string queue, int numberOfMessages, int messagesPerSecond) {
+            this.name = name;
+            this.addr = addr;
+            this.queue = queue;
+            this.numberOfMessages = numberOfMessages;
+            this.messagesPerSecond = messagesPerSecond;
+        }
+
+
+        /// <summary>
+        /// Opens a connection, session and sender link, then starts a long-running task
+        /// that sends the messages. Returns as soon as the task has been started.
+        /// </summary>
+        public void produce() {
+            Address address = new Address(addr);
+
+            Connection connection = new Connection(address);
+
+
+            Session session = new Session(connection);
+            SenderLink sender = new SenderLink(session, "sender", queue);
+
+            // Invoked for each message passed to Send below; only bumps the counters.
+            OutcomeCallback callback = (l, msg, o, s) => {
+                Interlocked.Increment(ref messagesSent);
+                Interlocked.Increment(ref totalSent);
+            };
+
+            // This is just to limit the number of messages per second we are sending
+            TokenBucketLimiterImpl tokens = new TokenBucketLimiterImpl(messagesPerSecond);
+
+            Task.Factory.StartNew(() =>  {
+                try {
+                    Console.WriteLine("Sending {0} messages...", numberOfMessages);
+                    for (var i = 0; i < numberOfMessages; i++)
+                    {
+                        tokens.limit();
+                        Message message = new Message("a message!" + i);
+                        message.Header = new Header();
+                        message.Header.Durable = true;
+
+                        // The callback here is to make the sending to happen as fast as possible
+                        sender.Send(message, callback, null);
+                    }
+                    Console.WriteLine(".... Done sending");
+                } catch (Exception e) {
+                    // Same policy as ReceiverPool.WorkerRun: report the failure instead of
+                    // letting it disappear inside an unobserved background task.
+                    Console.WriteLine(e);
+                }
+            }, TaskCreationOptions.LongRunning);
+
+            // Trace.TraceLevel = TraceLevel.Verbose | TraceLevel.Error |
+            // TraceLevel.Frame | TraceLevel.Information | TraceLevel.Warning;
+            // Trace.TraceListener = (l, f, o) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, o));
+
+            // sender.Close();
+
+            // Task.Factory.StartNew(() =>  {
+            //     while (true) {
+            //         Console.WriteLine("Sent " + Interlocked.Read(ref messagesSent) + " on queue " + queue + " producer " + this.name);
+            //         Thread.Sleep(1000);
+            //     }
+            // }, TaskCreationOptions.LongRunning);
+
+
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d805074b/examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs
----------------------------------------------------------------------
diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs
new file mode 100644
index 0000000..dce841a
--- /dev/null
+++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Amqp.Framing;
+using Amqp;
+using System.Threading.Tasks;
+
+namespace Artemis.Perf
+{
+     /**
+     * Starts a pool of receiver links on one connection, each served by its own
+     * long-running worker task, and hands every received message to a single callback.
+     */
+     public class ReceiverPool
+     {
+        /** Signature of the callback invoked for each received message; id is the worker index. */
+        public delegate void MessageReceived(int id, Session session, ReceiverLink link, Message msg);
+
+        public int MessagesReceived;
+
+        MessageReceived _callback;
+        int _Workers;
+        private Object receiverLock = new Object();
+        // volatile: written by stop() and polled by the worker tasks on other threads
+        private volatile Boolean running = true;
+
+        private ReceiverLink[] Receivers;
+        private Session[] Sessions;
+
+        private Connection _Connection;
+
+        private int Credits;
+
+        /**
+         * Creates the session and one receiver link per worker; no message is received
+         * until start() is called.
+         *
+         * Connection: connection the session is created on
+         * Workers:    number of receiver links / worker tasks
+         * queue:      queue the links consume from
+         * Credits:    link credit set on each receiver when its worker starts
+         * callback:   invoked for every message received
+         */
+        public ReceiverPool(Connection Connection, int Workers, String queue, int Credits, MessageReceived callback)
+        {
+            this._Connection = Connection;
+            this.Receivers = new ReceiverLink[Workers];
+            this.Sessions = new Session[Workers];
+
+
+            for (int i = 0; i < Workers; i++)
+            {
+
+                // I was playing with using a single session versus multiple sessions
+                if (i == 0) {
+                    Sessions[i] = new Session(Connection);
+                }
+                else {
+                    Sessions[i] = Sessions[0];
+                }
+                Receivers[i] = new ReceiverLink(Sessions[i], "receiver " + queue + " " + i, queue);
+            }
+            this._Workers = Workers;
+            this._callback = callback;
+            this.Credits = Credits;
+        }
+
+
+        /** Asks the workers to finish, then closes every receiver link and the session(s). */
+        public void stop() {
+            running = false;
+            for (int i = 0; i < _Workers; i++) {
+                Receivers[i].Close();
+            }
+            for (int i = 0; i < _Workers; i++) {
+                // All entries currently alias Sessions[0]; close each distinct session only once.
+                if (i == 0 || !Object.ReferenceEquals(Sessions[i], Sessions[0])) {
+                    Sessions[i].Close();
+                }
+            }
+        }
+
+
+        /** Starts one long-running task per worker; returns immediately. */
+        public void start() {
+            for (int i = 0; i < _Workers; i++) {
+                {
+                    // This variable exists otherwise we would get an olderValue of i
+                    int value = i;
+                    Task.Factory.StartNew(() => WorkerRun(value), TaskCreationOptions.LongRunning);
+                }
+            }
+        }
+
+        /**
+         * Worker loop for receiver i: sets the link credit, then keeps receiving with a
+         * one-second timeout until stop() is called, passing each message to the callback.
+         * Any exception ends the worker and is printed to the console.
+         */
+        void WorkerRun(int i) {
+            try {
+                Receivers[i].SetCredit(Credits);
+                while (running)
+                {
+                    Message theMessage = Receivers[i].Receive(TimeSpan.FromSeconds(1));
+
+                    // a null message means the receive timed out; loop to re-check the running flag
+                    if (theMessage != null)
+                    {
+                        _callback(i, Sessions[i], Receivers[i], theMessage);
+                    }
+                }
+            } catch (Exception e) {
+                Console.WriteLine(e);
+            }
+        }
+     }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d805074b/examples/protocols/amqp/dotnet/HighPerformanceLoad/TokenBucketLimiter.cs
----------------------------------------------------------------------
diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/TokenBucketLimiter.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/TokenBucketLimiter.cs
new file mode 100644
index 0000000..2685e92
--- /dev/null
+++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/TokenBucketLimiter.cs
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Amqp.Framing;
+using Amqp;
+using System.Threading.Tasks;
+
+namespace Artemis.Perf
+{
+
+    // Adapted from Artemis' TokenBucketLimiter, with some modifications.
+    // Grants at most `rate` tokens per one-second window; limit() blocks until a token is free.
+    // The bucket starts empty, so the first token is granted one second after the first call.
+    public class TokenBucketLimiterImpl {
+
+        private int rate;
+
+        /**
+            * Start of the current window in Unix epoch milliseconds; 0 means "not started yet".
+            * Plain (non-volatile) field: the limiter is not used from multiple threads here.
+            */
+        private long last;
+
+        /**
+            * Tokens still available in the current window.
+            * Plain (non-volatile) field: the limiter is not used from multiple threads here.
+            */
+        private int tokens;
+
+        public TokenBucketLimiterImpl(int rate) {
+            this.rate = rate;
+        }
+
+        // Refills the bucket once a full second has elapsed, then tries to take one token.
+        // Returns true when a token was taken, false when the bucket is empty.
+        private bool checkRate() {
+            long nowMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+
+            if (last == 0) {
+                last = nowMillis;
+            }
+
+            bool windowElapsed = (nowMillis - last) >= 1000;
+            if (windowElapsed) {
+                last = nowMillis;
+                tokens = rate;
+            }
+
+            if (tokens <= 0) {
+                return false;
+            }
+
+            tokens--;
+            return true;
+        }
+
+        // Blocks the caller, polling every millisecond, until a token can be taken.
+        public void limit() {
+            while (!checkRate()) {
+                // Console.WriteLine("Limiting messages per max rate");
+                Thread.Sleep(1);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d805074b/examples/protocols/amqp/dotnet/HighPerformanceLoad/readme.md
----------------------------------------------------------------------
diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/readme.md b/examples/protocols/amqp/dotnet/HighPerformanceLoad/readme.md
new file mode 100644
index 0000000..65be374
--- /dev/null
+++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/readme.md
@@ -0,0 +1,62 @@
+# Running the .NET AMQP example
+
+
+# Pre-requisites:
+
+All of this can be done on Linux, Mac and... Windows
+
+- Install .NET
+
+https://www.microsoft.com/net/core
+
+
+- Visual Studio Code is free and may be useful:
+
+https://code.visualstudio.com
+
+
+- Powershell might be also useful:
+
+https://github.com/PowerShell/PowerShell/
+
+
+
+# running the example
+
+- Create and start the broker, by running:
+
+```bash
+./start-server.sh
+```
+
+This broker is created by simply using the CLI. You may also do it manually if you like:
+
+```bash
+artemis create ./server1 --user a --password a --role a --allow-anonymous --force
+cd server1/bin
+./artemis run
+```
+
+- Compile the code
+
+You need to call restore to download the AMQP library, and then build the example.
+Restore is part of NuGet, which is roughly what the Maven repository is for Java developers.
+
+```sh
+dotnet restore
+dotnet build
+dotnet run
+```
+
+Or simply use the run-example.sh script on this directory
+
+- Debugging
+
+Visual Studio Code will make it fairly easy to do it
+
+
+# About this example
+
+This example sends messages, limited to 25K messages per second.
+The receiving side uses a pool of consumers, which synchronously acknowledge messages.
+The .NET threading model is expensive; this example shows how to make the most of your resources by using a pool of consumers.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d805074b/examples/protocols/amqp/dotnet/HighPerformanceLoad/start-server.sh
----------------------------------------------------------------------
diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/start-server.sh b/examples/protocols/amqp/dotnet/HighPerformanceLoad/start-server.sh
new file mode 100755
index 0000000..4b50a0c
--- /dev/null
+++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/start-server.sh
@@ -0,0 +1,29 @@
+#!/usr/bin/env sh
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Setting the script to fail if anything goes wrong
+set -e
+
+rm -rf ./server1
+../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous
--force
+./target/server0/bin/artemis run
+#sleep 1
+#../../../../target/clustered-static-node1/bin/artemis run | tee server2.log &
+#sleep 1
+#../../../../target/clustered-static-node2/bin/artemis run | tee server3.log &
+#sleep 1


Mime
View raw message