123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- <?php
- namespace app\push\controller;
- use think\Db;
- use think\worker\Server;
- use Workerman\Lib\Timer;
- class Worker extends Server
- {
- protected $socket = 'websocket://0.0.0.0:2346';
- protected $snConnections = [];
- protected $heartbeat_time = '55';
- /**
- * 收到信息
- * @param $connection
- * @param $data
- */
- public function onMessage($connection, $datas)
- {
- $connection->lastMessageTime = time();
- $data = json_decode($datas);
- if (empty($data->uid)) {
- $connection->close();
- return;
- }
- $uid = 1;//这里的uid根据自己的情况去验证
- if (empty($uid)) {
- $connection->close();
- return;
- }
- switch ($data->type) {
- case 'login':
- // 保存该用户的输送数据
- $this->uidConnections[$uid] = $connection;
- // $connection->send('发送成功');
- break;
- case 'send':
- // 发送消息
- // $this->sendMessageByUid($uid, $datas);
- break;
- }
- }
- /**
- * 当连接建立时触发的回调函数
- * @param $connection
- */
- public function onConnect($connection)
- {
- $connection->send('链接成功');
- }
- /**
- * 当连接断开时触发的回调函数
- * @param $connection
- */
- public function onClose($connection)
- {
- if(isset($connection->uid))
- {
- // 连接断开时删除映射
- unset($this->uidConnections[$connection->uid]);
- }
- }
- /**
- * 当客户端的连接上发生错误时触发
- * @param $connection
- * @param $code
- * @param $msg
- */
- public function onError($connection, $code, $msg)
- {
- echo "error $code $msg\n";
- }
- /**
- * 每个进程启动
- * @param $worker
- */
- public function onWorkerStart($worker)
- {
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
- $inner_text_worker = new \Workerman\Worker('text://0.0.0.0:2347');
- // $inner_text_worker->reusePort=true;
- $inner_text_worker->onMessage = function ($connection, $buffer) {
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
- $data = json_decode($buffer, true);
- $uid = $data['uid'];
- // 通过workerman,向uid的页面推送数据
- $ret = $this->sendMessageByUid($uid, $buffer);
- // 返回推送结果
- $connection->send($ret ? 'ok' : 'fail');
- };
- // ## 执行监听 ##
- $inner_text_worker->listen();
- Timer::add(10, function()use($worker){
- $time_now = time();
- foreach($worker->connections as $connection) {
- // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
- if (empty($connection->lastMessageTime)) {
- $connection->lastMessageTime = $time_now;
- continue;
- }
- $diff_time = $time_now - $connection->lastMessageTime;
- $msg = '距离上次通话已经过去'.$diff_time.'秒';
- $connection->send($msg);
- // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
- if ($time_now - $connection->lastMessageTime > $this->heartbeat_time) {
- $connection->close();
- }
- }
- });
- }
- // 向所有验证的用户推送数据
- public function broadcast($message)
- {
- foreach($this->uidConnections as $connection)
- {
- $connection->send($message);
- }
- }
- // 针对uid推送数据
- public function sendMessageByUid($uid, $message)
- {
- if(isset($this->uidConnections[$uid]))
- {
- $connection = $this->uidConnections[$uid];
- $connection->send($message);
- return true;
- }
- return false;
- }
- }
|