IT虾米网

Netty解惑详解

admin 2018年06月08日 架构师 1473 0

一、Netty

1. Netty的线程模型

netty通过Reactor模型基于多路复用器接收并处理用户请求,这个Reactor模型分为三种:
第一种是Reactor单线程模型,它是使用一个线程来处理客户端的连接和IO处理
这里写图片描述
第二种是Reactor多线程模型,他使用一个Acceptor线程来处理客户端的连接,并使用线程池来处理Handler的IO操作
这里写图片描述
第三种是Reactor主从多线程模型,这种模型的Acceptor和Handler处理都是线程池
这里写图片描述

Netty使用的是第二种模型,即使你这样写:(boosGroup表示Acceptor)

EventLoopGroup bossGroup = new NioEventLoopGroup(4); 

但是由于服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费.
这里写图片描述

2. Netty的执行过程

首先我们想想Nio的启动流程:

        // 1. 打开服务端 Socket 
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
 
        // 2. 打开 Selector 
        Selector selector = Selector.open(); 
 
        // 3. 服务端 Socket 监听8080端口, 并配置为非阻塞模式 
        serverSocketChannel.socket().bind(new InetSocketAddress(8080)); 
        serverSocketChannel.configureBlocking(false); 
 
        // 将 channel 注册到 selector 中. 
        // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 
        // 4. Channel注册到 Selector 中. 
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 
 
        // 5. 不断循环 
        while (true) { 
            //迭代selectedkey 
        }

第一步、打开服务端 ServerSocketChannel
第二步、通过 Selector.open() 打开一个 Selector.
第三步、绑定端口
第四步、将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set),并且监听事件为SelectionKey.OP_ACCEPT
第五步、循环,不断重复:

  • 调用 select() 方法
  • 调用 selector.selectedKeys()
  • 获取 selected keys
  • 迭代每个 selected key:
    1) 从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
    2) 判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 “SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()” 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
    3) 根据需要更改 selected key 的监听事件.
    4) 将已经处理过的 key 从 selected keys 集合中删除.

然后理一理Netty服务端的启动流程:

        EventLoopGroup bossGroup = new NioEventLoopGroup(); 
        EventLoopGroup workGroup = new NioEventLoopGroup(); 
        ServerBootstrap bs = new ServerBootstrap(); 
        bs.group(bossGroup, workGroup) 
                .channel(NioServerSocketChannel.class) 
                .option(ChannelOption.SO_BACKLOG, 1000) 
                .option(ChannelOption.SO_KEEPALIVE, true) 
                .childHandler(new ChannelInitializer<SocketChannel>() { 
                    @Override 
                    protected void initChannel(SocketChannel channel) throws Exception { 
                        ChannelPipeline p = channel.pipeline(); 
                        p.addLast(new ObjectEncoder()); 
                        p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); 
                    } 
                }); 
        bs.bind(8001).sync();

对于这个启动代码,.group、.channel、.option、.handler这些方法都是把参数传入到Bootstrap,实际上使用是在client调用connect和Server调用doBind方法时才会使用:

就拿connect来说说做了什么事

1、 new NioEventLoopGroup()
  1)实例化EventLoopGroup,在构造方法中会一直调用父构造方法,最终在MultithreadEventExecutorGroup中会实例化EventLoopGroup,在这个构造方法内部,首先会创建一个EventExecutor数组

this.children = new SingleThreadEventExecutor[nThreads]; 

  2)创建一个EventExecutor数组并初始化,大小为构造时new NioEventLoopGroup(4),如果不指定默认为处理器核心数*2 ,这个EventExecutor 其实是一个Executor,他用于执行Runnable

然后会调用newChild初始化EventExecutor数组

this.children[terminationListener] = this.newChild(threadFactory, args); 

  3)newChiild方法实际上是初始化一个NioEventLoop

protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { 
    return new NioEventLoop(this, threadFactory, (SelectorProvider)args[0]); 
} 

  4)NioEventLoop的构造方法,调用了openSelector方法,打开了一个selector——对应NIO的第二步

  NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { 
    super(parent, threadFactory, false); 
    if(selectorProvider == null) { 
        throw new NullPointerException("selectorProvider"); 
    } else { 
        this.provider = selectorProvider; 
        this.selector = this.openSelector(); 
    } 
} 

可见一个new NioEventLoopGroup() 实际上是初始化了一个NioEventLoop数组,并且在实例化NioEventLoop时调用了openSelector方法,打开了一个selector

2、然后根据.channel方法传入的类型,通过工厂方法设计模式实例化一个Channel,并且打开一个SocketChannel(对应NIO的第一步)

private static SocketChannel newSocket(SelectorProvider provider) { 
    ... 
    return provider.openSocketChannel(); 
} 

3、然后根据.option方法传入的参数,初始化channel,给channel设置各种参数

4、然后调用group().register(channel)进行channel注册到selector中

5、上一步的注册,最终会到达AbstractNioChannel.doRegister方法,该方法通过

javaChannel().register(eventLoop().selector, 0, this)  

将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.
注意这里把channel注册到selector中,interestOps居然是0,根据NIO,一个连接注册到Selector的Channel,监听事件应该是SelectionKey.OP_ACCEPT,下一步会说到

6、调用了doDegister方法注册成功后,会触发ChannelRegistered事件,ChannelRegistered事件会在ChannelPipelines中传递,从HeadHandler传到TailHandler中,传递完成之后注册成功!
(5、6完成了Channel在Selector的注册,对应NIO的第四步)

ps:当有客户端连接进来时,会怎样?

  • 首先判断ServerSocketChannel监听是否成功,如果成功,触发ChannelActive事件,该事件也会在ChannelPipelines中传递
  • 然后根据配置是否需要触发ChannelRead事件,触发了HeadHandler的read方法,最后再服务端链路注册成功之后,重新将操作位设置为SelectionKey.OP_ACCEPT

7、执行bind方法,绑定端口,(3,4,5,6)这四步,实际上都会在第7步执行之后才执行,bind方法最终会调用到doBind0方法,绑定端口(这一步对应NIO的第三步)

此时,已经完成了NIO的1,2,3,4步,还剩第5步,在一个循环中,不断重复,迭代selectorkey

8、循环在哪里?
复杂点说:
因为NioEventLoop是一个SingleThreadEventExecutor,一个NioEventLoop会和一个线程绑定,NioEventLoop中有一个run方法。
所以EventLoop的启动,实际上就是EventLoop启动线程。在SingleThreadEventExecutor的execute方法中,有一个startThread方法,这个方法就是用来启动EventLoop的。
当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动,最终会调用到NioEventLoop的run方法

简单点说
当执行bind方法,会导致NioEventLoop的启动(也就是Executor的启动),NioEventLoop和一个线程绑定,所以NioEventLoop的启动会调用到自身的run方法

run方法逻辑:

 @Override 
    protected void run() { 
    for (;;) { 
        boolean oldWakenUp = wakenUp.getAndSet(false); 
        try { 
            if (hasTasks()) { 
                selectNow();//判断是否又IO事件就绪 
            } else { 
                select(oldWakenUp);//判断是否有IO事件就绪 
                if (wakenUp.get()) { 
                    selector.wakeup(); 
                } 
            } 
 
            cancelledKeys = 0; 
            needsToSelectAgain = false; 
            final int ioRatio = this.ioRatio; 
            if (ioRatio == 100) { 
                processSelectedKeys();//执行selectkey的read write操作 
                runAllTasks(); 
            } else { 
                final long ioStartTime = System.nanoTime(); 
 
                processSelectedKeys(); 
 
                final long ioTime = System.nanoTime() - ioStartTime; 
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 
            } 
 
            if (isShuttingDown()) { 
                closeAll(); 
                if (confirmShutdown()) { 
                    break; 
                } 
            } 
        } catch (Throwable t) { 
            ... 
        } 
    } 
    }

processSelectedKeys中就是处理事件的地方,read,write,connect等,它内部是通过UnSafe类来实现的,就如NIO对read,write等事件的处理,而例如read,wirte方法,Unsafe最终会调用它的父类AbstractNioByteChannel的read方,write方法,最终在这里会完成数据的读写,并且通过调用 pipeline.fireChannelRead 发送一个 inbound 事件,给其他的channel处理
(这一步对应NIO的第五步)

Netty的执行流程和NIO的执行流程联系完毕

总结:

1、一个EventLoopGroup 内部 维护了一个EventLoop数组,EventLoop实际上是一个Executor,大小为实例化时传入或者是默认的处理器核心数*2,Executor中是用来执行Runnable的
2、一个EventLoop中有一个 Selector 属性,SocketChannel会注册到EventLoop的 Selector 中去
3、一个客户端链接,会使用一个EventLoop来去处理
3、注册事件,读事件,写事件都会在ChannelPipeline中传递,从HeadHandler到TailHandler
4、一个NioEventLoop会和一个线程联系,Channel的IO操作,如监听,读,写是在EventLoop中run方法完成的,而run方法内部是通过Unsafe类完成的,而UnSafe类最终又会调用父类AbstractNioByteChannel来处理读写连接的事件,并且会把该事件在pipeline中传递

4. 客户端的连接服务端

Bootstrap调用connect后,最终会来到下面的方法

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { 
    if(localAddress != null) { 
        this.javaChannel().socket().bind(localAddress);//绑定地址 
    } 
 
    boolean success = false; 
 
    boolean var5; 
    try { 
        boolean connected = this.javaChannel().connect(remoteAddress); 
        if(!connected) { 
            this.selectionKey().interestOps(8); 
        } 
 
        success = true; 
        var5 = connected; 
    } finally { 
        if(!success) { 
            this.doClose(); 
        } 
 
    } 
 
    return var5; 
} 

最终会调用NioSocketChannel的doConnect方法,而该方法内部调用:

boolean connected = javaChannel().connect(remoteAddress); 

javaChannel()会返回java.nio.channels.SocketChannel,SocketChannel调用connect方法,跟Nio的一样,也就是说,Netty的客户端连接最终还是执行Nio的连接

三、Netty知识点

前言:NioEventLoop的作用

第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理,客户端的连接等;
(后面的分析主要是这个作用)

第二个是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.

NioEventLoop和一个线程联系,并且内部有一个消息队列,在启动NioEventLoop时,调用run方法,会首先去处理消息队列的任务,然后再去select就绪的IO事件

1. Netty服务端的启动过程

1、首先实例化NioEventLoopGroup,最终会初始化一个NioEventLoop数组,NioEventLoop是一个Executor,每一个NioEventLoop和一个线程联系。

2、当调用doBind绑定端口后,会进行一系列的初始化操作,例如初始化pipeline和channel的option,并且会把ServerSocketChannel注册到NioEventLoop的selector上,并且此时的instrestOps是SelectionKey.OP_ACCEPT,此时的ServerSocketChannel是处于监听状态。

3、然后会取出一个NioEventLoop,启动NioEventLoop,会在run方法中,在循环中监听事件(Accept,read,write)

2. Accept过程

这里写图片描述
1、当客户端进行connect后,服务端的BossGroup启动的NioEventLoop会捕获到SelectionKey.OP_ACCEPT事件

2、 然后会调用NioMessageUnsafe的read方法来处理连接事件,然后会取出连接的客户端的NioSocketChannel,然后触发ChannelActive事件,该事件也会在ChannelPipelines中传递,然后再触发客户端NioSocketChannel和服务端NioServerSocketChannel的ChannelRead操作

3、然后服务端的ServerBootStrap会为客户端的NioSocketChannel设置childHandler参数

4、紧接着会把取得客户端的NioSocketChannel通过childGroup.register(child)将NioSocketChannel注册到work的NioEventLoop中,这个过程和NioServerSocketChannel注册到boss的NioEventLoop的过程一样,最终交付给work线程对应的selector进行read事件的监听。

3. Read操作

这里写图片描述

同样是在NioEventLoop中进行的,当work线程的selector检测到OP_READ事件发生时,触发ChannelRead操作,read操作完成后,该事件会在Pipeline中传递下去,给Pipeline的Handler依次处理

4. epoll bug (Selector 空轮询)

当NioEventLoop中select方法内,selector的轮询结果为空,消息队列也没有新的消息要处理,说明是一个空轮询,有可能会导致jdk的epoll bug,会导致Selector的空轮询,使IO线程一直处于100%状态

解决:
通过重建Selector的方式,首先当前reBuildSelector是否是其他线程发出的,如果是,则放入消息队列中,由NioEventLoop的线程负责调用。然后打开新的Selector,通过循环,将老的Selector注册的Channel重新出则到新的Selector中,并关闭老的Selector

5. TCP粘包拆包问题

原因:

  1. 应用程序write写入的字节大小 > TCP发送窗口缓冲区大小
  2. TCP分片
  3. IP分片

MSS:TCP数据包每次能够传输的最大数据大小
MTU:数据链路层每一次传输的最大传输单元

TCP分片:当前发送的数据包 > MSS
IP分片:当前IP层发送的分组 > MTU

解决:

1、消息定长,每个报文指定固定的字节
例如FixedLengthFrameDecoder,它是固定长度解码器,可以根据执行的长度来编解码,如果收到的报文长度不足指定的长度,会先缓存等到下一个消息来进行拼包,直到达到报文长度

2、使用一些分隔符,例如以\n或\r\n来分割(LineBasedFrameDecoder),或者可以自定义分隔符(DelimiterBasedFrameDecoder)

3、将消息分为消息头和消息体,消息头包含表示消息总长度

4、其他更复杂的协议

6. ChannelOption

1、ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小

2、ChannelOption.SO_KEEPALIVE
是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。

3、ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF
这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

4、ChannelOption.TCP_NODELAY
在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。这里就涉及到一个名为Nagle的算法,该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
TCP_NODELAY就是用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false。

5、ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用。
比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR

发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

Spring Cloud Feign常见问题详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。