基于netty的聊天架构设计
cookqq ›博客列表 ›netty

基于netty的聊天架构设计

2015-10-27 09:18:55.0|分类: netty|浏览量: 7491

摘要: netty即时通信服务端开发的全过程:链接合法性验证,解码和编码,心跳和超时检测,netty线程模型,客户端的断连重连机制,消息的缓存重发

摘要

 1、Netty中boss线程池大小为1,worker线程池大小为8, new NioEventLoopGroup(8),其余线程分配给业务使用。

 2、由于超时时间过长,100W个长链接链路会创建100w长连接对象(比如channel/IdlChannelHandlerAdapter/ScheduledFutureTask ...),每个对象还保存有业务的成员变量,非常消耗内存。一些定时任务被老化到持久代中,没有被JVM垃圾回收掉,内存一直在增长,用户误认为存在内存泄露。 所以说一些长时间没有通信的链接需要关闭掉。 定时心跳显得很重要,如果300s没有收到任何信息,则把链接关闭掉。

 3、不要在Netty的I/O线程上处理业务(心跳发送和检测除外)。为什么呢??

 因为Java进程,线程不能无限增长。这就意味着Netty的Reactor线程数必须收敛。但是在实际业务处理中,经常会有一些额外的复杂逻辑处理,例如性能统计、记录接口日志等,这些业务操作性能开销也比较大,如果在I/O线程上直接做业务逻辑处理,可能会阻塞I/O线程,影响对其它链路的读写操作,或者导致一些链路的关闭或者打开比较慢。

 4、写日志的时候小心

 在生产环境中,需要实时打印接口日志,其它日志处于ERROR级别,当推送服务发生I/O异常之后,会记录异常日志。如果当前磁盘的WIO比较高,可能会发生写日志文件操作被同步阻塞,阻塞时间无法预测。这就会导致Netty的NioEventLoop线程被阻塞,Socket链路无法被及时关闭、其它的链路也无法进行读写操作等

 5、-Xmx:JVM最大内存需要根据内存模型进行计算并得出相对合理的值;


关键词


推送信息状况

    信息阅读的方式比较多,例如:pc、手机、ipad、穿戴设备等等

    现在使用网络质量并不是稳定的,网络主要是运营商的无线移动网,例如在地铁上、山区、沙漠信号就很差,容易发生网络闪断;

    海量的终端链接,而且通常使用长连接,服务器维持大量长连接,资源消耗都非常大;

    谷歌开发的推送框架无法再国内使用,android的应用必须自己

    由于谷歌的推送框架无法在国内使用,Android的长连接是由每个应用各自维护的,这就意味着每台安卓设备上会存在多个长连接。即便没有消息需要推送,长连接本身的心跳消息量也是非常巨大的,这就会导致流量和耗电量的增加;

    不稳定:消息丢失、重复推送、延迟送达、过期推送时有发生;

    垃圾消息到处是。


netty框架

    

netty介绍

    Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

netty优点

    

    统一的API,适用于不同的协议(阻塞和非阻塞),比如http/tcp/udp

    基于灵活、可扩展的事件驱动模型,采用了链式的事件模型

    高度可定制的线程模型

    可靠的无连接数据Socket支持(UDP)

    更好的吞吐量,低延迟

netty架构



平台关键设计

链接合法性验证

    mina客户端和服务器端建立了一个连接,并且为这个链接打开一个长连接。建立一个channel,客户端登陆操作,服务器将连接和账号进行关联。当有一个请求到来的时候,首先检查连接是否合法,这个链接是否登陆,用户是否正确。


解码和编码

    编码它将对象序列化为字节数组,用于网络传输、数据持久化等用途。反之,解码(Decode)/反序列化(deserialization)把从网络、磁盘等读取的字节数组还原成原始对象。

    Netty的逻辑架构图:

从网络读取的inbound消息,需要经过解码,将二进制的数据报转换成应用层协议消息或者业务消息,才能够被上层的应用逻辑识别和处理;同理,用户发送到网络的outbound业务消息,需要经过编码转换成二进制字节数组(对于Netty就是ByteBuf)才能够发送到网络对端。编码和解码功能是NIO框架的有机组成部分,无论是由业务定制扩展实现,还是NIO框架内置编解码能力,该功能是必不可少的。

    netty常用的解码器:

LineBasedFrameDecoder解码器

    LineBasedFrameDecoder是回车换行解码器,如果用户发送的消息以回车换行符作为消息结束的标识。

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。

DelimiterBasedFrameDecoder解码器

    DelimiterBasedFrameDecoder是分隔符解码器,用户可以指定消息结束的分隔符,它可以自动完成以分隔符作为码流结束标识的消息的解码。回车换行解码器实际上是一种特殊的DelimiterBasedFrameDecoder解码器。

FixedLengthFrameDecoder解码器

    FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包等问题,非常实用。

对于定长消息,如果消息实际长度小于定长,则往往会进行补位操作,它在一定程度上导致了空间和资源的浪费。但是它的优点也是非常明显的,编解码比较简单,因此在实际项目中仍然有一定的应用场景。

LengthFieldBasedFrameDecoder解码器

ObjectEncoder编码器

心跳和超时检测

客户端和服务器建立长连接,服务器端会保存着这个长连接,然后对长连接进行轮询看看是否有新的消息。 当客户端socket在非正常情况家掉线,如: 断网,断电等特殊问题的时候, 客户端的channel对象不会自动关闭,还以为这个链接活跃,继续保持着。当服务器端向客户端推送消息的时候,信息已经发送出去了,但是客户端没有收到。


务端增加对空闲时间处理pipeline.addLast("ping", new IdleStateHandler(60, 15, 13,TimeUnit.SECONDS)) 然后在业务逻辑的Handler里面,重写 userEventTriggered(ChannelHandlerContext ctx, Object evt),如果获取到IdleState.ALL_IDLE则定时向客户端发送心跳包;客户端在业务逻辑的Handler里面,如果接到心跳包,则向服务器发送一个心跳反馈;服务端如果长时间没有接受到客户端的信息,即IdleState.READER_IDLE被触发,则关闭当前的channel。


Netty提供的空闲检测机制分为三种:

1) 读空闲,链路持续时间t没有读取到任何消息;

2) 写空闲,链路持续时间t没有发送任何消息;

3) 读写空闲,链路持续时间t没有接收或者发送任何消息。

Netty的默认读写空闲机制是发生超时异常,关闭连接,但是,我们可以定制它的超时实现机制,以便支持不同的用户场景。


服务器开启流程

步骤1:创建ServerBootstrap实例。ServerBootstrap是Netty服务端的启动辅助类,它提供了一系列的方法用于设置服务端启动相关的参数。底层通过门面模式对各种能力进行抽象和封装,尽量不需要用户跟过多的底层API打交道,降低用户的开发难度。

我们在创建ServerBootstrap实例时,会惊讶的发现ServerBootstrap只有一个无参的构造函数,作为启动辅助类这让人不可思议,因为它需要与多个其它组件或者类交互。ServerBootstrap构造函数没有参数的根本原因是因为它的参数太多了,而且未来也可能会发生变化,为了解决这个问题,就需要引入Builder模式。《Effective Java》第二版第2条建议遇到多个构造器参数时要考虑用构建器,关于多个参数构造函数的缺点和使用构建器的优点大家可以查阅《Effective Java》,在此不再详述。

步骤2:设置并绑定Reactor线程池。Netty的Reactor线程池是EventLoopGroup,它实际就是EventLoop的数组。EventLoop的职责是处理所有注册到本线程多路复用器Selector上的Channel,Selector的轮询操作由绑定的EventLoop线程run方法驱动,在一个循环体内循环执行。值得说明的是,EventLoop的职责不仅仅是处理网络I/O事件,用户自定义的Task和定时任务Task也统一由EventLoop负责处理,这样线程模型就实现了统一。从调度层面看,也不存在在EventLoop线程中再启动其它类型的线程用于异步执行其它的任务,这样就避免了多线程并发操作和锁竞争,提升了I/O线程的处理和调度性能。

步骤3:设置并绑定服务端Channel。作为NIO服务端,需要创建ServerSocketChannel,Netty对原生的NIO类库进行了封装,对应实现是NioServerSocketChannel。对于用户而言,不需要关心服务端Channel的底层实现细节和工作原理,只需要指定具体使用哪种服务端Channel即可。因此,Netty的ServerBootstrap方法提供了channel方法用于指定服务端Channel的类型。Netty通过工厂类,利用反射创建NioServerSocketChannel对象。由于服务端监听端口往往只需要在系统启动时才会调用,因此反射对性能的影响并不大。相关代码如下所示:


步骤4:链路建立的时候创建并初始化ChannelPipeline。ChannelPipeline并不是NIO服务端必需的,它本质就是一个负责处理网络事件的职责链,负责管理和执行ChannelHandler。网络事件以事件流的形式在ChannelPipeline中流转,由ChannelPipeline根据ChannelHandler的执行策略调度ChannelHandler的执行。典型的网络事件如下:

    链路注册;

    链路激活;

    链路断开;

    接收到请求消息;

    请求消息接收并处理完毕;

    发送应答消息;

    链路发生异常;

    发生用户自定义事件。

步骤5:初始化ChannelPipeline完成之后,添加并设置ChannelHandler。ChannelHandler是Netty提供给用户定制和扩展的关键接口。利用ChannelHandler用户可以完成大多数的功能定制,例如消息编解码、心跳、安全认证、TSL/SSL认证、流量控制和流量整形等。Netty同时也提供了大量的系统ChannelHandler供用户使用,比较实用的系统ChannelHandler总结如下:

    系统编解码框架-ByteToMessageCodec;

    通用基于长度的半包解码器-LengthFieldBasedFrameDecoder;

    码流日志打印Handler-LoggingHandler;

    SSL安全认证Handler-SslHandler;

    链路空闲检测Handler-IdleStateHandler;

    流量整形Handler-ChannelTrafficShapingHandler;

    Base64编解码-Base64Decoder和Base64Encoder。


步骤6:绑定并启动监听端口。在绑定监听端口之前系统会做一系列的初始化和检测工作,完成之后,会启动监听端口,并将ServerSocketChannel注册到Selector上监听客户端连接,相关代码如下:

 private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }


步骤7:Selector轮询。由Reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合,相关代码如下:

 private void select() throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis  0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                            selectCnt);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = System.nanoTime();
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
            }
            // Harmless exception - log anyway
        }
    }

步骤8:当轮询到准备就绪的Channel之后,就由Reactor线程NioEventLoop执行ChannelPipeline的相应方法,最终调度并执行ChannelHandler,代码如下:



步骤9:执行Netty系统ChannelHandler和用户添加定制的ChannelHandler。ChannelPipeline根据网络事件的类型,调度并执行ChannelHandler,相关代码如下所示:

public interface ChannelHandler {

    ////////////////////////////////
    // Handler life cycle methods //
    ////////////////////////////////

    /**
     * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
     * anymore.
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    ///////////////////////////////////
    // Inbound event handler methods //
    ///////////////////////////////////

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    。。。
    }


netty线程模型


一个NioEventLoop聚合了一个多路复用器Selector,因此可以处理成百上千的客户端连接,Netty的处理策略是每当有一个新的客户端接入,则从NioEventLoop线程组中顺序获取一个可用的NioEventLoop,当到达数组上限之后,重新返回到0,通过这种方式,可以基本保证各个NioEventLoop的负载均衡。一个客户端连接只注册到一个NioEventLoop上,这样就避免了多个IO线程去并发操作它。

 @Override
    protected int doReadMessages(List buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

Netty通过串行化设计理念降低了用户的开发难度,提升了处理性能。利用线程组实现了多个串行化线程水平并行执行,线程之间并没有交集,这样既可以充分利用多核提升并行处理能力,同时避免了线程上下文的切换和并发保护带来的额外性能损耗。


Netty是个异步高性能的NIO框架,它并不是个业务运行容器,因此它不需要也不应该提供业务容器和业务线程。合理的设计模式是Netty只负责提供和管理NIO线程,其它的业务层线程模型由用户自己集成,Netty不应该提供此类功能,只要将分层划分清楚,就会更有利于用户集成和扩展。

令人遗憾的是在Netty 3系列版本中,Netty提供了类似Mina异步Filter的ExecutionHandler,它聚合了JDK的线程池java.util.concurrent.Executor,用户异步执行后续的Handler。

ExecutionHandler是为了解决部分用户Handler可能存在执行时间不确定而导致IO线程被意外阻塞或者挂住,从需求合理性角度分析这类需求本身是合理的,但是Netty提供该功能却并不合适。原因总结如下:

1. 它打破了Netty坚持的串行化设计理念,在消息的接收和处理过程中发生了线程切换并引入新的线程池,打破了自身架构坚守的设计原则,实际是一种架构妥协;

2. 潜在的线程并发安全问题,如果异步Handler也操作它前面的用户Handler,而用户Handler又没有进行线程安全保护,这就会导致隐蔽和致命的线程安全问题;

3. 用户开发的复杂性,引入ExecutionHandler,打破了原来的ChannelPipeline串行执行模式,用户需要理解Netty底层的实现细节,关心线程安全等问题,这会导致得不偿失。

鉴于上述原因,Netty的后续版本彻底删除了ExecutionHandler,而且也没有提供类似的相关功能类,把精力聚焦在Netty的IO线程NioEventLoop上,这无疑是一种巨大的进步,Netty重新开始聚焦在IO线程本身,而不是提供用户相关的业务线程模型。


最近做netty项目是否发现,多个handler处理过滤的时候。使用channelHandlerContext.writeAndFlush(packet)发送信息的时候发现,这个handler中的 write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 没有被执行,最后研究发现



Netty线程开发最佳实践

时间可控的简单业务直接在IO线程上处理

如果业务非常简单,执行时间非常短,不需要与外部网元交互、访问数据库和磁盘,不需要等待其它资源,则建议直接在业务ChannelHandler中执行,不需要再启业务的线程或者线程池。避免线程上下文切换,也不存在线程并发问题。


复杂和时间不可控业务建议投递到后端业务线程池统一处理

对于此类业务,不建议直接在业务ChannelHandler中启动线程或者线程池处理,建议将不同的业务统一封装成Task,统一投递到后端的业务线程池中进行处理。过多的业务ChannelHandler会带来开发效率和可维护性问题,不要把Netty当作业务容器,对于大多数复杂的业务产品,仍然需要集成或者开发自己的业务容器,做好和Netty的架构分层。


业务线程避免直接操作ChannelHandler

对于ChannelHandler,IO线程和业务线程都可能会操作,因为业务通常是多线程模型,这样就会存在多线程操作ChannelHandler。为了尽量避免多线程并发问题,建议按照Netty自身的做法,通过将操作封装成独立的Task由NioEventLoop统一执行,而不是业务线程直接操作


故障定制

在大多数场景下,当底层网络发生故障的时候,应该由底层的NIO框架负责释放资源,处理异常等。上层的业务应用不需要关心底层的处理细节。但是,在一些特殊的场景下,用户可能需要感知这些异常,并针对这些异常进行定制处理,例如:


1) 客户端的断连重连机制;


2) 消息的缓存重发;


3) 接口日志中详细记录故障细节;


4) 运维相关功能,例如告警、触发邮件/短信等


Netty的处理策略是发生IO异常,底层的资源由它负责释放,同时将异常堆栈信息以事件的形式通知给上层用户,由用户对异常进行定制。这种处理机制既保证了异常处理的安全性,也向上层提供了灵活的定制能力。



不会丢消息  是这样子的  在前台 主进程在的时候就正常收取消息 长连接  主进程 挂了 比如 推出程序 就是 push进程在后台运行 来了消息 有push 消息通知  如果这两个情况都不满足 也不会丢消息 别人给你发了消息 这个时候消息在服务器  保存着 等你下次再上的时候 connect  会服务器 拉取 这个消息 叫离线消息 总的来说 通过上述三种方式 我们可以确保不会丢包丢消息


测试工具

VisualVM是一个集成多个JDK命令行工具的可视化工具。可以作为Java应用程序性能分析和运行监控的工具。开发人员可以利用它来监控、分析线程信息,浏览内存堆数据。系统管理员可以利用它来监测、控制Java应用程序横跨整个网络的情况。Java应用程序使用人员可以利用它来创建包含所有必要信息的Bug 报告。

实验测试





结束语


参考文献


http://ifeve.com/netty-mina-in-depth-1/

http://www.infoq.com/cn/articles/netty-high-performance/


http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points/


http://www.coderli.com/netty-5-new-and-noteworthy/


http://www.infoq.com/cn/articles/netty-server-create




一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.