java-NIO

java-NIO

起男 130 2024-03-07

java-NIO

  1. java nio全称java non-blocking io,是指jdk提供的api。从jdk1.4开始,java提供了一系列改进的输入/输出的新特性,被统称为nio(即new io),是同步非阻塞的
  2. nio相关的类都被放在Java.nio包及其子包下
  3. nio有三大核心部分:channel(通道)、buffer(缓冲区)、selector(选择器)
  4. bio是面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
  5. java nio的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这两个线程同时可以做别的事情
  6. 通俗理解:nio是可以做到用一个线程来处理多个操作的。假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞io那样,非得分配10000个
  7. http2使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比http1.1大了好几个数量级

和bio的区别:

  1. bio以流的方式处理数据,而nio以块的方式处理数据,块io的效率比流io高很多
  2. bio是阻塞的,nio是非阻塞的
  3. bio基于字节流和字符流进行操作,而nio基于channel(通道)和buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

缓冲区(buffer)

buffer本质上是一个可以读写数据的内存块,可以理解成一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。channel提供从文件、网络读取数据的通道,但是读取或写入的数据都必须经由buffer

案例:

        //创建一个buffer,大小为5,可以存放5个int
        IntBuffer intBuffer = IntBuffer.allocate(5);
        //向buffer中存放数据
        for (int i = 0; i < intBuffer.capacity(); i++) {
            intBuffer.put(i*2);
        }
        //从buffer读取数据
        intBuffer.flip();//读写切换
        intBuffer.position(1);//从下标为1开始读
        intBuffer.limit(3);//读到下标为3的位置
        while (intBuffer.hasRemaining()){
            System.out.println(intBuffer.get());
        }
        intBuffer.clear();//清除缓冲区(并未真正清除,只是初始化标记状态)
        System.out.println(Arrays.toString(intBuffer.array()));//返回底层数组

Buffer内部的4个核心属性:

  • mark:标记
  • position:位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变该指,为下一次读写做准备
  • limit:缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
  • capacity:容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变

只读Buffer

        ByteBuffer byteBuffer = ByteBuffer.allocate(64);
        for (int i = 0; i < 64; i++) {
            byteBuffer.put((byte)i);
        }
        //反转
        byteBuffer.flip();
        //得到一个只读的buffer(java.nio.HeapByteBufferR)
        ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
        //读取
        while (readOnlyBuffer.hasRemaining()){
            System.out.println(readOnlyBuffer.get());
        }
        //readOnlyBuffer.put((byte) 100);//抛出ReadOnlyBufferException

MappedByBuffer

可以让文件直接在内存(堆外内存)中进行修改。这样操作系统不需要拷贝一次

案例:修改文件内容

        RandomAccessFile randomAccessFile = new RandomAccessFile("D:\\test\\file01.txt", "rw");
        //获取通道
        FileChannel channel = randomAccessFile.getChannel();
        //参数1:读写模式
        //参数2:可以直接修改的起始位置,
        //参数3:映射到内存的大小(将文件的多少个字节映射到内存)可以直接修改的内存范围就是0-5
        MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);

        mappedByteBuffer.put(0,(byte)'a');//把下标为0的位置改为a
        mappedByteBuffer.put(3,(byte)'b');//把下标为3的位置改为b
        //mappedByteBuffer.put(5,(byte)'c');//抛出异常IndexOutOfBoundsException,因为上边设置了最大数量5

        randomAccessFile.close();

Scattering和Gathering

Scattering(分散)将数据写入到buffer时,可以采用buffer数组,依次写入

Gathering(聚集)从buffer读取数据时,也可以采用buffer数组,依次读

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
        //绑定端口 到socket 并启动
        serverSocketChannel.socket().bind(inetSocketAddress);
        //创建buffer数组
        ByteBuffer[] byteBuffers = new ByteBuffer[2];
        byteBuffers[0] = ByteBuffer.allocate(5);
        byteBuffers[1] = ByteBuffer.allocate(3);
        //等待客户端连接
        SocketChannel socketChannel = serverSocketChannel.accept();
        int messageLength = 8;//从客户端接受8字节
        //循环读取
        while (true){
            int byteRead = 0;
            while (byteRead < messageLength){
                long r = socketChannel.read(byteBuffers);
                byteRead += r;//累计读取的字节数
                System.out.println("读:"+byteRead);
                Arrays.asList(byteBuffers).stream()
                        .map(byteBuffer -> "postion="+byteBuffer.position()
                                + ",limit="+byteBuffer.limit())
                        .forEach(System.out::println);
            }

            //将所有buffer反转
            Arrays.asList(byteBuffers).forEach(byteBuffer -> byteBuffer.flip());
            //读出数据
            long byteWrite = 0;
            while (byteWrite < messageLength){
                long w = socketChannel.write(byteBuffers);
                byteWrite += w;
            }
            //将所有buffer复位
            Arrays.asList(byteBuffers).forEach(byteBuffer -> byteBuffer.clear());
            System.out.println("byteRead="+byteRead + " byteWrite="+byteWrite);

注意:buffer支持类型化的put和get操作,但需要按照顺序进行读取,否则有可能出现异常

     ByteBuffer byteBuffer = ByteBuffer.allocate(64);
     //类型化方式放入数据
     byteBuffer.putInt(100);
     byteBuffer.putLong(9L);
     byteBuffer.putChar('丁');
     byteBuffer.putShort((short) 4);

     //取出
     byteBuffer.flip();
     System.out.println(byteBuffer.getInt());
     System.out.println(byteBuffer.getLong());
     System.out.println(byteBuffer.getChar());
     System.out.println(byteBuffer.getShort());

通道(channel)

  1. bio中的steam是单向的,nio中的channel是双向的
  2. channel在nio中是一个接口
  3. 常用的Channel类有:FileChannel(文件的数据读写)、DatagramChannel(udp的数据读写)、ServerSocketChannel和SocketChannel(tcp的数据读写)

nio的通道类似于流,但有些区别:

  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲读数据,也可以写数据到缓冲

案例1,将数据写入到文件

        String str = "hello,dqn";
        //创建一个输出流
        FileOutputStream fos = new FileOutputStream("D:\\test\\file01.txt");
        //通过输出流获取对应的FileChannel(真实类型是FileChannelImpl)
        FileChannel channel = fos.getChannel();
        //创建一个缓冲区ByteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //将数据放入到缓冲区
        byteBuffer.put(str.getBytes());
        //对buffer进行反转
        byteBuffer.flip();
        //将缓冲区的数据写入通道
        channel.write(byteBuffer);
        //关闭流
        fos.close();

案例2,将文件里的数据读入控制台

        //创建文件输入流
        File file = new File("D:\\test\\file01.txt");
        FileInputStream fis = new FileInputStream(file);
        //通过输入流对象 获取对应的FileChannel
        FileChannel channel = fis.getChannel();
        //创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());
        //将通道的数据读入到缓冲区中
        channel.read(byteBuffer);
        //将缓冲区的字节转为字符串
        System.out.println(new String(byteBuffer.array()));
        //关闭流
        fis.close();

案例3,文件拷贝1

        FileInputStream fis = new FileInputStream("D:\\test\\file01.txt");
        FileChannel channel01 = fis.getChannel();

        FileOutputStream fos = new FileOutputStream("D:\\test\\file02.txt");
        FileChannel channel02 = fos.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        //循环读取
        while (true){
            byteBuffer.clear();//清空缓冲区(复位)
            int read = channel01.read(byteBuffer);
            //读取结束
            if (read == -1){
                break;
            }
            //将缓冲区中的数据写入到通道2
            byteBuffer.flip();//反转
            channel02.write(byteBuffer);
        }
        //关闭流
        fos.close();
        fis.close();

案例4,文件拷贝2

        FileInputStream fis = new FileInputStream("D:\\test\\file01.txt");
        FileChannel sourceChannel = fis.getChannel();
        FileOutputStream fos = new FileOutputStream("D:\\test\\file02.txt");
        FileChannel destChannel = fos.getChannel();

        //拷贝
        destChannel.transferFrom(sourceChannel,0,sourceChannel.size());

        //关闭流
        sourceChannel.close();
        destChannel.close();
        fis.close();
        fos.close();

ServerSocketChannel

ServerSocketChannel在服务器端监听新的客户端socket连接

常用方法

  • open:得到一个ServerSocketChannel通道
  • bind:设置服务端是端口号
  • configureBlocking:设置阻塞或非阻塞模式,false表示非阻塞
  • accept:接受一个连接,返回这个连接对应的通道对象
  • register:注册一个选择器并设置监听事件

SocketChannel

SocketChannel,网络io通道,具体负责进行读写操作,nio把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区

常用方法

  • open:得到一个SocketChannel对象
  • configureBlocking:设置阻塞或非阻塞模式,false表示非阻塞
  • connect:连接服务器
  • finishConnect:如果连接失败,则通过该方法完成连接操作
  • write:往通道里写数据
  • read:从通道里读数据
  • register:注册一个选择器并设置监听事件
  • close:关闭通道

选择器(selector)

  • java的nio,用非阻塞的io方式。可以用一个线程,处理多个客户端连接,这时就需要使用selector
  • selector能够检测多个注册的通道上是否有事件发生(多个channel以事件的方式可以注册到同一个selector),如果有事件发生,便获取事件然后对每个事件进行相应的处理,这样就可以只用一个单线程去管理多个通道
  • 只有在通道有读写事件发生时,才会进行读写,就大大的减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,避免了多线程之间的上下文切换开销

常用方法

  • open:得到一个选择器
  • select:监控所有注册的通道,当其中有io操作可以进行时,将对应的SelectionKey加入到内部集合中并返回(可以设置超时时间)
  • selectNow:不阻塞,立即返回
  • selectKeys:从内部集合中得到所有有事件发生的SelectionKey
  • keys:从内部集合中得到所有的SelectionKey
  • wakeup:唤醒selector

案例

服务端

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //得到selector对象
        Selector selector = Selector.open();
        //绑定一个端口,在服务器端监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        //把ServerSocketChannel注册到Selector,并设置关系事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //循环等待客户端连接
        while (true){
            //等待1秒
            if (selector.select(1000)==0){//没有事件发生
                System.out.println("服务器等待了1秒,无连接");
                continue;
            }
            //如果>0,就获取相关的集合(有事件发生的SelectionKey)
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //遍历
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {//如果是OP_ACCEPT事件(有新的客户端连接)
                    //给该客户端生成一个SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端连接成功");
                    socketChannel.configureBlocking(false);//设置为非阻塞的
                    //将当前SocketChannel也注册到Selector,设置关注事件,
                    // 同时给该socketChannel关联一个buffer
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                if (selectionKey.isReadable()){//如果是OP_READ事件(读事件)
                    System.out.println("发送读事件");
                    //通过selectionKey得到socketChannel
                    SocketChannel channel = (SocketChannel)selectionKey.channel();
                    //获取到该SocketChannel关联的buffer
                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                    channel.read(byteBuffer);
                    System.out.println("客户端数据:"+new String(byteBuffer.array()));
                }
                //手动从集合中移除当前的SelectionKey,防止重复操作
                iterator.remove();
            }
        }

客户端

        //获得网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //提供服务器id和端口
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        //连接服务
        if (!socketChannel.connect(inetSocketAddress)){
            while (!socketChannel.finishConnect()){
                System.out.println("连接需要时间,客户端不阻塞,可以做其他工作");
            }
        }
        //连接成功就发数据
        String s = "hello,dqn";
        //将字节数组包装到缓冲区中
        ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes());
        //发送数据
        socketChannel.write(byteBuffer);
        System.in.read();

SelectionKey

SelectionKey表示Selector和网络通道的注册关系

关系

  • OP_READ:值为1,读
  • OP_WRITE:值为4,写
  • OP_CONNECT:值为8,连接已经建立
  • OP_ACCEPT:值为16,有新的网络连接

常用方法

  • selector:得到与之关联的Selector对象
  • channel:得到与之关联的channel对象
  • attachment:得到与之关联的共享数据
  • interestOps:设置或改变监听事件,参数是要改变的新事件
  • isAcceptable:是否可以accept
  • isReadable:是否可以读
  • isWritable:是否可以写

零拷贝

零拷贝是网络编程的关键,很多性能优化都离不开

在java程序中,常用的零拷贝有mmap和sendFile

  • mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数
  • sendFile
    • Linux2.1版本提供了sendFile函数,其基本原理是:数据根本不经过用户态,直接从内核缓冲区进入到SocketBuffer,同时,由于和用户态完全无关,就减少了一次上下文切换
    • Linux2.4版本中,做了一些修改,避免了从内核缓冲区拷贝到SocketBuffer的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝

mmap和sendFile的区别:

  • mmap适合小数据量读写,sendFile适合大文件传输
  • mmap需要4次上下文切换,3次数据拷贝;sendFile需要3次上下文切换,最少2次数据拷贝
  • sendFile可以利用DMA方式,减少cpu拷贝,mmap则不能(必须从内核拷贝到socket缓冲区)

我们所说的零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有kernel buffer有一份数据)

零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,列入更少的上下文切换,更少的cpu缓冲伪共享以及无cpu校验和计算

传统传输方式

server:

        ServerSocket serverSocket = new ServerSocket(7001);

        while (true){
            Socket socket = serverSocket.accept();
            DataInputStream dis = new DataInputStream(socket.getInputStream());
            try {
                byte[] bytes = new byte[4096];
                while (true){
                    int read = dis.read(bytes, 0, bytes.length);
                    if (read == -1){
                        break;
                    }
                }
            }catch (Exception e){

            }
        }

client:

        Socket socket = new Socket("localhost", 7001);
        String fileName = "D:\\test\\file01.txt";
        InputStream is = new FileInputStream(fileName);
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
        byte[] bytes = new byte[4096];
        long readCount;
        long total = 0;
        long startTime = System.currentTimeMillis();

        while ((readCount = is.read(bytes)) >= 0){
            total += readCount;
            dos.write(bytes);
        }
        System.out.println("发送总字节数:"+total+",耗时:"+
                (System.currentTimeMillis()-startTime));

零拷贝传输方式

server:

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(7001));

        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);

        while (true){
            SocketChannel socketChannel = serverSocketChannel.accept();
            int readCount = 0;
            while (readCount != -1){
                readCount = socketChannel.read(byteBuffer);
                byteBuffer.rewind();//倒带,position=0,mark作废
            }
        }

client:

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",7001));
        String fileName = "D:\\test\\file01.txt";

        FileChannel fileChannel = new FileInputStream(fileName).getChannel();
        long startTime = System.currentTimeMillis();
        //在Linux下一个transferTo就可以完成传输
        //在Windows下一次transferTo只能发送8m,所以需要分段传输
        long count = fileChannel.transferTo(0, fileChannel.size(), socketChannel);//底层使用零拷贝
        System.out.println("发送的总的字节数="+count+",耗时:"+(System.currentTimeMillis()-startTime));
        fileChannel.close();

AIO

jdk7引入了asynchronous I/O,即AIO,在进行I/O编程中,常用到两种模式:reactor和proactor。java的nio就是reactor,当有事件触发时,服务器端得到通知,进行相应处理

AIO即NIO2.0,叫做异步非阻塞IO,AIO引入异步通道的概念,采用了proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后,才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用

模型比较

BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路复用) 异步非阻塞
编程难度 简单 复杂 复杂
可靠性
吞吐量