[Java IO] - Gukie/learning GitHub Wiki

refer

basic

  • connection, 一个连接,可以理解为一个 session;
  • socket, 连接的一个endpoint,即一个口子; 连接关闭了,socket不一定关闭了; socket关闭了,连接一定关闭了;
  • 一个socket,可以处理多个connection; 所以在AIO的server中,可以设置最大连接数

服务器端 vs 客户端

各个OP的值

  • OP_READ, 1, 001
  • OP_WRITE, 4, 100
  • OP_CONNECT,8,1000
  • OP_ACCEPT,16,10000

服务器端

  1. 由于可能需要处理多个连接,所以需要一个选择器,即一个selector
  2. 创建一个 ServerSocketChannel,并将其注册到 selector中,并会返回一个SelectionKey ServerSocketChannel,它只能用于ACCEPT 来自客户端的连接,因为其 validOps()是16,所以其interestOps也只能是16,所有改变其interestOps的行为,都将会被拒绝
  3. 使用 selector.select() 监听 客户端过来的连接, 该方法会一直阻塞,直到有连接过来 当有连接过来是,创建一个新的 SocketChannel,并注册到 selector中去; 下次遍历时,就会将该SocketChannel上的事件给处理出来; 需要注意的是,新的 SocketChannel的 validOps的值为13,即: 1101; 所以只能注册 READ,Write,CONNECT,不能注册ACCEPT

关于SelectionKey

当有新的 IO事件发生时, selector.select()将返回, 然后调用 selector.selectedKeys() 获取到可用的 selectionKey,最后遍历每一个 SelectionKey进行处理

注意,每当处理完一个 selectionKey之后,一般都会将其remove掉; 因为在selector.select()这个过程中,会做:

  • 1.如果该 key对应的operation在操作系统层面是ready了的
  • 1.1. 而该key不在 selected-key set中,那么该key会被加到 selected-key set中; 同时 该key会加入到ready-key set中,旧的ready-key 中的ready信息将会被丢弃
  • 1.2. 如果该 key不在selected-key set中,那么该key所在的channel的 ready-key set将会被修改让 该key对应的operation是true的,比如 isWritable,isReadable; 但其旧的read-key中的ready信息将会保留

the channel's key is already in the selected-key set, so its ready-operation set is modified to identify any new operations for which the channel is reported to be ready

  • 2.isXXXable的方法,都是需要用到readyOps(),而readOps()有跟上一步的selector.select()密切相关; 所以都是在 selector.select()之后再对每一个SelectionKey进行check 某个operation是否可用,比如是否可读(isReadable); 一般都是这么像以下那么写的,比如 isAcceptable
 public final boolean isAcceptable() {
	return (readyOps() & OP_ACCEPT) != 0;
}
  • 3.一般处理SelectionKey的写法如下(也是服务器Socket对收到的IO进行处理的方式)
		while (true) {
            
            selector.select();

            Set<SelectionKey> selectionKeySet = selector.selectedKeys();           
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // ready to accept new Connection
                if (selectionKey.isAcceptable()) {
                    ...
                }

                if (selectionKey.isReadable()) {
                   ...
                }
                if (selectionKey.isWritable()) {
                   ...
                }
                iterator.remove();//即使remove掉了,下次selector.select()的时候,当前的 SelectionKey如果还在的化,还是会加进来,继续处理; 所以不用担心它会丢失掉
            }


        }

客户端

由于客户端,不需要多路复用,所以不需要使用Selector,直接创建一个 SocketChannel,就可以直接用了,简单的示例如下:

  InetSocketAddress socketAddress = new InetSocketAddress("localhost", 8888);
  SocketChannel channel = SocketChannel.open(socketAddress);
  channel.write(ByteBuffer.wrap("hello world".getBytes("utf-8")));

ByteBuffer 处理中文乱码

从Channel读取数据的时候,是从channel中取 byteBuffer能接收的字节数,即: remaining() 数量 这就很可能会将中文截取错误;

这可以通过 Decoder来解决:

  • decoder 一次只从ByteBuffer中读取 decoder能解析出来的字节数;
  • 我们可以将 ByteBuffer多余的字节数,存起来,在下一次从Channel中读取的时候先将它存放进去
  • 这样,每一次decoder都会只解析可以解析的字节数,并且不会遗漏

具体code如下:

// decoder
        Charset charset = Charset.forName("UTF-8");
        CharsetDecoder decoder = charset.newDecoder();
		
		SocketChannel connection = (SocketChannel) selectionKey.channel();

		int bufferCapacity = 3;
		ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity);
		CharBuffer charBuffer = CharBuffer.allocate(bufferCapacity);

		int readNum = connection.read(byteBuffer);
		while(readNum> 0){
			byteBuffer.flip();
//          decoder.decode(byteBuffer,charBuffer,readNum<bufferCapacity);// 这时候,msgBuf的position会被设置为: msgBuf的原来的position+能被decode到的byte的长度(可能有的byge不能够decode,此时 msgBuf的position就不会等于msgBuf的limit)
			decoder.decode(byteBuffer,charBuffer,true);// 这时候,msgBuf的position会被设置为: msgBuf的原来的position+能被decode到的byte的长度(可能有的byge不能够decode,此时 msgBuf的position就不会等于msgBuf的limit)
			charBuffer.flip();
			System.out.print(charBuffer.toString());

			// 将没有解码的数据,存起来;
			int notReadCount = byteBuffer.remaining();
			byte[] notReadByte = null;
			if(notReadCount>0){
				notReadByte = new byte[notReadCount];
				byteBuffer.get(notReadByte);
			}

			byteBuffer.clear();
			charBuffer.clear();

			//如果有多余的字节没有被处理,应该放进来,以便下一次被处理
			if(notReadByte!=null){
				byteBuffer.put(notReadByte);
			}
			readNum = connection.read(byteBuffer);
		}


Selector.select() method

该方法,会阻塞,直到有一个Channel可以进行I/O了; 注意,serverChannel,是不参与计算的. 即: 如果客户端的 Channel不ready了,那么 selector.select() 会一直阻塞

以下两端code,前一个是 selector.select(100), 后一个是 selector.select(); 执行的结果,会发现前置在达到100ms的时候,会往下执行; 而后者会一直阻塞直到有客户端的channel进行IO了

		while(true){
            System.out.println("*********************** start Selector.select() selecting...");
            // 4. Selector.select() to wait until there is Channel ready for I/O
            selector.select(100);

            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            if(selectionKeySet == null || selectionKeySet.isEmpty()){
                System.out.println("***** selectionKeySet is empty...");
            }
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while(iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                // ready to accept new Connection
                if(selectionKey.isAcceptable()){
                    System.out.println("ready for accept new connection...");
                    SocketChannel newConn = serverSocketChannel.accept();
                    if(newConn == null){
                        continue;
                    }
                    newConn.configureBlocking(false);
                    SelectionKey newSK = newConn.register(selector,SelectionKey.OP_READ);
                    printSelectionkey("accept new connection: ",newSK);

                }else if(selectionKey.isReadable()){
                    System.out.println("ready for read...");
					...                  

                }else if(selectionKey.isWritable()){
                    System.out.println("ready for writing...");
					...
                }

            }


        }
			
		while(true){
            System.out.println("*********************** start Selector.select() selecting...");
            // 4. Selector.select() to wait until there is Channel ready for I/O
            selector.select();

            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            if(selectionKeySet == null || selectionKeySet.isEmpty()){
                System.out.println("***** selectionKeySet is empty...");
            }
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while(iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                // ready to accept new Connection
                if(selectionKey.isAcceptable()){
                    System.out.println("ready for accept new connection...");
                    SocketChannel newConn = serverSocketChannel.accept();
                    if(newConn == null){
                        continue;
                    }
                    newConn.configureBlocking(false);
                    SelectionKey newSK = newConn.register(selector,SelectionKey.OP_READ);
                    printSelectionkey("accept new connection: ",newSK);

                }else if(selectionKey.isReadable()){
                    System.out.println("ready for read...");
					...                  

                }else if(selectionKey.isWritable()){
                    System.out.println("ready for writing...");
					...
                }

            }


        }

flush

  • refer

同步写机制,异步写机制,延迟写机制

  • 同步写机制: 数据会立即写到磁盘中,应用程序会一直等,知道数据全部写入磁盘
  • 延迟写机制: 数据会写到 内核缓存(叶缓存,Page cache)完成后,应用程序就返回了(这种方式,数据可能没有正真的写入到磁盘)
  • 异步写机制: 跟延迟写不同的是,该方式在数据真正写入到磁盘后,会通知应用程序

Buffered IO vs Direct IO

  • Buffered IO: 所有的 IO 读写都经过 内核缓存,这是默认的工作方式
  • UnBuffered IO: IO读写,直接跟磁盘打交道,绕过 Operation System Cache;

User Buffered IO (中文名也叫 流缓存)

这个层面的缓存,是用户层面的; 既可以通过应用程序显示的加上,也可以通过操作系统提供的类库隐式的加上

Example:
	每一次写的内容很少,比如一个字节,那么写10个字节:
	没有使用user-buffered-IO: 每一次调用,都会调用操作系统内核的write(), 这样,调用10次系统 write() 方法
	使用了user-buffered-IO: 假设设置的流缓存是5字节, 那么只会调用 write()两次

注意:
	用户缓存只缓存应用程序这一边的数据,控制调用 OS底层IO的次数; 并不能控制这次OS底层IO是否使用缓存,或者是否成功;

Java中的IO

  • write的时候,如果使用了缓存,将数据写到自己的缓存中;
  • flush的时候,将缓存中的数据写到磁盘中去(这里也只是调用操作系统的write方法);具体有没有真正写进去,不能保证,因为它采用的是延迟写机制; flush方法的注解如下:

If the intended destination of this stream is an abstraction provided by the underlying operating system, for example a file, then flushing the stream guarantees only that bytes previously written to the stream are passed to the operating system for writing; it does not guarantee that they are actually written to a physical device such as a disk drive

  • BufferedWriter/BufferedReader/BufferedOutputStream/BufferedInputStream 都是使用了缓存的(即流缓存), 所以在使用的时候,一般也建议使用它们; 其他的 Writer/Reader/OutputStream/InputStream 可能要看它们具体的实现,但大部分都没有使用缓存

  • 字节流的IO类 与 字符流的IO类的互转

  • 字节流 转为 字符流: InputStreamReader (这个可以解决字节流输出时,中文乱码问题)
  • 字符流 转为 字节流: OutputStreamWriter (写字符的时候,最后需要转为字节写到文件中)

多个线程,同时操作一个流/(字节或者字符的)Java IO在write


BIO, NIO, AIO

  • refer
  1. BIO 是同步阻塞IO
  2. NIO 是同步非阻塞
  3. AIO 是异步非阻塞
一个IO过程
  1. 程序发出一个IO请求 (一个System call 命令)
  2. OS系统收到请求后,做两个步骤: 2.1. 准备数据 (waiting for the data to be ready) 2.2. 数据准备好后,将数据从内核copy到用户进程中 (copy data from kernel to user)
  3. OS都完成后,返回成功给用户进程,此时用户进程就可以处理数据了
  • 是否同步 是看"数据从kernel copy到用户进程"这一步,进程是否阻塞; 如果阻塞,就是同步,如果不是,则是异步
  • 是否阻塞 是看"waiting for data to be ready"这一步,进程是否阻塞;
Summary
  • BIO:全程都是阻塞的,所以是同步阻塞IO

  • NIO:
    1. 数据准备阶段,是非阻塞的; 这个阶段,系统会通过 SelectionKey 去轮询数据是否ready了
    2. 数据copy阶段,是同步的; 即channel的read/write操作,会一致阻塞,知道数据处理完成
    3. 所以NIO是同步非阻塞IO

  • AIO:进程发送一个 system call之后,就可以处理其他的事情; 当 OS系统将数据准备好,并且copy到用户进程完成后,会通知用户进程; 全程都不会有阻塞,所以AIO是异步非阻塞IO