Mina入门实例

Mina入门实例

大家好,又见面了,我是全栈君。

继续上一篇,这篇主要讲通过minaB端发送消息。并接受消息,mina是一个网络通信框架,封装了javaNIO。简单易用。网上有非常多关于他的介绍,在此不赘述了。

如上篇所介绍,完毕功能,须要五个类:

PoolListener:监听,用来在系统启动的时候创建连接。

SessionPool:连接池。

SendHandler:处理类。

CharsetEncoder:编码;

CharsetDecoder:解码:

B为我们提供了6个port。每一个port可建立3个长连接。因此。在系统时,就要创建长连接,以下是一个监听类:

import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;/** * 初始化连接 * @author yuanfubiao * */public class PoolListener implements ServletContextListener {	@Override	public void contextDestroyed(ServletContextEvent sce) {			}	@Override	public void contextInitialized(ServletContextEvent sce) {		String nds_ip = sce.getServletContext().getInitParameter("nds_ip");		String nds_ports = sce.getServletContext().getInitParameter("nds_ports");		SessionPool pool = new SessionPool();		try {						pool.init(nds_ip, nds_ports);		} catch (Exception e) {			e.printStackTrace();		}	}}

以下是监听配置,是配置在web.xml中:

    <display-name>Apache-Axis2</display-name>    <context-param>    	<param-name>nds_ip</param-name>    	<param-value>XX.XXX.XXX.XXX</param-value>    </context-param>    <context-param>    	<param-name>nds_ports</param-name>    	<param-value>12210,12211,12212,12213,12214,12215</param-value>    </context-param>    <listener>    	<listener-class>cn.net.easyway.nds.PoolListener</listener-class>    </listener>

以下是自己维护的一个连接池,相同使用并发包中的ConcurrentHashMap实现,他也是线程安全的,代码例如以下:

import java.net.InetSocketAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class SessionPool {		private static Log logger = LogFactory.getLog(SessionPool.class);	private static int connNum = 0;	private static String ip = null;	private static Map<String,Integer> connNumPorts = new HashMap<String, Integer>();	private static ConcurrentHashMap<String, IoSession> pool = new ConcurrentHashMap<String, IoSession>();		/**	 * 初始化:读取配置文件。创建长连接	 * @throws Exception 	 */	public void init(String nds_ip,String nds_ports) throws Exception{		String[] ports = nds_ports.split(",");		ip = nds_ip;				for(int i=0;i<ports.length;i++){						int port = Integer.parseInt(ports[i]);			ConnectFuture future = null;						for(int j=0;j<3;j++){				String connNum = this.getConnNums();				logger.info("创建连接号---->>>>>" + connNum);				connNumPorts.put(connNum, port);				future = SessionPool.createConnect(ip, port);				if(future.isConnected()){					logger.info("创建连接------->" + future.getSession());					pool.put(connNum, future.getSession());				}else{					logger.error("连接创建错误,请检查IP和端口配置!" + future);				}						}		}	}		/**	 * 获取一个连接	 * @param num	 * @return	 */	public static IoSession  getSession(String strNum){				logger.info("IP端口号:" + ip + "连接序列号:" + strNum + "端口号:" + connNumPorts.get(strNum));				IoSession session = pool.get(strNum);				if(null == session || !session.isClosing()){			ConnectFuture newConn = createConnect(ip, connNumPorts.get(strNum));						if(!newConn.isConnected()){				newConn =  createConnect(ip,connNumPorts.get(strNum));			}			session = newConn.getSession();			pool.replace(strNum, session);		}				return session;	}		/**	 * 创建连接	 * @param ip	 * @param port	 * @return	 */	private static ConnectFuture createConnect(String strIp,int intPort){				IoConnector connector = new NioSocketConnector();				connector.getFilterChain().addLast("codec"				,new ProtocolCodecFilter(new CharsetCodecFactory()));				connector.setHandler(new SendHandler());				ConnectFuture future = connector.connect(new InetSocketAddress(strIp,intPort));		connector.getSessionConfig().setReadBufferSize(128);		future.awaitUninterruptibly();				return future;	}		/**	 * 生成连接序列号	 * @return	 */	private synchronized String getConnNums(){				if(18 == connNum){			connNum = 0;		}				connNum++;				return String.format("%02x", connNum);	}}

因此。在项目启动的时候就会有18个连接自己主动创建。并放在pool中等待我们的使用。

以下是业务处理类。须要继承IoHandlerAdapter类。而且实现以下几个方法:

import nds.framework.security.NDSMD5;import org.apache.commons.codec.binary.Hex;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import cm.custom.service.reception.RecResponse;import cm.custom.service.reception.ReceptionResponseServiceStub;/** * 业务处理 * @author yuanfubiao * */public class SendHandler extends IoHandlerAdapter {	private static Log logger = LogFactory.getLog(SendHandler.class);		@Override	public void exceptionCaught(IoSession session, Throwable cause)			throws Exception {		logger.error("连接出错", cause);	}	@Override	/**	 * 设置空暇时间	 */	public void sessionCreated(IoSession session) throws Exception {		session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);	}		/**	 * 接受到消息后,通过WS发送给用户管理系统	 */	@Override	public void messageReceived(IoSession session, Object message)			throws Exception {		String result = message.toString().trim();		String temp = result.substring(0, result.length()-16).trim();		logger.info("接受到的数据:" + result);		//验证签名		String signature = null;		String securityKey = "12345678";		try {			byte binSignature[] = NDSMD5.signPacket(temp.getBytes(), securityKey);			signature = new String(Hex.encodeHex(binSignature));		} catch (Exception e) {			e.printStackTrace();		}				String packet = temp + signature.toUpperCase().trim();				if(!result.equalsIgnoreCase(packet)){			logger.error("数字签名不对。错误指令:" + result);			return;		}		logger.info("接受到的数据:" + packet);		RecResponse res = new RecResponse();		res.setResponse(temp);		ReceptionResponseServiceStub stub = new ReceptionResponseServiceStub();		stub.recResponse(res);	}		/**	 * 连接空暇时。发送心跳包	 */	@Override	public void sessionIdle(IoSession session, IdleStatus status)			throws Exception {		if(status == IdleStatus.BOTH_IDLE){				session.write("heartbeat");		}	}}

一般我们在写socket程序时。用堵塞的方式读取消息,通常是依据消息换行符或者特殊字符,或者对方关闭流来证明一条信息读取完毕,在mina中,有默认的编解码方式。但也能够自己定义,比方以长度来推断一条消息是否读取完毕:

编码

import java.nio.charset.Charset;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolEncoderAdapter;import org.apache.mina.filter.codec.ProtocolEncoderOutput;/** * 编码 * @author yuanfubiao * */public class CharsetEncoder extends ProtocolEncoderAdapter{		private final static Charset charset = Charset.forName("utf-8");		@Override	public void encode(IoSession session, Object message, ProtocolEncoderOutput out)			throws Exception {				IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);		buff.putString(message.toString(), charset.newEncoder());				buff.flip();		out.write(buff);	}}

解码

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * 解码
 * @author yuanfubiao
 *
 */
public class CharsetDecoder extends CumulativeProtocolDecoder{
	private static Log logger = LogFactory.getLog(CharsetDecoder.class);
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if(in.remaining() >= 9){ //心跳为最小传输长度
			
			byte[] headBytes = new byte[in.limit()];
			logger.info("接收到消息" + headBytes.toString());
			in.get(headBytes, 0, 9);
			String head = new String(headBytes).trim();
			if("heartbeat".equalsIgnoreCase(head)){
				return true;
			}
			
			int lenPack = Integer.parseInt(head.substring(5, 9), 16)-9;
			
			if(in.remaining() == lenPack){ //验证消息长度
				byte[] bodyBytes = new byte[in.limit()];
				in.get(bodyBytes,0,lenPack);
				String body = new String(bodyBytes);
				out.write(head.trim()+body.trim());
				return true;
			}
			in.flip();
			return false;
		}
		return false;
	}
}

源代码下载:
http://download.csdn.net/detail/stubbornpotatoes/7438435

关于mina发现一个系列好文章:http://xxgblog.com/2014/10/16/mina-netty-twisted-10/

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/115701.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 你对贝叶斯统计都有怎样的理解?

    你对贝叶斯统计都有怎样的理解?作者:王冲链接:https://www.zhihu.com/question/21134457/answer/40753337来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。谢邀。Bayesian学派说概率是一个人对于一件事的信念强度,概率是主观的。而频率派是说概率是客观的。所有能用客观概率假设能解的题,用主观概率假设也都能解,答案一样。对

  • mysql 时间戳转日期格式[通俗易懂]

    mysql 时间戳转日期格式[通俗易懂]一、MySQL日期和时间戳的转换1.日期转时间戳selectUNIX_TIMESTAMP(‘2018-12-2512:25:00’);结果:15457119002.时间戳转日期:FROM_UNIXTIME(unix_timestamp)–unix_timestamp为时间戳selectFROM_UNIXTIME(1545711900);结果:2018-12-251…

  • Landsat8卫星介绍[通俗易懂]

    Landsat8卫星介绍[通俗易懂]2013年2月11号,NASA成功发射了Landsat8卫星,为走过了四十年辉煌岁月的Landsat计划重新注入新鲜血液,设计使用寿命为至少5年。Landsat8上携带有两个主要载荷:OLI和TIRS,其中OLI(全称:OperationalLandImager,陆地成像仪)由卡罗拉多州的鲍尔航天技术公司研制;TIRS(全称:ThermalInfraredSensor,热红外传感器

  • tcp/ip协议包含哪几层_ip协议提供的是一种什么服务

    tcp/ip协议包含哪几层_ip协议提供的是一种什么服务在OSI模型中ARP协议属于链路层;而在TCP/IP模型中,ARP协议属于网络层。1)ARP分层的位置是TCP/IP的网络层2)ARP报文是由以太网帧进行封装传输的。没有封装进IP包。3)实际上

  • 递归求数组的和_java递归教程

    递归求数组的和_java递归教程使用递归实现数组求和示例分享思路如下:给定一个含有n个元素的整型数组a,求a中所有元素的和。问题的难点在于如何使用递归上。如果使用递归,则需要考虑如何进行递归执行的开始以及终止条件,首先如果数组元素个数为0,那么和为0。同时,如果数组元素个数为n,那么先求出前n-1个元素之和,再加上a[n-1]即可。此时可以完成递归功能。总之,递归就是在某个函数的执行过程中首先判断它的终止条件参数,终止条件参数满…

  • 提升效率的秘密,仅需这一篇吃透负载均衡

    提升效率的秘密,仅需这一篇吃透负载均衡写在前面写本文的目的: 对负载均衡的理解零零散散,不成体系。 阅读这篇文章需要的条件: 对OSI模型有些许了解 有耐心。本文涉及大量的知识点,且只能用文字才能讲清楚,所以文字比较多。 收获: 读完此篇文章,从宏观的角度理解了负载均衡的原理以及实现机制。加深对分布式架构的了解 主要内容: 本文首先从概念开始,讲解什么是负载均衡,以及负载均衡在分布式系统中所承担的角色以及提供的功能。 讲解负载均衡的分类。分别从软硬件角度、地域范围角度以及…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号