一、Tars框架基本介绍
Tars是腾讯开源的支持多语言的高性能RPC框架,起源于腾讯内部2008年至今一直使用的统一应用框架TAF(Total Application Framework),目前支持C++、Java、PHP、Nodejs、Go语言。
官方仓库地址:
vivo推送平台也深度使用了该框架,部署服务节点超过一千个,经过线上每日一百多亿消息推送量的考验。
Tars-java 最新稳定版1.7.2以及之前的版本都使用Java NIO进行网络编程;本文将分别详细介绍java NIO的原理和Tars 使用NIO进行网络编程的细节。
二、Java NIO原理介绍
从1.4版本开始,Java提供了一种新的IO处理方式:NIO (New IO 或 Non-blocking IO 是一个可以替代标准Java IO 的API,它是面向缓冲区而不是字节流,它是非阻塞的,支持IO多路复用。
2.1 Channels (通道 and Buffers (缓冲区
Channel类型:
-
能通过UDP读写网络中的数据的DatagramChannel
-
能通过TCP读写网络数据的SocketChannel
-
可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel的ServerSocketChannel 。
支持文件读写数据的FileChannel
SocketChannel:
-
关闭 SocketChannel:socketChannel.close(;
-
从Channel中读取的数据放到Buffer: int bytesRead = inChannel.read(buf;
-
将Buffer中的数据写到Channel: int bytesWritten = inChannel.write(buf;
打开 SocketChannel:SocketChannel socketChannel = SocketChannel.open(;
ServerSocketChannel:
通常不会仅仅只监听一个连接,在while循环中调用 accept(方法. 如下面的例子:
代码1:
while(true{
SocketChannel socketChannel = serverSocketChannel.accept(;
//do something with socketChannel...
}
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(;
serverSocketChannel.socket(.bind(new InetSocketAddress(8888;
serverSocketChannel.configureBlocking(false;
while(true{
SocketChannel socketChannel = serverSocketChannel.accept(;
if(socketChannel != null{
//do something with socketChannel...
}
}
Buffer类型:
-
CharBuffer
-
FloatBuffer
-
LongBuffer
Buffer的分配:
Buffer的读写:
-
调用flip(方法将Buffer从写模式切换到读模式,此时position移动到开始位置0,limit移动到position的位置。
-
从Buffer中读取数据,在读模式下可以读取之前写入到buffer的所有数据,即为limit位置。
-
调用clear(方法或者compact(方法。clear(方法将position设为0,limit被设置成capacity的值。compact(方法将所有未读的数据拷贝到Buffer起始处,然后将position设到最后一个未读元素后面。
写入数据到Buffer,最大写入量是capacity,写模式下limit值即为capacity值,position即为写到的位置。
mark( 与 reset(方法 通过调用Buffer.mark(方法,可以标记Buffer中的一个特定position,之后可以通过调用Buffer.reset(方法恢复到这个position。
duplicate( 此方法返回承载先前字节缓冲区内容的新字节缓冲区。
remaining(limit 减去 position的值
2.2 Selector(选择器)
Java NIO引入了选择器的概念,选择器用于监听多个通道的事件。单个的线程可以监听多个数据通道。要使用Selector,得向Selector注册Channel,然后调用它的select(方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件。
代码3:
channel.configureBlocking(false;
SelectionKey key = channel.register(selector,Selectionkey.OP_READ;
SelectionKey包含:
1 interest集合:selectionKey.interestOps( 可以监听四种不同类型的事件:OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ
2 ready集合:selectionKey.readyOps(; ready 集合是通道已经准备就绪的操作的集合,提供4个方便的方法:
-
selectionKey.isConnectable(;
-
selectionKey.isWritable(;
3 Channel:selectionKey.channel(;
4 Selector:selectionKey.selector(;
5 可选的附加对象:
提示:
OP_ACCEPT和OP_CONNECT的区别:简单来说,客户端建立连接是connect,服务器准备接收连接是accept。一个典型的客户端服务器网络交互流程如下图
selectedKeys(
wakeUp(
close(
通过Selector选择通道:
-
int select(long timeout 增加最长阻塞毫秒数