10-30 2,337 views
Java从JDK 1.4开始支持NIO(New IO),与传统IO相比,NIO是非阻塞的,它包含三个核心组件:
- Channel,类似于Stream,但不同于Stream单向,Channel是双向的,从同一个Channel既可以读取数据,也可以写入数据;
- Buffer,向Channel写入数据或从Channel读取数据时,数据需先写入Buffer或读入Buffer;
- Selector,多个Channel可以注册到一个Selector中,Selector可以通过单线程侦听这些Channel,当有读、写、连接事件时(而不是阻塞等待读、写或连接),执行相应的操作。
基于Java NIO实现服务端与客户端的非阻塞通信如图所示:
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.magicwt; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; public class Server { private static AtomicInteger count = new AtomicInteger(1); public static void main(String[] args) throws Exception { // 新建ServerSocketChannel实例,设置为非阻塞,绑定端口8081; ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8081)); // 新建Selector实例,并将ServerSocketChannel实例注册到Selector实例中,侦听ACCEPT事件 Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 从Selector实例中获取事件 selector.select(); Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = (SelectionKey) iterator.next(); iterator.remove(); if (selectionKey.isAcceptable()) { // 对于ACCEPT事件,新建SocketChannel实例,并将SocketChannel实例注册到Selector实例中,侦听READ事件,通过SocketChannel实例向客户端返回消息 serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("连接请求已收到,服务端建立连接"); socketChannel.write(ByteBuffer.wrap(new String("服务端消息" + count.getAndIncrement()).getBytes())); } else if (selectionKey.isReadable()) { // 对于READ事件,通过SocketChannel实例读取客户端消息并向客户端返回消息 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); socketChannel.read(byteBuffer); System.out.println("已读取信息:" + new String(byteBuffer.array())); socketChannel.write(ByteBuffer.wrap(new String("服务端消息" + count.getAndIncrement()).getBytes())); } } } } } |
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package com.magicwt; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; public class Client { private static AtomicInteger count = new AtomicInteger(1); public static void main(String[] args) throws Exception { // 新建SocketChannel实例,设置为非阻塞,连接127.0.0.1:8080; SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081)); // 新建Selector实例,并将SocketChannel实例注册到Selector实例中,侦听CONNECT事件 Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); while (true) { // 从Selector实例中获取事件 selector.select(); Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = (SelectionKey) iterator.next(); iterator.remove(); if (selectionKey.isConnectable()) { // 对于CONNECT事件,新建SocketChannel实例,并将SocketChannel实例注册到Selector实例中,侦听READ事件,通过SocketChannel实例向服务端返回消息 socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("客户端建立连接"); socketChannel.write(ByteBuffer.wrap(new String("客户端消息" + count.getAndIncrement()).getBytes())); } else if (selectionKey.isReadable()) { // 对于READ事件,通过SocketChannel实例读取服务端消息并向服务端返回消息 socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); socketChannel.read(byteBuffer); System.out.println("已读取信息:" + new String(byteBuffer.array())); socketChannel.write(ByteBuffer.wrap(new String("客户端信息" + count.getAndIncrement()).getBytes())); } } } } } |