mina-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From <gw574813...@sina.com>
Subject Ask a question on the mina
Date Wed, 15 Apr 2015 14:15:33 GMT

Hi:I am in the process of using Mina, in a large number of concurrent test, will timeout situation,I
set the readTimeOut to 54s, but still there will be individual session 1s timeout;The following
code:
package com.palmcity.bus.servers;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.palmcity.bus.code.TriffProtocolCodecFactory;
import com.palmcity.bus.handler.AcceptorHandler;

/**
 * tcp中心服务器
 * @author gouwei
 *
 */
public class SocketAdapter extends AbstractProduct{
	/**
	 * 缓冲区大小
	 */
    private static final int BUFFER_SIZE = 1024 * 500;

    private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            return new Thread(null, r, "acceptor", 64 * 1024);
        }
    };
    private OrderedThreadPoolExecutor executor;
	public void startServer(){
		try {
		    executor = new OrderedThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, THREAD_FACTORY);
	        acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1);
	        acceptor.setReuseAddress(true);
	        acceptor.setBacklog(config.getMaxClinetNum());
	        acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
	       // acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, TIMEOUT);
	        acceptor.getSessionConfig().setReaderIdleTime(45);
	        //acceptor.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, TIMEOUT);
	        acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor));
	        //添加解码器 ,编码器
	        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TriffProtocolCodecFactory()));
	        //接收处理handler
	        acceptor.setHandler(new AcceptorHandler());
			acceptor.bind(new InetSocketAddress(config.getPort()));
			} catch (IOException e) {
				logger.error("socket 服务启动异常!",e);
				if(acceptor == null)
					acceptor.dispose();
			}
	}

	
}


package com.palmcity.bus.handler;

import java.nio.ByteBuffer;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palmcity.bus.conn.TCPConnectionFactory;
import com.palmcity.bus.conn.TcpConnection;
import com.palmcity.bus.data.BaseData;
import com.palmcity.bus.data.Command;
import com.palmcity.bus.data.CommandType;
import com.palmcity.bus.data.RepalyData;
import com.palmcity.bus.data.SrcRequestDataQueue;
import com.palmcity.bus.data.response.ResponseCode;
import com.palmcity.bus.handler.request.ReqLoginHandler;
import com.palmcity.bus.jdbc.TriffDBService;
import com.palmcity.bus.utils.Config;
import com.palmcity.bus.utils.MemcachedUtils;
import com.palmcity.bus.utils.SystemCached;

public class AcceptorHandler extends IoHandlerAdapter{
    private static final Logger logger = LoggerFactory.getLogger(IoHandlerAdapter.class);
    protected TCPConnectionFactory tcpConnectionFactory = TCPConnectionFactory.getTcpConnectionFactory();
    private final TriffDBService dbService = new TriffDBService();
    private final String STOPMID = "STOPMID";
    private final Config config = Config.getInstance();
    public synchronized void sessionCreated(IoSession session) throws Exception {
    	logger.info("create session : "+ session.getId());
    }

    public synchronized void sessionOpened(IoSession session) throws Exception {
    	 logger.info("open session : "+ session.getId());
    }

    public synchronized void sessionClosed(IoSession session) throws Exception {
    	 logger.info("客户端链接以关闭,Address = " +session.getRemoteAddress() +" ,ID=
"+ session.getAttribute(STOPMID));
    	 if(session.getAttribute(STOPMID) != null){
    		 TcpConnection tcpConn = tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID));
    		 if(tcpConn !=null){
    			// tcpConn.setActive(false);
    			 tcpConnectionFactory.remove(tcpConn.getStopMid());
    			 MemcachedUtils.getInstance().delete(MemcachedUtils.STATE_CITYCODE_STOPMID+tcpConn.getStopMid());
    			// tcpConnectionFactory.put(tcpConn.getStopMid(), tcpConn);
    			 dbService.shutdown(tcpConn.getNo(),tcpConn.getCityCode());
    		 }
    	 }
    	session.close(true);
    }

    public synchronized void sessionIdle(IoSession session, IdleStatus status) throws Exception
{
         if (status == IdleStatus.READER_IDLE) {
             logger.info("超过"+config.getTimeOut()+" 秒没有收到客户端数据,关闭客户端链接,Address
= " +session.getRemoteAddress() +" ,ID = "+session.getAttribute(STOPMID));
             session.close(true);
             //future.awaitUninterruptibly();
         } 
    	/* if(bothIdleReceived && session.getAttribute(STOPMID) != null){
    		 TcpConnection tcpConn = tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID));
    		 if(tcpConn !=null){
    			 tcpConn.setActive(false);
    			 dbService.shutdown(tcpConn.getStopMid(),tcpConn.getCityCode());
    		 }
    	 }*/
    	 //关闭链接
    }

    public synchronized void exceptionCaught(IoSession session, Throwable cause) throws Exception
{
       // if (logger.isWarnEnabled()) {
        	logger.warn("system exception : ", cause);
        //}
    }

    public synchronized void messageReceived(IoSession session, Object message)  {
    	BaseData dataBean = (BaseData)message;
    	try{
    		TcpConnection conn = tcpConnectionFactory.getTcpConnection(dataBean.getStopMid());
    		//检查链接是否为登录操作
    		if(CommandType.login.toString().equals(dataBean.getCommand())){
    		
				//登录操作
				TcpConnection tcpConnection = new TcpConnection();
				tcpConnection.setSession(session);
				session.setAttribute(STOPMID, dataBean.getStopMid());
				ReqLoginHandler handler = new ReqLoginHandler();
				handler.loginHandler(dataBean, tcpConnection);
				tcpConnectionFactory.put(stopMid,tcpConn);​
				return ;
    		}else{
	 //其他操作
    			
    		   }
    	  }
    	}catch(Exception ex){
    		logger.error("接收数据处理异常,msg = " + dataBean.toString());
    		//throw ex;
    	}
    }

    public void messageSent(IoSession session, Object message) throws Exception {
    	logger.debug("服务器发送消息:" + message );
    }

    public synchronized void inputClosed(IoSession session) throws Exception {
    	/*try{
	        if(session.getAttribute(STOPMID) != null){
	   		 TcpConnection tcpConn = tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID));
	   		 if(tcpConn !=null){
	   			 tcpConn.setActive(false);
	   		 	dbService.shutdown(tcpConn.getStopMid(),tcpConn.getCityCode());
	   		    }
	    	 }
    	}catch(Exception ex){
    		logger.error(ex.getMessage(),ex);
    	}*/
    	session.close(true);
    }


}
The log4j log:
2015-04-15 20:48:30 [INFO]-[TriffDecoder.java:63] recevier:{"type":"dataset","msgId":"21886b1b2a814e1b8944588bb1e6faa7","command":"login","cityCode":"130300","server":"192.168.3.101","stopMid":"1304100109","cipher":"5D7314ED6A7A820A0E8CF98ECA8D8A05"}
2015-04-15 20:48:30 [INFO]-[AcceptorHandler.java:58] 超过45 秒没有收到客户端数据,关闭客户端链接,Address
= /192.168.4.201:62557 ,ID = 1304100109 2015-04-15 20:48:30 [INFO]-[AcceptorHandler.java:41]
客户端链接以关闭,Address = null ,ID= 1304100109

I wonder why a large number of concurrent situation will cause this to happen? Using mina2.0.9;"1304100109"
the client login time is 2015-04-15 20:48:30, and then Mina call the sessionIdle method to
log time is 2015-04-15 20:48:30.
Looking forward to your reply.
北京掌行通信息技术有限公司gouwei@chinatransinfo.com苟伟 18612215231
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message