快资讯丨【Netty源码分析】04 服务端读流程

发布时间:2023-03-29 02:17:00     来源:腾讯云

读流程

客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮询、事件处理是在NioEventLooprun方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()

processSelectedKeys()一直追踪下去,可以看到OP_READ处理逻辑分支:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

可能你会比较奇怪:为什么OP_READOP_ACCEPT都会走这个分支?


(资料图)

OP_ACCEPTNioServerSocketChannel处理的事件,而OP_READNioSocketChannel处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe类型也不一样,一个是:NioMessageUnsafe,另一个是:NioSocketChannelUnsafeNioServerSocketChannel负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。

这里我们是处理client channleOP_READ,所以,unsafeNioSocketChannelUnsafe类型实例。

AbstractNioByteChannel.NioByteUnsafe#read方法代码如下:

public final void read() { final ChannelConfig config = config();    if (shouldBreakReadReady(config)) {     clearReadPending();         return;    }    final ChannelPipeline pipeline = pipeline();    final ByteBufAllocator allocator = config.getAllocator();    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();    allocHandle.reset(config);    ByteBuf byteBuf = null;    boolean close = false;    try {        do {            // 申请ByteBuf对象            byteBuf = allocHandle.allocate(allocator);            //doReadBytes(byteBuf):将数据读取到ByteBuf中            //lastBytesRead()将读取的字节数设置到lastBytesRead            allocHandle.lastBytesRead(doReadBytes(byteBuf));            if (allocHandle.lastBytesRead() <= 0) {                byteBuf.release();                byteBuf = null;                close = allocHandle.lastBytesRead() < 0;                if (close) {                    readPending = false;                }                break;            }            allocHandle.incMessagesRead(1);            readPending = false;            //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中            pipeline.fireChannelRead(byteBuf);            byteBuf = null;        } while (allocHandle.continueReading());//判断是否继续读取          allocHandle.readComplete();        //触发pipeline channelReadComplete        pipeline.fireChannelReadComplete();        if (close) {            closeOnRead(pipeline);        }    } catch (Throwable t) {        handleReadException(pipeline, byteBuf, t, close, allocHandle);    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:

allocHandle.lastBytesRead(doReadBytes(byteBuf)):调用java api,从channel中读取字节数据到ByteBuf缓存中;pipeline.fireChannelRead(byteBuf):触发pipelinechannelRead事件,并将带有读入数据的ByteBuf通过参数传入;pipeline.fireChannelReadComplete():触发pipelinechannelReadComplete事件;

事件传播

调用pipelinefireChannelRead()就可触发channelRead事件在handler之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。

关键点就在于HandlerContext中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一个是在哪个handler上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler上触发channelRead事件。由于pipeline中的handler是被包装成HandlerContext放入的,所以,可以通过handler()方法找到真正的handler对象进行触发。

比如pipelinefireChannelRead()就是触发headchannelRead事件,如果处理完成需要把事件继续传播给下一个handler,就需要调用ctx.fireChannelRead(msg)方法即可,该方法中通过next属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)这个方法就可以将事件传播到下一个节点上。

pipeline.fireChannelRead(byteBuf)运行完成后会调用pipeline.fireChannelReadComplete()方法,触发channelReadComplete事件,执行机制和channelRead事件一样,就不再赘述。

搞清楚上面原理,就很容易理解ctx.fireChannelRead()ctx.pipeline().fireChannelRead()之间的区别了,避免误用。

Pipeline线程模型

上面分析的都是常规模式,没有给handler指定额外线程情况下channelReadchannelReadComplete传播机制,大致如下图:

先触发channelRead事件,按照pipeline中顺序依次触发,当所有handler都触发完后,再触发channelReadComplete事件,按照pipeline中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel注册的NioEventLoop

我们来看下static void invokeChannelRead()这个方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

在执行next.invokeChannelRead(m)方法前有个executor.inEventLoop()判断,判断当前执行线程是不是就是handler执行所需的线程。执行handler方法是不能随便线程都可以去执行的,必须使用handler内部指定的executor线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor线程执行器中执行,如果当前线程和handler执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor的任务队列中让其执行。

executor线程执行器是通过next.executor()方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContextexecutor有值则直接返回;否则返回channel注册的NioEventLoop作为线程执行器。

在添加handler时可以指定一个EventGrouppipeline.addLast( bizGroup, "handler2", new OtherTest02());,这样,再把handler包装成HandlerContext过程中会从这个EventGroup根据chooser选取策略获得一个EventLoop赋值给executor

所以,从上面分析,默认情况下handler都是在channel注册的NioEventLoop线程中执行的,除非在addLast添加handloer时特别指定。

下面我们通过一个案例分析下pipeline线程模型,如下,给handler02添加一个额外的线程池:

EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception {    ChannelPipeline pipeline = ch.pipeline();    pipeline.addLast( "handler01", new OtherTest01());    pipeline.addLast( bizGroup, "handler02", new OtherTest02());    pipeline.addLast( "handler03", new OtherTest03());}

这时,channelReadchannelReadComplete事件触发流程见下图:

channelRead事件执行流程说明:

上下两部分代表两个线程,上面是channel注册的eventLoop,下面是添加handler02指定的eventLoop;首先触发handler01channelRead事件,本身当前线程和handler01是同一个线程,所以,直接调用handler#channelRead()方法;handler01#channelRead()方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()方法,但是handler02执行线程并不是默认的channel的注册线程,而是额外设置的biz线程,需要将调用包装成一个任务提交到biz线程的任务队列taskQueue中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()方法执行完成后,需要执行handler03#channelRead(),它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()的调用封装成一个任务提交到register eventLoop的taskQueue中,待其内部线程提取执行;

下面再来看下channelReadComplete事件执行流程:

上图a1将任务提交给taskQueue任务队列后直接返回了,而不是等其执行完成再返回;a1返回后,从源码分析来看,会立即触发channelReadComplete事件,涉及到线程切换,同理b1这里也是将handler02#channelReadComplete()调用封装成任务放入到biz eventLooptaskQueue中的,然后也直接返回了;这样,biz eventLoop线程执行器taskQueue中就有两个任务,会按照顺序依次执行:先执行channelRead()调用,再执行channelReadComplete()调用;执行a3、b3时同理;

总结

从上面可以看出,Pipelinehandler可以在不同线程间切换得到关键是:taskQueue。还要一点非常重要:handler线程池执行器默认使用的channel注册的NioEventLoop这个,NioEventLoop采用的是单线程工作模式,同时还需要处理selector.select()事件轮询,所以,handler里肯定不能有耗时、特别是IO阻塞等操作,不然卡在handler中,selector#select()执行不到,无法及时接收到客户端传送过来的数据。

标签:

精彩推送

乘用车市场信息联席会发布数据 10月乘用车市场零售为184万辆

11月8日,乘用车市场信息联席会发布数据显示,今年前10个月乘用车市场零售达1671 6万辆,同比增长3%,...

2022-11-09

至正股份发布公告 拟1.19亿元收购苏州桔云51%股权

11月8日晚间,至正股份(603991)发布公告称,公司与SUCCESS FACTORS LIMITED签署了《购买资产协议》,...

2022-11-09

旅游合同暗藏猫腻 中消协点评这些属于不公平条款

11月8日,中国消费者协会官网发布《不公平格式条款点评系列九:旅游领域不公平格式条款点评》,其中针对...

2022-11-09

天键股份IPO排队近11个月 天键股份冲A胜算几何

在同行业企业迪芬尼、豪恩声学IPO告败后,电声产品制造商天键电声股份有限公司(以下简称天键股份)也向A...

2022-11-09

MSCI入摩“三步走” 三年MSCI主题基金不断涌现

自2019年11月8日,美国明晟公司(又称摩根士丹利资本国际公司,MorganStanleyCapitalInternational,以下...

2022-11-09

快资讯丨【Netty源码分析】04 服务端读流程

客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮

当前要闻:定调,房地产税虽迟但到?

定调,房地产税虽迟但到?,房价,税收,楼继伟,房地产税,稳定楼市

今日看点:河岸寻“新”——看传统花木小镇如何新生

成都观察牟峪兴杨柳摄影川观新闻记者吴亚飞杜杰春光明媚,植绿正当时。日前,在成都市温江区和盛镇北林绿道,和盛镇纪委带领着干部群众在金马

世界看热讯:容百科技股东户数下降4.86%,户均持股71.06万元

容百科技最新股东户数2 85万户,低于行业平均水平。公司户均持有流通股份9852股;户均流通市值71 06万元。

世界热点评!拟建学校和市场!揭东区中心区两地块规划调整公示

拟建学校和市场!揭东区中心区两地块规划调整公示,学校,揭东区,控制性,绿地率,住宅用地

越南“胜利方程式”公布!日媒警告:越南出口立国战略有“死角”?

越南“胜利方程式”公布!日媒警告:越南出口立国战略有“死角”?,越南,日本,美国,日媒,方程式,新加坡,东南亚,第二大经济体

焦点速看:银座股份股东户数增加10.95%,户均持股12.98万元

银座股份最新股东户数2 24万户,低于行业平均水平。公司户均持有流通股份2 31万股;户均流通市值12 98万元。

当前要闻:【开拍】起拍楼面价约525元/m²!澄海一宗零售商业用地挂牌!

【开拍】起拍楼面价约525元 m²!澄海一宗零售商业用地挂牌!,楼面价,出让方,汕头市,商业用地,住宅用地,土地使用权

世界观速讯丨大唐发电:2022年净利亏损4.1亿元 同比大幅收窄

证券时报e公司讯,大唐发电(601991)3月28日晚间披露年报,2022年公司实现营业收入1168 28亿元,同比增长12 76%;净利润亏损4 1亿元,去年同期

速看:新洲 | 东湖高新江北投资摘得双柳航天产业城一宗工业用地!

新洲|东湖高新江北投资摘得双柳航天产业城一宗工业用地!,江北,新洲,武汉,东湖高新,工业用地,双柳航天产业城

即时焦点:沈阳楼市新政:优化限购区域,首套房贷利率下调至3.8%

沈阳楼市新政:优化限购区域,首套房贷利率下调至3 8%,套房,非住宅,楼市新政,限购政策,房贷利率,沈阳市房产局

每日速讯:2023年3月份住房公积金工作动态

2023年3月份住房公积金工作动态,磐安县,工作动态,住房公积金,公积金贷款

【全球热闻】【图解年报】新华文轩:2022年归母净利润同比增长6.9%,约为14亿元

新华文轩于2023年3月29日披露年报,公司2022年实现营业总收入109 3亿元,同比增长4 5%;实现归母净利润13 97亿元,同比增长7%;每股收益为1 13元。

【世界报资讯】西部证券易斌:消费复苏和国企改革有望成为市场主线

西部证券易斌:消费复苏和国企改革有望成为市场主线,李莉,主线,易斌,国企改革,西部证券,消费复苏,投资银行,中信证券

聚焦:安能物流2022年实现营收93.35亿元

安能物流28日发布2022年年度业绩报告,公司2022年实现营业收入93 35亿元,经调整EBITDA为10 96亿元。2022年,公司货运总量为1250万吨,较2021

【环球速看料】先签合同再改条款细节?北京知名住宅项目橡树湾“强势”合同引众业主担忧

先签合同再改条款细节?北京知名住宅项目橡树湾“强势”合同引众业主担忧,交房,华润,橡树湾,开发商,商品房,北京市,住宅项目,高层住宅

观焦点:征地拆迁!南京这些地方有新消息了!

征地拆迁!南京这些地方有新消息了!,搬迁,南京,征收,安置房,征地拆迁

实时:资产超600万的家庭,中国有518万户,原来身边藏了这么多有钱人?

资产超600万的家庭,中国有518万户,原来身边藏了这么多有钱人?,存款,中国,中产家庭

今日热搜:【图解年报】光洋股份:2022年归母净利润为-2.3亿元,连亏两年未能扭亏

光洋股份于2023年3月29日披露年报,公司2022年实现营业总收入14 88亿元,同比下降8 3%;实现归母净利润-2 34亿元,上年同期为-8123 8万元,亏损幅度扩大。

重磅!北仑7个片区拟征收土地成片开发,涉及这几个街道→

重磅!北仑7个片区拟征收土地成片开发,涉及这几个街道→,征收,征地,北仑区,专项规划,街道办事处

焦点播报:中新网评:“清朗”行动,让网络空间事更清、理更明、法更透

今年“清朗”系列专项行动聚焦自媒体乱象,集中整治“自媒体”造谣传谣、假冒仿冒、违规营利等乱象,破解“自媒体”信息内容失真、运营行为...

世界视讯!裕田中国(00313)拟收购2家物业管理服务公司

裕田中国(00313)拟收购2家物业管理服务公司,武汉,房地产,裕田中国,物业管理服务

焦点热议:明确了!南通市区这两宗宝藏地,用地性质确定

明确了!南通市区这两宗宝藏地,用地性质确定,南通市,崇川区,宝藏地,住宅用地,田家炳中学

环球观察:定了!泰州这里也要拆迁!

定了!泰州这里也要拆迁!,拆迁,姜堰区,高港区,泰州市,高新区

世界速看:42家房企哄抢!昌平这块地创北京之最!风头盖过新城,未来便宜不了…

42家房企哄抢!昌平这块地创北京之最!风头盖过新城,未来便宜不了…,龙湖,地铁站,朱辛庄,昌平线,北京市

【图解年报】赛微电子:2022年归母净利润由盈转亏,毛利率下降14.4%

赛微电子于2023年3月29日披露年报,公司2022年实现营业总收入7 86亿元,同比下降15 4%;实现归母净利润-7336万元,上年同期为2 1亿元,未能维

农业农村部领导到北海市农产品质量检测中心调研

2023年3月23日上午,农业农村部种植业管理司朱恩林一级巡视员一行6人到北海市农产品质量检测中心进行调研,自治区农业农村厅、北海市政府、北

当前关注:公告发布!扬州这个25年老小区终于要拆迁了!

公告发布!扬州这个25年老小区终于要拆迁了!,拆迁,住宅,搬迁,商铺,征收,扬州市,大运河,中国文物,中国世界遗产,城乡房屋建筑数据

每日快播:屏山县:依托12345热线求助得到工资

近日,屏山县黄女士向12345热线来电反映:2022年12月至2023年2月,黄女士与朋友2人在屏山县书楼镇书楼卫生院工作共8400元工资未得到,希望相关部门能

环球速讯:广元市剑阁县市场监管局聚力“四抓”规范餐饮具清洗消毒

为进一步推动食品安全“两个责任”落地落实,规范餐饮单位餐饮具清洗、消毒工作,广元市剑阁县市场监管局以“春雷行动2023”为抓手,主动作...

焦点!屏山县:依托12345热线求助学校归还物品

近日,屏山县张先生来电向12345热线反映:屏山县屏山镇求实高级中学在学生不知情的情况下把学生放在寝室里的东西拿走,认为不合理,希望学校归

世界热讯:哈尔滨市松北区总价9.3亿元挂牌两宗地 去年曾拟出让但流拍

哈尔滨市松北区总价9 3亿元挂牌两宗地去年曾拟出让但流拍,世茂,流拍,挂牌,松北区,哈尔滨市,商品住房

微速讯:中介自述:心态崩溃后,我经历了行业发展“阵痛”!

中介自述:心态崩溃后,我经历了行业发展“阵痛”!,楼市,中介,房地产

全球快报:天亿马:第二大股东拟减持不超3%公司股份

天亿马3月28日公告,第二大股东深圳乐成持有公司股份445 19万股(占公司总股本比例6 75%)计划自本公告披露之日起3个交易日后的3个月内以大宗交

【全球快播报】屏山县:依托12345热线求助获得占地补偿

近日,屏山县新市镇汉溪村的杨先生向12345热线来电反映:2022年年底,当地政府在新市镇汉溪村修建水池占用了杨先生家1亩左右的土地,但一直未

环球焦点!公积金买房,利弊有哪些?优势很明显,7项不足,基数余额都重要

公积金买房,利弊有哪些?优势很明显,7项不足,基数余额都重要,违约金,商业贷款,公积金买房,公积金贷款,共同还款人

屏山县:依托12345热线咨询退伍军人 生活补贴问题

近日,屏山县中都镇红安村的周先生向12345热线来电反映:2023年周先生的退伍军人生活补贴应该是300元,但是只收到280元,希望部门告知退伍军人

方城县:特色零售终端让乡愁不再遥远

“点关注不迷路!有了这款盒装速食烩面,亲们在何时何地都可以品尝到正宗的方城烩面,家乡的味道伴随我们走遍天涯海角!”近日,一场以“方...

H&M推出全新环保先锋系列“魅力重生故事”,为可持续未来寻求积极变革

近日,H&M推出全新环保先锋系列魅力重生故事,重新演绎上世纪六十年代的时尚风潮。系列聚集时尚与环保,以可持续材料打造的装饰点亮一众单

广元市旺苍县召开制止餐饮浪费暨餐饮具消毒培训会

3月28日,广元旺苍县市场监督管理局组织召开全县制止餐饮浪费暨餐饮具消毒相关标准宣贯培训会。全县100余家餐饮经营单位共计130余人参加培训会

热点评!广元市苍溪县市场监管局扎实开展“科技之春”知识产权宣传

3月28日,广元市苍溪县市场监督管理局结合“科技之春”宣传月开展了知识产权保护与运用宣传活动。活动现场,工作人员通过设置展板、现场讲...

快看点丨成都土地市场升温,去年流拍地今天最高限价拍出

成都土地市场升温,去年流拍地今天最高限价拍出,限价,土拍,二手房,流拍地,成都市,土地市场

环球通讯!不能以抽掉年轻人的脊梁为代价,贷款利率要降,房地产不该在扩张

不能以抽掉年轻人的脊梁为代价,贷款利率要降,房地产不该在扩张,脊梁,按揭,房地产,贷款买房,贷款利率,房贷利率

环球今日报丨广元市剑阁县元山中、小学共同组织开展社会实践活动

在这春暖花开、生机盎然的大好春光里,为了达到“磨砺意志、锤炼团队、挑战自我、亲近自然”的目的,近日,剑阁县元山中、小学共同组织“用...

全球速看:印尼HJF镍铁一期项目第6条产线第一炉炉渣成功打开

【印尼HJF镍铁一期项目第6条产线第一炉炉渣成功打开】据我的钢铁网(Mysteel)调研了解,3月28日,印尼HJF镍铁一期项目(8条RKEF产线)第6条产

热头条丨广元市市场监管局深入旺苍县开展钢筋混土排水管产品质量监督检查工作

3月28日,广元市市场监督管理局产品质量安全监督管理科、产品质量监督检验所针对中央电视广播总台3 15晚会曝光江苏省盐城市滨海县高标准农田建

【速看料】广元市剑阁县柳沟镇:聚焦产业多措并举 促进产业提质增效

“产业兴百业兴”,产业振兴是乡村全面振兴的关键和基础。近年来,剑阁县柳沟镇坚持“产业强镇”思路,多措并举,做实做细产业发展各项工作...

全球报道:某房企今年计划大裁员30%!

某房企今年计划大裁员30%!,裁员,离职,降薪,房地产,大房企

恒大集团最新消息:多地开始陆续交房,保交楼任务取得积极进展!

恒大集团最新消息:多地开始陆续交房,保交楼任务取得积极进展!,交房,保交楼,许家印,恒大集团

【全球聚看点】百年足球名校重庆七中晋级中青赛全国赛

百年足球名校重庆七中晋级中青赛全国赛

精彩推荐