消息中心优化

龙鲲
为什么要优化

消息中心是给数百万骑手和几十万商家推送消息,然而其代码是将以前分散在各个组的推送消息的代码给拷贝了一份,代码没有注释,逻辑复杂混乱,难于理解,代码质量差,性能低下。主要有以下问题:

  1.推送消息缓慢,发送全国消息,要好几个小时
  2.发一次全国消息redis暴增,cpu飚高,人肉告诉运营不要再推全国了,维护人员和运营都比较烦
  3.推送数据在内存中停留时间过久,线上频繁fullgc
  4.有几个接口耗时太大,实现不合理,性能存在严重问题
  5.程序每次上线需要通知运营不要推送消息,以免消息丢失
  6.一个DTO里面有60过个字段,同一个意思,好几个字段都在用,逻辑重复混乱,各种"神逻辑",维护人和对接人都比较头疼
  7.方法和变量命名费解
  8.各种历史bug,以前没发现,现在突然发现了,例如equals判断可能抛出NEP
  9.线上经常慢查

在出了几次问题之后,下定决心要进行大的改造了,毕竟出了问题自己和整个团队都要背锅。

改造过程

从今年年初到现在,我和张佳对消息中心进行了性能,逻辑,方案,监控等方面的持续的优化,当然这之中也接入了许多业务的需求。目前消息中心比较稳定,性能也上去了,推送一百万骑手15分钟左右(还能更快)。

一。性能优化
改造一,去Redis缓存队列

改造之前,消息中心将查询出来的骑手和商家放入redis做缓存队列,然后一个个pop,存入redis用的java原生的序列化方式,占用内存比较多。经常是运营发送两次全国消息,redis内存就不足了。

Redis处理数据是单线程,并且消费redis的时候,还在各个定时任务之间加了一把分布式锁,导致只有一台机器在消费,在发送城市比较多,有几百万骑手的时候,发送完需要三四个小时。

解决方案

采用RocketMQ做缓存队列,RocketMQ是阿里开源的一款高性能高吞吐量的消息中间件,支持海量堆积,消费失败有重试机制(当然需要在业务端做幂等)。

RocketMQ集群模式,每台机器都能做消费者,提高吞吐量。整个消息中心使用一个topic,不同类型的消息使用子tag区分不同的行为。

改造二,消息异步发送

改造之前,消息中心查询骑手,同步查出所有的骑手,然后将这个几百万的大List在内存里按照城市分组,组装推送DTO,看过一次OOM之后dump出来的文件,有三个大的对象,每个对象占用内存25%左右,不频繁fullgc才怪。

虽然查询骑手用了多线程,但是查询耗时还是很大,拖累了整个接口的耗时。有发布任务,刚好这台机器上有发送大量消息,这台机器上的消息就丢失了。

每次定时任务推送将所有的骑手信息按照城市分组后,作为一个ZSet Member序列化后写入到Redis,如果数据结构发生变化不兼容,并且在内存对推送对象作分组等操作,数据量特别大时,性能低

撤销消息,需要先取出ZSet的内容,然后遍历Redis 相应的ZSet删除内容。数据量特别大是造成jvm无响应,对其他接口也有很大影响。

解决方案

采用异步发送的方式,记录一个定时任务,然后扫描定时任务,每次发送消息按照城市推送,分页查询触达对象,城市列表保存到Redis Set,关联到对应的NotifyId。每次发送一页消息,每页500条消息。改完之后再没有出现过OOM,接口耗时也从十几秒到二十几毫秒。

改造三,sql慢查

主要的慢查集中app刷新消息,鹰眼查询页面,强制阅读更新,查询消息已读数和发送总数。

解决方案

app刷新消息主要是通过接受者反查消息,先查询未读的置顶消息,再查询普通消息,需要查询好几次数据库,一条消息对应几万个接受者,非常适合缓存。发送消息的时候同步消息id到Redis key中,然后异步同步消息内容到Redis中。

消息中心消息每个月消息700万左右,加上复杂的查询条件,城市采用json存储,mysql做不到按城市搜索,接入全文搜索引擎ElasticSearch,异步同步消息到ElasticSearch,鹰眼查询从ElasticSearch中查询。

强制阅读需要在几千万的接受者列表中更新一条记录的isforceread字段,这对数据库的压力还是很大的,采用一张扩展表,每个骑手对应一条记录强制阅读的记录,将更新几千万数据的大表换成更新三百万的小表。

已读数和发送总数是在几千万的接受者列表中查询这条的消息的发送总数和已读数,同样对数据库压力比较大。发送总数在发完消息的时候就确定了,放入Redis hash中,已读数每次增加hash field,已读数和未读数的查询放在Redis中。

改造之前有效消息查询分为永久有效消息和有实效的消息,istimelimit = 0 or (istimelimit = 1 and validtimebegin < now() and validtimeend > now(),改为永久有效消息validtimeend为一个很大的时间,简化查询有效消息sql为validtimebegin < now() and validtimeend > now()

二。代码逻辑方案优化

各种发送消息场景的字段都在一个DTO中60多个字段,同一个意思,不同场景用不同的字段,还有很多废弃的字段,基本没有注释,没有文档。方法不区分业务场景的情况下强行复用,定时发送消息,立即发送消息,保存草稿消息都在一个接口中。

还有一些设计上的不合理,发送多城市消息,每个城市一条消息,运营新建一条消息,鹰眼后台显示不只一条消息,不利于运营统计消息触达率。发送消息类别维度不统一,商家,骑手,城市团队中并列了指定发送者。

解决方案

看代码逻辑,删除无用废弃代码,下掉废弃接口,写对接文档方案改造文档,代码加上注释,良好的代码注释真的很重要,字段是否必填,字段什么意思,特殊处理的地方。发送多城市消息改为一条消息,发送总数和已读数放redis存储和查询,避免遍历notify-receiver的8个库。设计上的不合理改造,详细看技术文档

三。监控优化

对主要的接口耗时,调用异常,dubbo线程进行打点监控,在granfa上配置报警信息,出现异常在钉钉群报警。消息发送延迟钉钉报警,rocketMQ堆积报警,骑手大批量延迟报警。记录每条消息的状态流转日志。

其它改造
  • 查询IO,采用批量查询,减少网络开销,多线程查询,减少耗时
  • mysql只选出必要字段,当查询量比较大,并且有大的字段时,对网络的带宽占用还是比较大的。
  • 不同的查询对象对应不同的子类,避免一个DTO中对应字段太多
  • 工厂模式,代理模式,桥接模式等重构原来代码
  • @Cache注解整个对象做缓存的key,其中只有几个字段有效,改为使用其中有效的字段生成key
后续优化
  • 大量发送消息和点对点的消息走不同的RocketMQ topic,避免大量发送的消息阻塞点对点的发送
  • 下掉没有流量的接口
  • 点对点发送消息和鹰眼发送消息的DTO分开,鹰眼发送消息的很多字段app发送消息用不到
  • 发送消息去重