2022-09-23 16:22

NIO三大核心组件

wanmatea

其它

(759)

(0)

收藏

NIO

Java NIO(Non-blocking I/O)是指jdk1.4 及以上版本里提供的新api(New IO),Java NIO提供了与标准IO不同的IO工作方式。

1. NIO和传统IO的不同点

传统IO

NIO

 

面向流

面向缓冲

同步阻塞

同步非阻塞

选择器(多路复用)


从图中对比中我们可以看到NIO的三个主要特点:面向缓冲、同步非阻塞和多路复用。

1、面向缓冲

传统IO是面向流的,NIO是面向缓冲的。传统IO是每次从流中读一个或多个字节,直到读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。NIO是将数据读取到一个她稍后处理的缓冲区,需要时可在缓冲区中前后移动,增加了处理过程中的灵活性。

2、同步非阻塞

传统IO的流是阻塞的,当一个线程调用read() 或 write()时,该线程被阻塞,直到数据完成读取或写入完成,此期间该线程不能再干任何事情了。NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变得可以读取之前,该线程可以继续做其他的事情。

3、多路复用

多路复用是指使用单线程也可以通过轮询监控的方式实现多线程类似的效果。简单的说就是,通过选择机制,使用一个单独的线程很容易来管理多个通道。

2. 三大核心组件

要理解这几个特点,我们需要知道NIO的三大核心组件:通道(Channel)、缓冲(Buffer)、选择器(Selector)。

2.1. Channel管道和Buffer缓冲区

传统IO基于字节流和字符流进行操作的, 而在NIO中并不是以流的方式来处理数据的,而是以buffer缓冲区和Channel管道配合使用来处理数据。

Channel管道可以比作成铁路,buffer缓冲区可以比作成火车

NIO就是通过Channel管道运输着存储数据的Buffer缓冲区的来实现数据的处理。

Channel不与数据打交道,它只负责运输数据。与数据打交道的是Buffer缓冲区。

相对于传统IO而言,流是单向的。对于NIO而言,有了Channel管道这个概念,我们的读写都是双向的。

通道主要分为文件通道和Socket通道:

2.1.1. 文件通道 

文件通道总是处于阻塞模式。创建文件通道最常用的三个类是FileInputStream、FileOutputStream和RandomAccessFile,它们均提供了一个getChannel()方法,用来获取与之关联的通道。

FileInputStream创建的通道只能读,FileOutputStream创建的通道只能写,而RandomAccessFile可以创建同时具有读写功能的通道(使用“rw”参数创建)。

1、创建通道

示例:

public class Test{
public static void main(String[] args)
{
String filepath="test.txt";
RandomAccessFile randomAccessFile;
try {
randomAccessFile = new RandomAccessFile(filepath, "rw");
FileChannel readAndWriteChannel = randomAccessFile.getChannel();
    
FileInputStream fis = new FileInputStream(filepath);
FileChannel readChannel = fis.getChannel();
FileOutputStream fos = new FileOutputStream(filepath);
FileChannel writeChannel = fos.getChannel();
readAndWriteChannel.close();
readChannel.close();
 
writeChannel.close();
            } catch (FileNotFoundException e) {
e.printStackTrace();
}catch (IOException e) {
                e.printStackTrace();
    }
}
}

2、使用Buffer读写数据

使用Buffer读写数据一般遵循以下五个步骤:

1、分配缓冲区

2、写入数据到Buffer

3、调用flip()方法,将Buffer从写模式切换到读模式(必须调用这个方法)

4、从Buffer中读取数据

5、调用clear()方法或者compact()方法,清空缓冲区

示例:

public class Test{
public static void main(String[] args)
{
String filepath="test.txt";
          RandomAccessFile randomAccessFile;
        try {
randomAccessFile = new RandomAccessFile(filepath, "rw");
             FileChannel channel = randomAccessFile.getChannel();
             //创建容量为1024字节的缓冲区
             ByteBuffer buf = ByteBuffer.allocate(1024);
            //写入数据到buffer
            while(channel.read(buf)!=-1)
            {
                 //将Buffer从写模式切换到读模式
                 buf.flip();
                 //判断是否有未读数据
                while(buf.hasRemaining())
                {
 
                     System.out.print((char) buf.get());
                }
                 //清空缓冲区
                 buf.clear();
            }
              channel.close();
         } catch (FileNotFoundException e) {
            e.printStackTrace();
}catch (IOException e) {
e.printStackTrace();
}
}
}

2.1.2. Socket通道

在学习Socket通道之前,先复习一下传统的面向流的Socket编程,主要的类有三个,ServerSocket、Socket和InetSocketAddress,分别代表服务端、套接字和地址。

Socket通道实现了与传统Socket类似的功能,其类名、API都与传统Socket非常类似。ServerSocketChannel对应ServerSocket,SocketChannel对应Socket。

除此之外,Socket通道还同时支持“阻塞”模式与“非阻塞”模式。传统Socket仅支持“阻塞”模式,其用于连接双方套接字的accept()和connect()方法都是阻塞的;而Socket通道除了默认为阻塞模式外,同时还提供了一组非阻塞的连接方法。

1、阻塞模式

服务端(BlockingChannelServer):

public class BlockingChannelServer {
public static void main(String[] args)
{
try
{
ServerSocketChannel  ssc=ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(12345));
System.out.println("启动服务器....");
                //一直等待来自客户端的请求
SocketChannel sc = ssc.accept();
ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (sc.read(buffer) != -1) {
                    buffer.flip();
                    byte[] bytes = new byte[buffer.remaining()];
 
                    buffer.get(bytes);
                    System.out.println(new String (bytes));
                    buffer.clear();
               }
               sc.close();
               ssc.close();
}
catch(IOException e)
{
e.printStackTrace();
}
}
}

客户端(BlockingChannelClient):

public class BlockingChannelClient {
public static void main(String[] args) {
            try {
                SocketChannel sc = SocketChannel.open();
                sc.connect(new InetSocketAddress("127.0.0.1", 12345));
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put("hello".getBytes());
                buffer.clear();
                while(buffer.hasRemaining()) {
                   sc.write(buffer);
                }
                sc.close();
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
}

2、非阻塞模式

ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。

示例:

服务端(BlockingChannelServer):

public class BlockingChannelServer {
public static void main(String[] args)
{
 try
{
ServerSocketChannel  ssc=ServerSocketChannel.open();
             ssc.bind(new InetSocketAddress(12345));
//设置为非阻塞模式
ssc.configureBlocking(false);
System.out.println("启动服务器....");
//一直等待来自客户端的请求
while(true)
{
SocketChannel sc = ssc.accept();
//判断是否为null
 if(sc!=null)
{
ByteBuffer buffer = ByteBuffer.allocate(1024);
                      while (sc.read(buffer) != -1) {
                           buffer.flip();
                           byte[] bytes = new byte[buffer.remaining()];
                           buffer.get(bytes);
                           System.out.println(new String (bytes));
                           buffer.clear();
                      }
                  }
}
}
catch(IOException e)
{
e.printStackTrace();
           }
}
}

 如果我们不检查SocketChannel是否为null,就会报空指针异常:

image.png 

客户端(BlockingChannelClient):

public class BlockingChannelClient {
public static void main(String[] args) {
       try {
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("127.0.0.1", 12346));
            //设置为非阻塞模式
            sc.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.clear();
            buffer.put("Hello\n".getBytes());
            buffer.put("World\n".getBytes());
            buffer.put("Java".getBytes());
            buffer.flip();
            while(buffer.hasRemaining()) {
                sc.write(buffer);
            }
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.2. 选择器(Selector)

Selector 一般称为选择器 ,当然你也可以翻译为多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现一个单独的线程来监控多个注册在它上面的信道(Channel),通过一定的选择机制,实现多路复用的效果。

使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。

 image.png

2.2.1. Selector使用方法

1、 通过 open() 方法创建 Selector

Selector selector = Selector.open();

2、 创建一个通道

ServerSocketChannel channel = ServerSocketChannel.open();

3、 将通道设置为非阻塞模式

channel.configureBlocking(false);

注:如果一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的,

因为 Channel 必须要是非阻塞的, 因此 FileChannel 是不能够使用选择器的, 因为 FileChannel 都是阻塞的。

4、 通过 register() 方法注册通道

channel.register(selector, SelectionKey.OP_ACCEPT);

第二个参数指定了我们对 Channel 的什么类型的事件感兴趣, 这些事件有:

方法

描述

 

SelectionKey.OP_READ

读操作

SelectionKey.OP_WRITE

写操作

SelectionKey.OP_CONNECT

连接Socket操作

SelectionKey.OP_ACCEPT

接收Socket操作


可以使用或运算|来组合多个事件, 例如:

channel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);

5、 通过 select() 方法从多个通道中以轮询的方式选择已经准备就绪的通道, select()方法返回的int值表示有多少通道已经就绪。

       int nReady = selector.select();

       通过 Selector 的 selectedKeys() 方法获得已选择键集(selected-key set)

       Set key = selector.selectedKeys();

6、 通过 Iterator 迭代器依次获取 key 中的 SelectionKey 对象,并通过 SelectionKey 中的判断方法执行对应的操作

while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//判断是否可接收Socket连接
if(selectionKey.isAcceptable()) {
        
            }
//判断是否可与远程服务器建立连接
            if (selectionKey.isConnectable()) {
    
            }
// 判断是否可读取
            if (selectionKey.isReadable()) {
         
    }
//判断是否可写入
            if (selectionKey.isWritable()) {
         
    }
}

2.2.2. 完整示例

服务端(BlockingChannelServer):

public class BlockingChannelServer {
  public static void main(String[] args)
  {
          try
     {
            ServerSocketChannel  ssc=ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(12345));
        //设置为非阻塞模式
            ssc.configureBlocking(false);
            System.out.println("启动服务器....");
            //创建 Selector
            Selector selector = Selector.open();
            //通过 register() 方法注册通道
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            //一直等待来自客户端的请求
            while (true) {
                int nReady = selector.select();
                if(nReady == 0) continue;
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    if (key.isAcceptable()) {
                      //若接受的事件是“接收就绪” 操作,就获取客户端连接
                        SocketChannel socketChannel = ssc.accept();
                        //切换为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //将该通道注册到selector选择器上
                       socketChannel.register(selector,SelectionKey.OP_READ);                               
                    }
                    else if (key.isReadable()) {
                         //获取该选择器上的“读就绪”状态的通道
                        SocketChannel socketChannel=
                        (SocketChannel)key.channel();
                        //读取数据
                             ByteBuffer buffer = ByteBuffer.allocate(1024);
                         while (socketChannel.read(buffer) != -1) {
                              buffer.flip();
                              byte[] bytes = new byte[buffer.remaining()];
                              buffer.get(bytes);
                              System.out.println(new String (bytes));
                              buffer.clear();
                         }
 
                         socketChannel.close();
                    }
                    ssc.close();
                }
                }
      }
         catch(IOException e)
         {
              e.printStackTrace();
          }
      }
}

客户端(BlockingChannelClient):

public class BlockingChannelClient {
    public static void main(String[] args) {
        try {
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("127.0.0.1", 12346));
            //设置为非阻塞模式
            sc.configureBlocking(false);
            
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            writeBuffer.clear();
            writeBuffer.put("Hello\n".getBytes());
            writeBuffer.put("World\n".getBytes());
            writeBuffer.put("Java".getBytes());
            writeBuffer.flip();
            
            while(writeBuffer.hasRemaining()) {
                sc.write(writeBuffer);
            }
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

0条评论

点击登录参与评论