qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Shahbaz Chaudhary (JIRA)" <qpid-...@incubator.apache.org>
Subject [jira] Created: (QPID-1146) Excel RTD Server
Date Fri, 20 Jun 2008 13:35:45 GMT
Excel RTD Server
----------------

                 Key: QPID-1146
                 URL: https://issues.apache.org/jira/browse/QPID-1146
             Project: Qpid
          Issue Type: New Feature
          Components: Dot Net Client
    Affects Versions: M2
         Environment: Windows .NET with Excel
            Reporter: Shahbaz Chaudhary
             Fix For: M2


--QUOTED FROM AN EMAIL SENT TO QPID'S MAILING LIST--
Hi All,

The following email contains an Excel RTD server which is able to subscribe to information
from an M2 Qpid server.
This is just a proof of concept: there are almost no optimizations, no checks for leaks, etc.
I'm not really a .NET programmer and I have never done any COM programming (and no C++ since
college).

In any case, try it out.  It works for me.  I haven't figured out how to run it outside Visual
Studio 2008 yet (no idea how to create install packages, register assemblies, etc.).

QPID C# folks are welcome to use it as they wish.  I probably won't maintain this, hopefully
someone else will.

In Excel, you just have to type the following formula:
=rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:5672'","<topic>","<field>")
This will subscribe to a topic. Each time an update is received, it will retrieve a field
from the message and display it in Excel.
Example:

=rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:5672'","md.bidsoffers","price")

This will just display the price for all incoming bidsoffers (so MSFT/ORCL/EBAY will be mixed
in)

You can also add 'filters' to the RTD function.  Just append two parameters to the end of
the previous RTD function, the first parameter refers to the field you wish to use in comparison,
the second parameter refers to the value the first parameter must have.

Example:

=rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:5672'","md.bidsoffers","price","symbol","MSFT")

This will subscribe to the same information as the last RTD function, but will only display
prices for MSFT.

You can add as many filters as you like; just make sure each filter is appended to the RTD
function as a pair: a filter field followed by a filter value.
--8<------------------------------RTDTest.cs---------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Client;
using System.Runtime.InteropServices;
using Microsoft.Office.Interop.Excel;

//Shahbaz Chaudhary
namespace RTDTest
{
    /// <summary>
    /// Excel RTD server that subscribes to topics on a Qpid broker and feeds one
    /// header field of each matching message back to the worksheet.
    /// </summary>
    /// <remarks>
    /// The RTD formula arguments are, in order: connection url, topic, field, and then
    /// any number of (filter field, filter value) pairs. A message is published to a
    /// cell only when every filter field of that cell equals its filter value.
    ///
    /// NOTE(review): message callbacks presumably arrive on a Qpid client thread while
    /// the IRtdServer methods are called by Excel; the caches and the refresh queue are
    /// not synchronized. Confirm the threading model before relying on this in production.
    /// </remarks>
    [ComVisible(true), ProgId("RTD.Test")]
    public class RTDTest : IRtdServer
    {
        // Number of leading formula arguments that are not filters: url, topic, field.
        private const int FixedArgumentCount = 3;

        //QPID CACHE
        Dictionary<string, Apache.Qpid.Messaging.IConnection> connectionCache;//url -> connection
        Dictionary<string, IChannel> channelCache;//url -> channel
        Dictionary<string, int> channelCacheCount;//url -> number of registered topic ids
        Dictionary<string, IMessageConsumer> topicCache;//url|topic -> consumer
        Dictionary<string, int> topicCacheCount;//url|topic -> number of registered topic ids
        Dictionary<int, Tuple3<string, string, string>> topicIDCache;//topicid -> (url, topic, field)
        Dictionary<string, IList<Tuple2<int, string>>> topicTopicIDFieldCache;//url|topic -> (topicid, field)
        Dictionary<int, Tuple2<string[], string[]>> filtersCache;//topicid -> (filter fields, filter values)
        //END QPID CACHE

        //IRTDServer Globals
        IRTDUpdateEvent updateEvent;
        Queue<Tuple2<int, object>> refreshQ;//pending (topicid, value) updates for RefreshData
        //END IRTDServer Globals

        /// <summary>Creates the server with empty caches and an empty refresh queue.</summary>
        public RTDTest()
        {
            connectionCache = new Dictionary<string, Apache.Qpid.Messaging.IConnection>();
            channelCache = new Dictionary<string, IChannel>();
            topicCache = new Dictionary<string, IMessageConsumer>();
            channelCacheCount = new Dictionary<string, int>();
            topicCacheCount = new Dictionary<string, int>();
            topicIDCache = new Dictionary<int, Tuple3<string, string, string>>();
            topicTopicIDFieldCache = new Dictionary<string, IList<Tuple2<int, string>>>();
            filtersCache = new Dictionary<int, Tuple2<string[], string[]>>();

            refreshQ = new Queue<Tuple2<int, object>>();
        }

        //QPID METHODS

        /// <summary>
        /// Builds the single key used by every per-topic cache, so that the code that
        /// stores an entry and the code that removes it can never disagree.
        /// </summary>
        private static string topicKey(string url, string topic)
        {
            return url + "|" + topic;
        }

        /// <summary>Adds one reference to <paramref name="key"/> in a reference-count table.</summary>
        private static void addReference(Dictionary<string, int> counts, string key)
        {
            int current;
            counts.TryGetValue(key, out current);
            counts[key] = current + 1;
        }

        /// <summary>
        /// Drops one reference to <paramref name="key"/>.
        /// </summary>
        /// <returns>True when no references remain (the entry is removed from the table).</returns>
        private static bool releaseReference(Dictionary<string, int> counts, string key)
        {
            int current;
            if (counts.TryGetValue(key, out current) && current > 1)
            {
                counts[key] = current - 1;
                return false;
            }
            counts.Remove(key);
            return true;
        }

        /// <summary>
        /// Returns the cached channel for <paramref name="url"/>, opening a connection and
        /// channel on first use.
        /// </summary>
        private IChannel getChannel(string url)
        {
            IChannel chan;
            if (channelCache.TryGetValue(url, out chan))
            {
                return chan;
            }

            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(url);
            Apache.Qpid.Messaging.IConnection connection = new AMQConnection(connectionInfo);
            try
            {
                chan = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
                connection.Start();
            }
            catch
            {
                // Do not leak a half-opened connection when channel creation or start fails.
                connection.Dispose();
                throw;
            }

            // The connection is kept so it can be closed when the last topic goes away.
            connectionCache[url] = connection;
            channelCache[url] = chan;
            return chan;
        }

        /// <summary>
        /// Returns the cached consumer for (url, topic). On first use it declares a
        /// uniquely named queue, binds it to the topic exchange and attaches the single
        /// message handler for that subscription.
        /// </summary>
        private IMessageConsumer getTopicConsumer(string url, string topic)
        {
            string key = topicKey(url, topic);
            IMessageConsumer cons;
            if (topicCache.TryGetValue(key, out cons))
            {
                return cons;
            }

            IChannel channel = getChannel(url);
            string tempQ = channel.GenerateUniqueName();
            channel.DeclareQueue(tempQ, false, true, true);
            cons = channel.CreateConsumerBuilder(tempQ).Create();
            // Exactly one handler per consumer: the handler itself fans out to every
            // registered topic id, so attaching one per topic id would duplicate updates.
            cons.OnMessage += msg => { onMessage(msg, key); };
            channel.Bind(tempQ, ExchangeNameDefaults.TOPIC, topic);
            topicCache[key] = cons;
            return cons;
        }

        /// <summary>
        /// Returns the (topicid, field) registrations for a url|topic key, creating an
        /// empty list when none exists yet.
        /// </summary>
        private IList<Tuple2<int, string>> getFields(string key)
        {
            IList<Tuple2<int, string>> fields;
            if (!topicTopicIDFieldCache.TryGetValue(key, out fields))
            {
                fields = new List<Tuple2<int, string>>();
                topicTopicIDFieldCache[key] = fields;
            }
            return fields;
        }

        /// <summary>
        /// Handles one incoming message: for every topic id registered on the
        /// subscription whose filters all match, queues (topicid, header value) and then
        /// notifies Excel that new data is available.
        /// </summary>
        private void onMessage(IMessage msg, string key)
        {
            IList<Tuple2<int, string>> fields;
            if (!topicTopicIDFieldCache.TryGetValue(key, out fields))
            {
                return;
            }

            bool queued = false;
            foreach (Tuple2<int, string> f in fields)
            {
                Tuple2<string[], string[]> filters;
                if (!filtersCache.TryGetValue(f.a, out filters))
                {
                    continue;
                }
                if (allFiltersTrue(filters.a, filters.b, msg))
                {
                    refreshQ.Enqueue(new Tuple2<int, object>(f.a, msg.Headers[f.b]));
                    queued = true;
                }
            }

            if (!queued || updateEvent == null)
            {
                return;
            }

            try
            {
                updateEvent.UpdateNotify();
            }
            catch (COMException e)
            {
                // Best effort: the queued values stay available for the next refresh,
                // but the failure is reported instead of being silently dropped.
                System.Diagnostics.Trace.TraceWarning("RTDTest: UpdateNotify failed: " + e.Message);
            }
        }

        /// <summary>
        /// Records a topic id against its (url, topic, field) and makes sure the
        /// subscription exists.
        /// </summary>
        void registerTopicID(string url, string topic, string field, int topicid)
        {
            // Subscribe first: if the broker cannot be reached this throws before any
            // bookkeeping has been touched, so the caches stay consistent.
            getTopicConsumer(url, topic);

            string key = topicKey(url, topic);
            topicIDCache[topicid] = new Tuple3<string, string, string>(url, topic, field);
            getFields(key).Add(new Tuple2<int, string>(topicid, field));

            addReference(channelCacheCount, url);
            addReference(topicCacheCount, key);
        }

        /// <summary>
        /// Returns true when, for every filter, the message header named by the filter
        /// field has a string form equal to the filter value. A missing (null) header
        /// never matches.
        /// </summary>
        private bool allFiltersTrue(string[] filterKeys, string[] filterVals, IMessage msg)
        {
            for (int i = 0; i < filterKeys.Length; i++)
            {
                object header = msg.Headers[filterKeys[i]];
                if (header == null || !header.ToString().Equals(filterVals[i]))
                {
                    return false;
                }
            }
            return true;
        }

        /// <summary>
        /// Removes everything recorded for <paramref name="topicid"/> and closes the
        /// consumer, channel and connection once nothing else refers to them.
        /// Unknown topic ids are ignored.
        /// </summary>
        public void removeRegisteredTopic(int topicid)
        {
            filtersCache.Remove(topicid);

            Tuple3<string, string, string> registration;
            if (!topicIDCache.TryGetValue(topicid, out registration))
            {
                return;
            }
            topicIDCache.Remove(topicid);

            string url = registration.a;
            string key = topicKey(url, registration.b);

            // Stop routing messages to this topic id even if the subscription lives on.
            IList<Tuple2<int, string>> fields;
            if (topicTopicIDFieldCache.TryGetValue(key, out fields))
            {
                for (int i = fields.Count - 1; i >= 0; i--)
                {
                    if (fields[i].a == topicid)
                    {
                        fields.RemoveAt(i);
                    }
                }
            }

            // Consumer first, then the channel it was created on, then the connection.
            if (releaseReference(topicCacheCount, key))
            {
                topicTopicIDFieldCache.Remove(key);
                IMessageConsumer cons;
                if (topicCache.TryGetValue(key, out cons))
                {
                    topicCache.Remove(key);
                    cons.Dispose();
                }
            }

            if (releaseReference(channelCacheCount, url))
            {
                IChannel chan;
                if (channelCache.TryGetValue(url, out chan))
                {
                    channelCache.Remove(url);
                    chan.Dispose();
                }
                Apache.Qpid.Messaging.IConnection connection;
                if (connectionCache.TryGetValue(url, out connection))
                {
                    connectionCache.Remove(url);
                    connection.Dispose();
                }
            }
        }

        //END QPID METHODS

        //---------------------------------------------------------------------------------------------------------
        //IRTDServer METHODS
        #region IRtdServer Members

        /// <summary>Stores the callback used to tell Excel that new data is waiting.</summary>
        /// <returns>Always 1.</returns>
        public int ServerStart(IRTDUpdateEvent CallbackObject)
        {
            updateEvent = CallbackObject;
            return 1;
        }

        /// <summary>
        /// Registers a new RTD topic. <paramref name="Strings"/> holds url, topic and
        /// field followed by optional (filter field, filter value) pairs; a trailing
        /// unpaired filter argument is ignored.
        /// </summary>
        /// <returns>A placeholder text shown until the first matching message arrives.</returns>
        public object ConnectData(int TopicID, ref Array Strings, ref bool GetNewValues)
        {
            if (Strings == null || Strings.Length < FixedArgumentCount)
            {
                return "Expected: url, topic, field[, filterField, filterValue]...";
            }

            int conditions = (Strings.Length - FixedArgumentCount) / 2;

            string url = (string)Strings.GetValue(0);
            string topic = (string)Strings.GetValue(1);
            string field = (string)Strings.GetValue(2);
            string[] filterKeys = new string[conditions];
            string[] filterVals = new string[conditions];

            // Filter i occupies the argument pair starting at 3 + 2*i.
            for (int i = 0; i < conditions; i++)
            {
                int offset = FixedArgumentCount + 2 * i;
                filterKeys[i] = (string)Strings.GetValue(offset);
                filterVals[i] = (string)Strings.GetValue(offset + 1);
            }

            // A re-used topic id replaces its previous registration instead of
            // double-counting the references.
            if (topicIDCache.ContainsKey(TopicID))
            {
                removeRegisteredTopic(TopicID);
            }

            filtersCache[TopicID] = new Tuple2<string[], string[]>(filterKeys, filterVals);
            try
            {
                registerTopicID(url, topic, field, TopicID);
            }
            catch
            {
                filtersCache.Remove(TopicID);
                throw;
            }
            return "Getting data...";
        }


        /// <summary>Unregisters a topic previously passed to <see cref="ConnectData"/>.</summary>
        public void DisconnectData(int TopicID)
        {
            removeRegisteredTopic(TopicID);
        }

        /// <summary>Liveness probe.</summary>
        /// <returns>Always 1.</returns>
        public int Heartbeat()
        {
            return 1;
        }

        /// <summary>
        /// Drains every queued update into a 2 x N array: row 0 holds topic ids, row 1
        /// the corresponding values. <paramref name="TopicCount"/> is set to N.
        /// </summary>
        public Array RefreshData(ref int TopicCount)
        {
            // Snapshot the count: comparing against the live Count while dequeuing
            // would stop half way through the queue.
            int count = refreshQ.Count;
            object[,] result = new object[2, count];
            for (int i = 0; i < count; i++)
            {
                Tuple2<int, object> data = refreshQ.Dequeue();
                result[0, i] = data.a;
                result[1, i] = data.b;
            }

            TopicCount = count;
            return result;
        }

        /// <summary>
        /// Closes every consumer, channel and connection and forgets all registrations.
        /// </summary>
        public void ServerTerminate()
        {
            // Stop notifying Excel before tearing the subscriptions down.
            updateEvent = null;

            foreach (IMessageConsumer consumer in topicCache.Values)
            {
                consumer.Dispose();
            }
            foreach (IChannel c in channelCache.Values)
            {
                c.Dispose();
            }
            foreach (Apache.Qpid.Messaging.IConnection connection in connectionCache.Values)
            {
                connection.Dispose();
            }

            topicCache.Clear();
            channelCache.Clear();
            connectionCache.Clear();
            topicCacheCount.Clear();
            channelCacheCount.Clear();
            topicIDCache.Clear();
            topicTopicIDFieldCache.Clear();
            filtersCache.Clear();
            refreshQ.Clear();
        }

        #endregion
        //END IRTDServer METHODS
    }

    /// <summary>
    /// Minimal mutable pair; <c>a</c> is the first element and <c>b</c> the second.
    /// </summary>
    class Tuple2<T, U>
    {
        private T first;
        private U second;

        /// <summary>Creates a pair holding <paramref name="t"/> and <paramref name="u"/>.</summary>
        public Tuple2(T t, U u)
        {
            first = t;
            second = u;
        }

        /// <summary>First element.</summary>
        public T a
        {
            get { return first; }
            set { first = value; }
        }

        /// <summary>Second element.</summary>
        public U b
        {
            get { return second; }
            set { second = value; }
        }
    }

    /// <summary>
    /// Minimal mutable triple; <c>a</c>, <c>b</c> and <c>c</c> are the elements in
    /// constructor order.
    /// </summary>
    class Tuple3<T, U, V>
    {
        private T first;
        private U second;
        private V third;

        /// <summary>
        /// Creates a triple holding <paramref name="t"/>, <paramref name="u"/> and
        /// <paramref name="v"/>.
        /// </summary>
        public Tuple3(T t, U u, V v)
        {
            first = t;
            second = u;
            third = v;
        }

        /// <summary>First element.</summary>
        public T a
        {
            get { return first; }
            set { first = value; }
        }

        /// <summary>Second element.</summary>
        public U b
        {
            get { return second; }
            set { second = value; }
        }

        /// <summary>Third element.</summary>
        public V c
        {
            get { return third; }
            set { third = value; }
        }
    }
}


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message