当前位置: 资讯 >

Swoole - webSocket消息服务系统方案设计篇

来源:腾讯云 发表日期:2023-03-09 04:55:25

概述

基于Swoole的websocket服务,计划整合3篇进行技术整理,该服务主要有2个核心业务,用户消息服务(消息计数统计)和 客服IM消息系统服务,这篇先说用户消息服务是怎么设计实现的。

实现方案

用户消息服务主要有2部分组成,对外使用webSocket长链接服务提供给安卓/Ios手机客户端,web提供服务,对内使用Http服务。


(相关资料图)

鉴权和缓存周期设置

当服务端携带Token来访问请求webSocket服务,进行用户中心进行权限验证,如果权限通过,在本地进行信息缓存,返回给请求端,为了防止缓存雪崩(雪崩就是指缓存同一时间到期),用户访问峰值是晚间21-24点这个时间段,峰值大概100w/请求,持续4个小时左右,但因为用户中心的缓存时间为7300s,所以这里的过期时间公式:

$uid = $redis->get($token);$expireTime = 3650 + rand(1, 3000);            $uid = OAuth::getUserInfo($token);if (!empty($uid) && intval($uid) > 0) {    //存入缓存时间,过期时间小于 7300s    $redis->setEx($token, $expireTime, $uid);}if($uid && $uid > 0){    $key = "token_".$uid;    $redis->setEx($key, $expireTime, $token);}

本地服务的缓存怎么存储,具体看自己的业务情况,适合自己的就是最好的。

Http服务

Http服务的安全依赖于服务只针对云服务器内网访问,主站有服务变更时,异步埋点在功能里,比如有系统消息、评论、站内信等一系列操作的时候,会通过http请求用户消息服务,设置超时时间,允许丢失部分消息。

2.1 业务埋点处理

埋点再操作后异步触发,超时时间2秒,如果失败再进行一次重试,如果失败,其实基本就是服务挂了,局域网处理,性能传输成本几乎为0,这个地方相当于消息的生产方。

public function swooleComment($uid, $data){    $url = $this->swooleUrl . "/api/comment/message";    $commentUid = empty($data["comment_uid"]) ? 0 :  $data["comment_uid"];    $msg = [        "uid" => $uid,        "msg" => json_encode(["comment_uid" => $commentUid])    ];    $res = Curl::posturl($url, http_build_query($msg), $this->_headerQArr, 2);    if ($res === false) {        // 请求失败再重试一次        usleep(100000);        $res = Curl::posturl($url, http_build_query($msg), $this->_headerQArr, 2);    }    return $res;}

2.2 消息处理

Swoole有一个缺点就是如果没有建立websocket服务,就不能实时进行通信,所以这个地方我分两步处理,根据消息类型进行管理和消息的推送,存入redis list结构的队列中,使用Crontab,执行定时脚本处理。

设计方案为快慢2条双队列结构,快队列主要处理当前最新的消息,如果用户超过1天不上线,放入延迟队列执行,用户超过超过15天未登录,消息释放。

websocket的心跳时间是300s,所以crontab 4min,执行一次,延迟队列6分钟执行一次,我们的redis使用的是链接池单节点特点,整个服务都在依赖,所以这样设计的方案。

2.3 数据存储

数据使用Mysql存储,Uid进行分表取模,采用分表的初衷是因为当时已经有300w+的用户,消息多,所以采用分表设计,所有的操作依赖于uid这个变量,所有的操作都采用TaskManager异步操作,以保证最大的性能。

protected function _getTableName(int $uid): string{    $tableIndex = intval($uid % 128);    return "user_push_msg_" . $tableIndex;}
protected function addAsyncMysql( array $pushMsg,  int $uid): ?bool{    $tableName = $this->_getTableName($uid);    if (empty($pushMsg) || empty($tableName) || empty($uid)) return false;    TaskManager::getInstance()->async(function () use ($pushMsg, $tableName) {        DbManager::getInstance()->invoke(function (ClientInterface $client)        use ($pushMsg, $tableName) {            $model = PushMsgModel::invoke($client, $pushMsg);            $model->tableName($tableName)->save();        }, self::MYSQL_CONN_NAME);    });}

用户消息数统计

在业务中有全体用户,全体作者,签约作者等分组的情况,成为统计中的重点和难点,一共分分2步解决。

第一步,在http消息接收端专门放置一个消息计数器对用户单条发送的消息进行计数,只统计针对用户的消息。

第二步,新建一个mysql表,专门用于统计用户最近查看消息的时间戳,根据用户最后的查看消息时间来统计群组中的未读消息数,把两个结果进行相加,得出用户未读消息数和。

表的设计用uid做主键,保持用户的唯一性,使用REPLACE INTO进行更新,REPLACE INTO的好处是如果主键uid存在,更新时间,如果不存在则新增数据。

CREATE TABLE `table` (  `uid` int(10) unsigned NOT NULL DEFAULT "0",  `unixtime` int(10) unsigned NOT NULL DEFAULT "0",  PRIMARY KEY (`uid`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT="用户查看消息最新时间"
x
推荐阅读 更多