qpid-dev mailing list archives

From "Wai Yang Yap" <waiy...@quince.nl>
Subject Question regarding Java Publisher and Ruby Subscriber
Date Mon, 16 Jun 2008 09:32:18 GMT
Hi,

 

My name is Wai Yang Yap and I'm new to JMS and QPID.

We are currently trying to connect a Java side with a Ruby side using
QPID.

Our situation is as follows: a server written in Java accepts jobs from
clients, and several other servers (all written in Ruby) actually process
the jobs. The Java server pushes jobs into the queues and the Ruby servers
read from those queues. When a Ruby client is done, it sends a message back
to a queue that is read by the Java server.
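
The flow described above can be sketched in a single process. This is only a minimal illustration, assuming Ruby's standard Queue class as a stand-in for the broker queues; the names jobs, results and the :stop marker are hypothetical and are not part of QPID or of our real setup.

```ruby
require "thread"

jobs    = Queue.new  # stands in for the queue the Java server publishes to
results = Queue.new  # stands in for the queue the Java server reads back

# Two threads play the role of the Ruby processing servers.
workers = 2.times.map do |id|
  Thread.new do
    # Keep taking jobs until a stop marker arrives.
    while (job = jobs.pop) != :stop
      results << "worker #{id} finished #{job}"
    end
  end
end

# The producer side pushes jobs, then one stop marker per worker.
3.times { |n| jobs << "job-#{n}" }
workers.size.times { jobs << :stop }
workers.each(&:join)

# Drain the reply queue, as the Java server would.
replies = []
replies << results.pop until results.empty?
puts replies.sort
```

With a real broker, each queue would be declared on the broker and the two sides would run as separate processes.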

 

I have been experimenting with QPID for a week now, and I have the Java
Subscriber and Ruby Publisher working, so I can send messages from Ruby
to Java with the QPID broker in between. The problem is that when I use
the same setup in reverse (Java sending messages to Ruby), the Ruby side
crashes. Ruby receives the message, but crashes when it tries to parse
it.

 

The exception that it gives is:

 

undefined method `timestamp' for #<Codec::Decoder:0x2e388a0>
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/codec.rb:172:in `send'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/codec.rb:172:in `decode'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.rb:223:in `decode'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.rb:222:in `each'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.rb:222:in `decode'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.rb:102:in `decode'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.rb:61:in `read'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:90:in `reader'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:74:in `send'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:74:in `spawn'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:in `initialize'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:in `new'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:in `spawn'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:55:in `start'
  D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/client.rb:83:in `start'
  D:/1-Projects/waiyang/Projects/WeBellen/Reviva/Test/src/Receiver.rb:10

deadlock 0x2e5eaa0: sleep:-  - D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/queue.rb:36
deadlock 0x284c748: sleep:- (main) - D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/queue.rb:36
deadlock 0x2e5e550: sleep:-  - D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/queue.rb:36
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:in `pop': Thread(0x2e5e758): deadlock (fatal)
      from D:/1-Projects/waiyang/Projects/WeBellen/Reviva/Test/src/Receiver.rb:18
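
The `send' and `decode' frames at the top of the trace suggest that the decoder looks up a method named after each field's type, and that it has no method for the timestamp type. The following is a minimal, self-contained sketch of that dispatch pattern, not the QPID source: SketchDecoder and its methods are hypothetical names, and it assumes that an AMQP 0-8 timestamp is a 64-bit POSIX time in seconds, so it can be read the same way as a longlong.

```ruby
require "stringio"

# Hypothetical stand-in for a type-dispatching decoder; not the QPID code.
class SketchDecoder
  def initialize(io)
    @io = io
  end

  # Dispatch on the field's type name. A type without a matching
  # method raises NoMethodError, as in the trace above.
  def decode(type)
    send(type)
  end

  # 32-bit unsigned integer, network byte order.
  def long
    @io.read(4).unpack("N")[0]
  end

  # 64-bit unsigned integer built from two 32-bit halves.
  def longlong
    hi = long
    lo = long
    (hi << 32) | lo
  end

  # Assumption: a timestamp is a 64-bit POSIX time in seconds,
  # so it is decoded exactly like a longlong.
  def timestamp
    longlong
  end
end

# Encode an example time as two 32-bit halves, then decode it again.
bytes = [0, 1_213_608_738].pack("NN")
decoder = SketchDecoder.new(StringIO.new(bytes))
seconds = decoder.decode(:timestamp)
puts Time.at(seconds).utc
```

Whether the real Codec::Decoder can be extended in the same way depends on how codec.rb is written, which this sketch does not show.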

 

I'm currently using the amqp-0.8 spec version.

I also tried 0.9, but then it crashes when it tries to read the spec.

The code that I used for testing is below.

 

 

Ruby code:

 

require "qpid"
require 'rexml/document'

include Qpid
include REXML

# Load the AMQP 0-8 spec and connect to the local broker.
spec = Spec.load("D:/1-Projects/waiyang/Projects/WeBellen/Reviva/Test/src/specs/amqp.0-8.xml")
c = Client.new("127.0.0.1", 5672, spec, "localhost")
#c.start("\0guest\0guest", mechanism="PLAIN")
c.start({"LOGIN" => "guest", "PASSWORD" => "guest"})
ch = c.channel(1)
ch.channel_open()
ch.queue_declare(:queue => "test-queue")
#ch.queue_bind(:queue_name => "test-queue")
ch.basic_consume(:queue => "test-queue", :consumer_tag => "ctag")

# Parse each message body as XML, print two elements, then acknowledge.
while true
    msg = c.queue("ctag").pop()
    text = Document.new(msg.content.body)
    print "#{text.elements['content/msg/sentence'].text}\n"
    print "#{text.elements['content/script'].text}\n"
    ch.basic_ack(msg.delivery_tag)
    sleep 0.00001
end

# Not reached: the loop above never exits.
ch.channel_close(:reply_code => 200, :reply_text => "Ok")
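
The XML handling in the loop can be checked without a broker. This is a minimal sketch that assumes the message body is the document built by createXmlDoc in the Java code below; the payload string is written by hand for illustration and is not captured from a real message.

```ruby
require "rexml/document"

# Hand-written stand-in for the body the Java publisher is expected to send.
payload = <<XML
<?xml version="1.0" encoding="UTF-8"?>
<content>
  <msg>
    <sentence>hello world</sentence>
  </msg>
  <script>puts 'hello world'</script>
</content>
XML

# Same element paths as in the receiver loop.
doc = REXML::Document.new(payload)
sentence = doc.elements["content/msg/sentence"].text
script   = doc.elements["content/script"].text

puts sentence
puts script
```

If this works on its own, the XML parsing is probably not the cause, which fits the trace: the failure is in the frame decoder, before the body reaches this code.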

 

 

Java code:

 

package nl.quince.publisher;

import java.io.ByteArrayOutputStream;
import java.util.Hashtable;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.spi.InitialContextFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.dom4j.io.DOMReader;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class MessagePublisher {

    // Builds <content><msg><sentence/></msg><script/></content>.
    public static Document createXmlDoc() {
        Document result = null;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = factory.newDocumentBuilder();
            result = builder.newDocument();

            Element root = result.createElement("content");
            Element msg = result.createElement("msg");
            Element sentence = result.createElement("sentence");
            sentence.setTextContent("hello world");
            msg.appendChild(sentence);
            root.appendChild(msg);

            Element script = result.createElement("script");
            script.setTextContent("puts 'hello world'");
            root.appendChild(script);
            result.appendChild(root);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public static void main(String[] args) {
        // JNDI settings for the connection factory and the destination queue.
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put("connectionfactory.connection",
                "amqp://guest:guest@clientid/localhost?brokerlist='tcp://127.0.0.1:5672'");
        env.put("queue.queue", "test-queue");
        //env.put("topic.name", "ctag");

        try {
            InitialContextFactory factory = new PropertiesFileInitialContextFactory();
            Context _context = factory.getInitialContext(env);
            Connection producerConnection =
                    ((ConnectionFactory) _context.lookup("connection")).createConnection();
            Session producerSession =
                    producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue _queue = (Queue) _context.lookup("queue");
            MessageProducer producer = producerSession.createProducer(_queue);
            producerConnection.start();

            // Send the same XML document ten times as a BytesMessage.
            for (int msg = 0; msg < 10; msg++) {
                Document doc = createXmlDoc();
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                XMLWriter writer = new XMLWriter(out, OutputFormat.createPrettyPrint());
                writer.write((new DOMReader().read(doc)));
                writer.flush();
                writer.close();

                BytesMessage message = producerSession.createBytesMessage();
                message.writeBytes(out.toByteArray());
                producer.send(message);
            }
            producer.close();
            producerConnection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

How can I solve the problem with the timestamp?

Thanks in advance for your help.

 

Best regards,

 

Wai Yang



Wai Yang Yap | quince
------------------------------------------
mobile  
e-mail  waiyang@quince.nl
web     www.quince.nl
------------------------------------------
assumburg 73
1081 gb amsterdam
the netherlands
tel: +31 (0)20 3471000
fax:+31 (0)20 3471005


This message may contain information that is privileged or confidential. If you are not the
named addressee of this message, please destroy it without reading, using, copying or disclosing
its contents to any other person. Quince is a trade name of Quince B.V. with its office in
Amsterdam.
All services are governed by the general terms and conditions of Quince B.V. which contain
a limitation of liability in article 11. A free copy of the general terms and conditions will
be provided upon request. The conditions can also be read on http://www.quince.nl/termsandconditions

