创建阻塞的服务器
ServerSocketChannel 与 SockelChannel
采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private ExecutorService executorService; //线程池
private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目
public EchoServer( throws IOException {
//创建一个线程池
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime(.availableProcessors( * POOL_MULTIPLE;
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open(;
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口
serverSocketChannel.socket(.setReuseAddress(true;
//把服务器进程与一个本地端口绑定
serverSocketChannel.socket(.bind(new InetSocketAddress(port;
System.out.println("服务器启动";
}
public void service( {
while (true {
SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept(;
//处理客户连接
executorService.execute(new Handler(socketChannel;
} catch(IOException e {
e.printStackTrace(;
}
}
}
public static void main(String args[]throws IOException {
new EchoServer(.service(;
}
//处理客户连按
class Handler implements Runnable {
private SocketChannel socketChannel;
public Handler(SocketChannel socketChannel {
this.socketChannel = socketChannel;
}
public void run( {
handle(socketChannel;
}
public void handle(SocketChannel socketChannel {
try {
//获得与socketChannel关联的Socket对象
Socket socket = socketChannel.socket(;
System.out.println("接收到客户连接,来自:" + socket.getInetAddress( + ":" + socket.getPort(;
BufferedReader br = getReader(socket;
PrintWriter pw = getWriter(socket;
String msg = null;
while ((msg = br.readLine( != null {
System.out.println(msg;
pw.println(echo(msg;
if (msg.equals("bye" {
break;
}
}
} catch (IOException e {
e.printStackTrace(;
} finally {
try {
if(socketChannel != null {
socketChannel.close(;
} catch (IOException e {
e.printStackTrace(;
}
}
}
}
}
private PrintWriter getWriter(Socket socket throws IOException {
OutputStream socketOut = socket.getOutputStream(;
return new PrintWriter(socketOut,true;
}
private BufferedReader getReader(Socket socket throws IOException {
InputStream socketIn = socket.getInputStream(;
return new BufferedReader(new InputStreamReader(socketIn;
}
public String echo(String msg {
return "echo:" + msg;
}
}
创建非阻塞的服务器
在非阻塞模式下,EchoServer
只需要启动一个主线程,就能同时处理三件事:
- 接收客户的连接
- 接收客户发送的数据
- 向客户发回响应数据
EchoServer 委托 Selector
来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件
// 创建一个Selector对象
selector = Selector.open(;
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open(;
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket(.setReuseAddress(true;
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false:
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket(.bind(new InetSocketAddress(port;
EchoServer
类的service(
方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:public void service( throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT; //第1层while循环 while(selector.select( > 0 { //获得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(; Iterator it = readyKeys.iterator(; //第2层while循环 while (it.hasNext( { SelectionKey key = null; //处理SelectionKey try { //取出一个SelectionKey key = (SelectionKey it.next(; //把 SelectionKey从Selector 的selected-key 集合中删除 it.remove(; 1f (key.isAcceptable( { 处理接收连接就绪事件; } if (key.isReadable( { 处理读就绪水件; } if (key.isWritable( { 处理写就绪事件; } } catch(IOException e { e.printStackTrace(; try { if(key != null { //使这个SelectionKey失效 key.cancel(; //关闭与这个SelectionKey关联的SocketChannel key.channel(.close(; } } catch(Exception ex { e.printStackTrace(; } } } } }
- 首先由
- 第一层 while 循环,不断询问
Selector
已经发生的事件,select(
方法返回当前相关事件已经发生的SelectionKey
的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector
的selectedKeys(
方法返回selected-keys
集合,它存放了相关事件已经发生的SelectionKey
对象 - 第二层 while 循环,从
selected-keys
集合中依次取出每个SelectionKey
对象并从集合中删除,,然后调用isAcceptable(
、isReadable(
和isWritable(
方法判断到底是哪种事件发生了,从而做出相应的处理
ServerSocketChannel
向 Selector
注册接收连接就绪事件,如果 Selector
监控到该事件发生,就会把相应的 SelectionKey
对象加入 selected-keys
集合
1. 处理接收连接就绪事件
if (key.isAcceptable( {
//获得与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel key.channel(;
//获得与客户连接的SocketChannel
SocketChannel socketChannel = (SocketChannel ssc.accept(;
//把Socketchannel设置为非阻塞模式
socketChannel.configureBlocking(false;
//创建一个用于存放用户发送来的数据的级冲区
ByteBuffer buffer = ByteBuffer.allocate(1024;
//Socketchannel向Selector注册读就绪事件和写就绪事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer;
}
2. 处理读就绪事件
public void receive(SelectionKey key throws IOException {
//获得与SelectionKey关联的附件
ByteBuffer buffer = (ByteBuffer key.attachment(;
//获得与SelectionKey关联的Socketchannel
SocketChannel socketChannel = (SocketChannelkey.channel(;
//创建一个ByteBuffer用于存放读到的数据
ByteBuffer readBuff = ByteBuffer.allocate(32;
socketChannel.read(readBuff;
readBuff.flip(;
//把buffer的极限设为容量
buffer.limit(buffer.capacity(;
//把readBuff中的内容拷贝到buffer
buffer.put(readBuff;
}
3. 处理写就绪事件
public void send(SelectionKey key throws IOException {
//获得与SelectionKey关联的ByteBuffer
ByteBuffer buffer = (ByteBuffer key.attachment(;
//获得与SelectionKey关联的SocketChannel
SocketChannel socketChannel = (SocketChannel key.channel(;
buffer.flip(;
//按照GBK编码把buffer中的字节转换为字符串
String data = decode(buffer;
//如果还没有读到一行数据就返回
if(data.indexOf("\r\n" == -1
return;
//截取一行数据
String outputData = data.substring(0, data.indexOf("\n" + 1;
//把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData;
//输出outputBuffer的所有字节
while(outputBuffer,hasRemaining(
socketChannel.write(outputBuffer;
//把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
ByteBuffer temp = encode(outputData;
//把buffer的位置设为temp的极限
buffer.position(temp.limit(:
//删除buffer已经处理的数据
buffer.compact(;
//如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
if(outputData.equals("bye\r\n" {
key.cancel(;
socketChannel.close(;
}
}
完整代码如下:
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector;
private Charset charset = Charset.forName("GBK";
public EchoServer( throws IOException {
// 创建一个Selector对象
selector = Selector.open(;
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open(;
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket(.setReuseAddress(true;
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false:
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket(.bind(new InetSocketAddress(port;
}
public void service( throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT;
//第1层while循环
while(selector.select( > 0 {
//获得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys(;
Iterator it = readyKeys.iterator(;
//第2层while循环
while (it.hasNext( {
SelectionKey key = null;
//处理SelectionKey
try {
//取出一个SelectionKey
key = (SelectionKey it.next(;
//把 SelectionKey从Selector 的selected-key 集合中删除
it.remove(;
1f (key.isAcceptable( {
//获得与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel key.channel(;
//获得与客户连接的SocketChannel
SocketChannel socketChannel = (SocketChannel ssc.accept(;
//把Socketchannel设置为非阻塞模式
socketChannel.configureBlocking(false;
//创建一个用于存放用户发送来的数据的级冲区
ByteBuffer buffer = ByteBuffer.allocate(1024;
//Socketchannel向Selector注册读就绪事件和写就绪事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer;
}
if (key.isReadable( { receive(key; }
if (key.isWritable( { send(key; }
} catch(IOException e {
e.printStackTrace(;
try {
if(key != null {
//使这个SelectionKey失效
key.cancel(;
//关闭与这个SelectionKey关联的SocketChannel
key.channel(.close(;
}
} catch(Exception ex {
e.printStackTrace(;
}
}
}
}
}
public void receive(SelectionKey key throws IOException {
//获得与SelectionKey关联的附件
ByteBuffer buffer = (ByteBuffer key.attachment(;
//获得与SelectionKey关联的Socketchannel
SocketChannel socketChannel = (SocketChannelkey.channel(;
//创建一个ByteBuffer用于存放读到的数据
ByteBuffer readBuff = ByteBuffer.allocate(32;
socketChannel.read(readBuff;
readBuff.flip(;
//把buffer的极限设为容量
buffer.limit(buffer.capacity(;
//把readBuff中的内容拷贝到buffer
buffer.put(readBuff;
}
public void send(SelectionKey key throws IOException {
//获得与SelectionKey关联的ByteBuffer
ByteBuffer buffer = (ByteBuffer key.attachment(;
//获得与SelectionKey关联的SocketChannel
SocketChannel socketChannel = (SocketChannel key.channel(;
buffer.flip(;
//按照GBK编码把buffer中的字节转换为字符串
String data = decode(buffer;
//如果还没有读到一行数据就返回
if(data.indexOf("\r\n" == -1
return;
//截取一行数据
String outputData = data.substring(0, data.indexOf("\n" + 1;
//把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData;
//输出outputBuffer的所有字节
while(outputBuffer,hasRemaining(
socketChannel.write(outputBuffer;
//把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
ByteBuffer temp = encode(outputData;
//把buffer的位置设为temp的极限
buffer.position(temp.limit(:
//删除buffer已经处理的数据
buffer.compact(;
//如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
if(outputData.equals("bye\r\n" {
key.cancel(;
socketChannel.close(;
}
}
//解码
public String decode(ByteBuffer buffer {
CharBuffer charBuffer = charset.decode(buffer;
return charBuffer.toStrinq(;
}
//编码
public ByteBuffer encode(String str {
return charset.encode(str;
}
public static void main(String args[]throws Exception {
EchoServer server = new EchoServer(;
server.service(;
}
}
阻塞模式与非阻塞模式混合使用
使用非阻塞模式时,ServerSocketChannel
以及 SocketChannel
都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer
采用一个线程同时完成这些操作
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector
注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector = null;
private Charset charset = Charset.forName("GBK";
public EchoServer( throws IOException {
selector = Selector.open(;
serverSocketChannel = ServerSocketChannel.open(;
serverSocketChannel.socket(.setReuseAddress(true;
serverSocketChannelsocket(.bind(new InetSocketAddress(port;
}
public void accept( {
while(true {
try {
SocketChannel socketChannel = serverSocketChannel.accept(;
socketChannel.configureBlocking(false;
ByteBuffer buffer = ByteBuffer.allocate(1024;
synchronized(gate {
selector.wakeup(;
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer;
}
} catch(IOException e {
e.printStackTrace(;
}
}
}
private Object gate=new Object(;
public void service( throws IOException {
while(true {
synchronized(gate{}
int n = selector.select(;
if(n == 0 continue;
Set readyKeys = selector.selectedKeys(;
Iterator it = readyKeys.iterator(;
while (it.hasNext( {
SelectionKey key = null;
try {
it.remove(;
if (key.isReadable( {
receive(key;
}
if (key.isWritable( {
send(key;
}
} catch(IOException e {
e.printStackTrace(;
try {
if(key != null {
key.cancel(;
key.channel(.close(;
}
} catch(Exception ex { e.printStackTrace(; }
}
}
}
}
public void receive(SelectionKey key throws IOException {
...
}
public void send(SelectionKey key throws IOException {
...
}
public String decode(ByteBuffer buffer {
...
}
public ByteBuffer encode(String str {
...
}
public static void main(String args[]throws Exception {
final EchoServer server = new EchoServer(;
Thread accept = new Thread( {
public void run( {
server.accept(;
}
};
accept.start(;
server.service(;
}
}
注意一点:主线程的 selector select(
方法和 Accept 线程的 register(...
方法都会造成阻塞,因为他们都会操作 Selector
对象的共享资源 all-keys
集合,这有可能会导致死锁
Selector 中尚没有任何注册的事件,即 all-keys
集合为空,主线程执行 selector.select(
方法时将进入阻塞状态,只有当 Accept 线程向 Selector
注册了事件,并且该事件发生后,主线程才会从 selector.select(
方法返回。然而,由于主线程正在 selector.select(
方法中阻塞,这使得 Acccept
线程也在 register(
方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去
register( 时,不允许另一个线程同时执行 select(
方法,反之亦然