mina-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Murat OZDEMiR" <mozdemi...@gmail.com>
Subject RE: Help! Datagram Acceptor Problem.
Date Tue, 05 Jun 2007 07:11:08 GMT
Dear friends,
 
I've almost memorized all tutorials about UDP and looked at Nabble forums if
someone have had similar problem as me. I've tried everything last 3 days
and finally have chosen Single Thread model for both DatagramAcceptor and
ExecuterFilter which is now working good enough. But i don't prefer to use
Single Thread as well.
 
Let me explain what does my application on MINA do. Thousands of clients
send UDP packets to the server from the area. Some TCP clients are also
exist. After packet received, my message handler saves the message to the
database and constructs ACK message and writes the message into the session.
My problem is; server cannot send ACK packets to the clients in required
time-out. So clients send last packet again. The problem is not about having
possible delay to insert the incoming messages into database, the problem is
sending the packet after writing into the session.
 
I've read the Thread Model document carefully and tried various combinations
for IoServiceAcceptor and ExecuterFilter thread model such as SingleThread,
CachedThreadPool, FixedThreadPool for both DatagramAcceptor and
SocketAcceptor.
 
Apllication Stack;
tr.com.mobiliz.commserver.blo.CommunicationManager at localhost:2129
 Thread [Timer-0] (Running)
 Thread [PooledByteBufferExpirer-0] (Running)
 Thread [SocketAcceptor-0] (Running)
 Thread [DatagramAcceptor-0] (Running)
 Thread [SocketAcceptor-1] (Running)
 Thread [Timer-1] (Running)
 Thread [SocketConnectorIoProcessor-0.0] (Running)
 Thread [AnonymousIoService-1] (Running)
 Thread [Thread-6] (Running)
 Thread [DestroyJavaVM] (Running)
 Thread [Thread-5] (Running)

When i chose CachedThreadPool for both DatagramAcceptor and ExecuterFilter
such as;
 
    DatagramAcceptor gprsUdpAcceptor = new DatagramAcceptor(
Executors.newCachedThreadPool() );
    DatagramAcceptorConfig gprsUdpConfig = new DatagramAcceptorConfig();
    gprsUdpConfig.setThreadModel( ThreadModel.MANUAL );
    Executor executor = Executors.newCachedThreadPool();  
    gprsUdpChainBuilder.addLast( "GprsUdpThreadPool", new ExecutorFilter(
executor ) );
 
profiler shows me, thread number increases around one thousand and CPU
starts crying :). For both CachedThreadPool and FixedThreadPool ACK messages
sends with huge delay.
 
On the encoder side i've not forgatten!
    buff.flip();
 
You can find my application's Port Listener , Udp Listener Started, Tcp
Listener Starter and Packet Encoder in given order.
 
Where am i making mistake?
 
 
PORT LISTENER
 1:     public void start() throws Exception

 2:     {                

 3:         IoServiceStatManager statManager =
IoServiceStatManager.getInstance();

 4:         HashMap services = new HashMap();

 5:         

            ...


 9:         if (
ConfigurationManager.getInstance().getParamString("GPRS_UDP_ACCEPTOR_ENABLE"
).equals("Enabled") )

10:         {

11:             startGprsUdpListener();

12:             services.put("GPRS_UDP", gprsUdpAcceptor );

13:         }

14:         

15:         if (
ConfigurationManager.getInstance().getParamString("GPRS_TCP_ACCEPTOR_ENABLE"
).equals("Enabled") )

16:         {

17:             startGprsTcpListener();

18:             services.put("GPRS_TCP", gprsTcpAcceptor );            

19:         }

            ...
34:         statManager.startStatService( services );

35: 

36:     }


UDP LISTENER STARTER
  1:     private void startGprsUdpListener () throws ClassNotFoundException,
IOException

  2:     {

  3:         logger.info("Preparing GPRS UDP Listener" );

  4:         if (
NetworkConstants.gprsUdpModel.getThreadModelType().equals("Manual") )

  5:         {

  6:             /** getExecuterServiceType()

  7:              * 0 for Single Thread

  8:              * 1 for Cached Thread Pool

  9:              * 2 for Fixed Size Thread Pool

 10:              */

 11:             if (
NetworkConstants.gprsUdpModel.getManualThreadInst().getExecuterServiceType()
== 0 )

 12:             {

 13:                 gprsUdpAcceptor = new DatagramAcceptor();

 14:             }

 15:             else if (
NetworkConstants.gprsUdpModel.getManualThreadInst().getExecuterServiceType()
== 1 )

 16:             {

 17:                 gprsUdpAcceptor = new DatagramAcceptor(
Executors.newCachedThreadPool() );

 18:             }

 19:             else if (
NetworkConstants.gprsUdpModel.getManualThreadInst().getExecuterServiceType()
== 2 )

 20:             {

 21:                 int threadAmount = 

 22:
NetworkConstants.gprsUdpModel.getManualThreadInst().getMaxThreadAmount();

 23:                 gprsUdpAcceptor = new DatagramAcceptor(
Executors.newFixedThreadPool( threadAmount ) );

 24:             }                

 25:         }

 26:         else

 27:             gprsUdpAcceptor = new DatagramAcceptor();

 28:        

 29:         DatagramAcceptorConfig gprsUdpConfig = new
DatagramAcceptorConfig();

 30:         DefaultIoFilterChainBuilder gprsUdpChainBuilder =
gprsUdpConfig.getFilterChain();

 31:                 

 32:         /**

 33:          * Settind Thread Model to MANUAL as recommended in 

 34:          * Apache MINA Configuring Thread Model document at

 35:          * http://mina.apache.org/configuring-thread-model.html

 36:          */

 37:         Executor executor = null;

 38:         if (
NetworkConstants.gprsUdpIoProcExModel.getThreadModelType().equals("Manual")
)

 39:         {

 40:             gprsUdpConfig.setThreadModel( ThreadModel.MANUAL );

 41:             /** getExecuterServiceType()

 42:              * 0 for Single Thread

 43:              * 1 for Cached Thread Pool

 44:              * 2 for Fixed Size Thread Pool

 45:              */

 46:             if (
NetworkConstants.gprsUdpIoProcExModel.getManualThreadInst().getExecuterServi
ceType() == 1 )

 47:             {

 48:                 executor = Executors.newCachedThreadPool();


 49:             }

 50:             else if (
NetworkConstants.gprsUdpIoProcExModel.getManualThreadInst().getExecuterServi
ceType() == 2 )

 51:             {

 52:                 int threadAmount = 

 53:
NetworkConstants.gprsUdpIoProcExModel.getManualThreadInst().getMaxThreadAmou
nt();

 54:                 executor = Executors.newFixedThreadPool( threadAmount
);

 55:             }            

 56:         }

 57:         else if (
NetworkConstants.gprsUdpIoProcExModel.getThreadModelType().equals("Auto") )

 58:         {

 59: //            executor = new ThreadPoolExecutor(...);

 60: //            ExecutorThreadModel model =
ExecutorThreadModel.getInstance ("GprsUdpListener");

 61: //            model.setExecutor( executor );

 62: //            gprsUdpConfig.setThreadModel( model );            

 63:         }

 64: 

 65:         /**

 66:          * Setting UDP Session recycler to protect session.close()
after

 67:          * 60 seconds which is used MINA by default. It is set to
GPRS_MNG_UDP_SESSION_IDLE_TIMEOUT.

 68:          * If session is closed quickly, user cannot send messages to
the client after 60 seconds.

 69:          */

 70:         ExpiringSessionRecycler gprsUdpRecycler = 

 71: 		    new ExpiringSessionRecycler(
NetworkConstants.GPRS_MNG_UDP_SESSION_IDLE_TIMEOUT, 1 );

 72: 		gprsUdpConfig.setSessionRecycler( gprsUdpRecycler ); 

 73: 

 74:         /**

 75:          * Adding MuProtocolCodecFactory into the FilterChain to decode
messages comes from

 76:          * mobile units and to encode messages which are sent to the
units.

 77:          */

 78:         gprsUdpChainBuilder.addLast(

 79:                 "muCodecs",

 80:                 new ProtocolCodecFilter( new MuProtocolCodecFactory( )
) );

 81:    		

 82:         /**

 83:          * Adding Logger to trace 

 84:          * 	<li> sessionCreated, 

 85:          *  <li> sessionOpened, 

 86:          *  <li> sessionClosed, 

 87:          *  <li> sessionIdle, 

 88:          *  <li> exceptionCaught, 

 89:          *  <li> messageReceived, 

 90:          *  <li> messageSent

 91:          * events.

 92:          */

 93:         gprsUdpChainBuilder.addLast( 

 94: 		        "gprsUdpAcceptorLogger", 

 95: 		        new LoggingFilter() );

 96:                 

 97:         /**

 98:          * Add an ExecutorFilter to an IoFilterChain as described in

 99:          * http://mina.apache.org/configuring-thread-model.html

100:          * 

101:          * Be careful when you choose the thread pool type for
IoService.

102:          * Executors.newCachedThreadPool() is always preferred by
IoService. 

103:          * It is because using other thread pool type can lead to
unpredictable 

104:          * performance side effect in IoService. Once all threads in
the pool become in use, 

105:          * IoService will start to block while it tries to acquire a
thread from the pool 

106:          * and to start to show weird performance degradation, which is
sometimes very hard to trace.

107:          */

108:         if (
NetworkConstants.gprsUdpIoProcExModel.getThreadModelType().equals("Manual")
)

109:         {

110:             if (
NetworkConstants.gprsUdpIoProcExModel.getManualThreadInst().getExecuterServi
ceType() != 0 )

111:             {

112:               gprsUdpChainBuilder.addLast(

113:               "GprsUdpThreadPool", 

114:               new ExecutorFilter( executor ) );    

115:             }

116:         }

117:     		

118: 

119:         /**

120:          * Binding the Acceptor

121:          */

122:    		gprsUdpAcceptor.bind(

123:    				listenGprs,

124: 				new GprsMessageHandler( ), gprsUdpConfig );

125:    		

126:         logger.info("GPRS UDP ACCEPTOR THREAD MODEL: " +
ConfigurationManager.getInstance().getParamString("GPRS_UDP_LISTENER_THREAD_
MODEL"));

127:         logger.info("GPRS UDP ACCEPTOR I/O PROCESSOR THREAD MODEL: " +
ConfigurationManager.getInstance().getParamString("GPRS_UDP_LISTENER_IO_PROC
_EXECUTER_THREAD_MODEL"));

128:         logger.info("Listening on UDP port " +
NetworkConstants.GPRS_MNG_PORT );      

129:     }


 
TCP LISTENER STARTER
 1:     private void startGprsTcpListener () throws ClassNotFoundException,
IOException

 2:     {

 3:         logger.info("Preparing GPRS TCP Listener" );

 4:         

 5:         /**

 6:          * In MINA, there are three kinds of I/O worker threads in the
NIO socket implementation;

 7:          * 

 8:          * ACCEPTOR THREAD accepts incoming connectiongs, and forwards
the connection to the I/O 

 9:          * processor thread for read and write operations.  

10:          * Each SocketAcceptor creates one acceptor thread. You can't
configure the number of the acceptor threads. 

11:          * 

12:          * CONNECTOR THREAD attempts connections to a remote peer, and
forwards the succeeded connection to the 

13:          * I/O processor thread for read and write operations.  

14:          * Each SocketConnector creates one connector thread. You can't
configure the number of the connector threads, either. 

15:          * 

16:          * I/O PROCESSOR THREAD performs the actual read and write
operation until the connection is closed. 

17:          * Each SocketAcceptor or SocketConnector creates its own I/O
processor thread(s). 

18:          * You can configure the number of the I/O processor threads. 

19:          * The default number of the I/O processor threads is 1. 

20:          */

21:         gprsTcpAcceptor = new SocketAcceptor(

22:                 Runtime.getRuntime().availableProcessors() + 1, 

23:                 Executors.newCachedThreadPool() );

24:         SocketAcceptorConfig gprsTcpConfig = new SocketAcceptorConfig();

25:         DefaultIoFilterChainBuilder gprsTcpChainBuilder =
gprsTcpConfig.getFilterChain();

26: 

27:         /**

28:          * Settind Thread Model to MANUAL as recommended in 

29:          * Apache MINA Configuring Thread Model document at

30:          * http://mina.apache.org/configuring-thread-model.html

31:          */

32:         gprsTcpConfig.setThreadModel( ThreadModel.MANUAL );

33:         

34:         Executor executor = Executors.newCachedThreadPool();        

35:         //Executor executor = Executors.newFixedThreadPool(2); 

36:         

37:         /**

38:          * Adding MuProtocolCodecFactory into the FilterChain to decode
messages comes from

39:          * mobile units and to encode messages which are sent to the
units.

40:          */

41:         gprsTcpChainBuilder.addLast(

42:                 "muCodecs",

43:                 new ProtocolCodecFilter( new MuProtocolCodecFactory( ) )
);        

44:         

45:         /**

46:          * Adding Logger to trace 

47:          * 	<li> sessionCreated, 

48:          *  <li> sessionOpened, 

49:          *  <li> sessionClosed, 

50:          *  <li> sessionIdle, 

51:          *  <li> exceptionCaught, 

52:          *  <li> messageReceived, 

53:          *  <li> messageSent

54:          * events.

55:          */

56:         gprsTcpChainBuilder.addLast( 

57: 		        "gprsTcpAcceptorLogger", 

58: 		        new LoggingFilter() );        

59:         

60:         /**

61:          * Add an ExecutorFilter to an IoFilterChain as described in

62:          * http://mina.apache.org/configuring-thread-model.html

63:          * 

64:          * Be careful when you choose the thread pool type for
IoService.

65:          * Executors.newCachedThreadPool() is always preferred by
IoService. 

66:          * It is because using other thread pool type can lead to
unpredictable 

67:          * performance side effect in IoService. Once all threads in the
pool become in use, 

68:          * IoService will start to block while it tries to acquire a
thread from the pool 

69:          * and to start to show weird performance degradation, which is
sometimes very hard to trace.

70:          */

71:         //gprsTcpChainBuilder.addLast(

72:         //        "GprsTcpThreadPool", 

73:         //        new ExecutorFilter( executor ) );

74:         

75:    		gprsTcpAcceptor.bind(

76:    				listenGprs,

77:                 new GprsMessageHandler(), gprsTcpConfig );        

78:                         

79:    		logger.info("Listening on TCP port " +
NetworkConstants.GPRS_MNG_PORT );

80:     }


GPRS MESSAGE HANDLER
 1: 	public void messageReceived( IoSession session, Object msg ) throws
Exception 

 2: 	{

 3: 	    IncomingMessageList messageList = (IncomingMessageList) msg;

 4: 	    Iterator msgIt = messageList.getIterator();

 5: 	    while ( msgIt.hasNext() )

 6: 	    {

 7: 	        Object message = msgIt.next();

 8: 		    if ( message instanceof InbountMessageInterface )

 9: 		    {

10: 		        InbountMessageInterface msgb =
(InbountMessageInterface) message;

11: 				

12: 				...

13: 				...

14: 				

15: 			    boolean success = false;

16: 			    if ( msgb.isInformationMessage() )

17: 				{

18: 					success = 

19:
DataBaseManager.getInstance().insertMessageFromMu( (MuInboundMessageBase)
msgb );					    		   

20: 				}

21: 				else

22: 				    success = true;

23: 

24: 				if ( success )

25: 				{

26: 				    if ( msgb.isNeedServerAck() &&
isNeedServerAck )

27: 				    {

28: 						if ( session.isConnected() )

29: 						{

30: 					        OutbountMessageInterface ack
= (OutbountMessageInterface) msgb.AckMessage();

31: 							WriteFuture future =
session.write( ack );

32: 							future.join();

33: 							if (
future.isWritten() )

34: 							{

35: 							    logger.info(
"OUTGOING: " + ack.getLoggerString() );					    

36: 							}

37: 							else

38: 							    logger.error(
"OUTGOING ACK CAN NOT BE SENT: " + ack.getLoggerString() );

39: 						}

40: 						else

41: 						{

42:
SessionLog.error(session, "Session is not Connected");

43: 						}


44: 				    }				    

45: 				}

46: 				else

47: 				{

48: 				    SessionLog.error(session, "DB ERROR: ACK
will not be sent...");

49: 				}

50: 			...

51: 			...				

52: 			     

53: 		    }

54: 	    }

55: 	}


 
 
PACKET ENCODER
 1: public class PacketEncoderDT900 implements MessageEncoder

 2: {

 3:     //private Logger logger =
Logger.getLogger(PacketEncoderDT900.class);

 4:     private Logger logger = Logger.getLogger("Root");

 5:     private static final Set TYPES;

 6:     private Charset cs = Charset.forName ( "ISO-8859-1" ) ;


 7:     private CharsetEncoder encoder = cs.newEncoder () ;

 8:     

 9:     static

10:     {

11:         Set types = new HashSet();

12:         types.add( AcknowledgeMessage.class );

13: 		...

14:         TYPES = Collections.unmodifiableSet( types );

15:     }

16:     

17: 	public void encode(IoSession ioSession, Object outMsg,
ProtocolEncoderOutput encoderOutput ) throws Exception 

18: 	{

19: 

20: 		if ( outMsg instanceof AcknowledgeMessage )

21: 		{

22: 		    ByteBuffer buff = encodeAcknowledgeMessage ( outMsg );

23: 		    encoderOutput.write( buff );

24: 	        logger.info("Bytebuffer Dump:" + buff.getHexDump());


25: 		}

26: 	}

27: 	

28: 	private ByteBuffer encodeAcknowledgeMessage ( Object outMsg ) throws
Exception

29: 	{

30: 	    ByteBuffer buff = ByteBuffer.allocate(
dt900Constants.ACKNOWLEDGE_MSG_LEN  );

31: 	    //buff.clear();

32: 	    AcknowledgeMessage outgoingMsg = (AcknowledgeMessage) outMsg;

33:         buff.setAutoExpand( true ); // Enable auto-expand for easier
encoding	

34:         /* Encode here */	                    

35:         buff = buff.order( ByteOrder.LITTLE_ENDIAN );

36:         buff.put( cs );	  

37:         ...

38:         ...      

39:         /* Encoding end */

40:         buff.flip();

41:         return buff;

42: 	}	

43: }	


 
 
Best Regards.
----------
Murat OZDEMIR


-----Original Message-----
From: Mark Webb [ <mailto:elihusmails@gmail.com>
mailto:elihusmails@gmail.com]
Sent: Sunday, June 03, 2007 5:06 AM
To: dev@mina.apache.org
Subject: Re: Help! Datagram Acceptor Problem.

did you look at the tutorial for UDP that I wrote?  This might help you...

On 6/1/07, Murat OZDEMiR <mozdemir79@gmail.com> wrote:
>
> Dear Krish,
> Thanks for your reply but i'm using DatagramAcceptor (UDP/IP), not
> SocketAcceptor  :(
>
> Any other comments? Thanks
>
> ----------
> Murat ÖZDEMİR
>
>
> -----Original Message-----
> From: Krish Verma [ <mailto:kverma@gmail.com> mailto:kverma@gmail.com]
> Sent: Friday, June 01, 2007 11:54 PM
> To: dev@mina.apache.org
> Subject: Re: Help! Datagram Acceptor Problem.
>
> It might be due to Naggle's algorithm, Have you tried setting
> setTcpNoDelay to false on the socket acceptor config. Please see
>
>
>  <http://mina.apache.org/faq.html#FAQ-Nodataiswrittinouttothesessioneven>
http://mina.apache.org/faq.html#FAQ-Nodataiswrittinouttothesessioneven
> iftheb
> ufferisnotempty
> .
>
> in the MINA faq.
> --
>   Krish Verma, kverma at gmail dot com
>
> On 6/1/07, Murat OZDEMiR <mozdemir79@gmail.com> wrote:
> > Hi All,
> >
> > I've just completed a communication server project using MINA.
> >
> > Clients which connects to communication server has to be received a
> spesific
> > ACK message from server.
> > After receiving message, decoder decodes it, handler inserts into
> > database and writes ACK message to session. Receiving and writing
> > ACK occurs in miliseconds but messageSent methode are called too
> > later
> > 3-15 seconds
> later.
> > That means ACK message is sent to client after 3-15 seconds. But
> > clients have to be received ACK message in 5 seconds. If they can't
> > they send the last message again. I'm using Datagram Acceptor on
> > server. 50.000 -
> 100.000
> > messages are received from clients and inserted into database in
> minutes.
> > Both database i/o and network traffic is busy enaugh. How can i
> > decrease
> the
> > sending ACK message time below 5 seconds?
> >
> > Rapid answers will make me very happy, because i'm trying to resolve
> > that issue like a crazy :(
> >
> > Best Regards.
> >
> > Murat OZDEMiR
> >
>
>

 

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message