| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 | <?phpnamespace app\push\controller;use think\Db;use think\worker\Server;use Workerman\Lib\Timer;class Worker extends Server{    protected $socket = 'websocket://0.0.0.0:2346';    protected $snConnections = [];    protected $heartbeat_time = '55';    /**     * 收到信息     * @param $connection     * @param $data     */    public function onMessage($connection, $datas)    {        $connection->lastMessageTime = time();        $data = json_decode($datas);        if (empty($data->uid)) {            $connection->close();            return;        }        $uid = 1;//这里的uid根据自己的情况去验证        if (empty($uid)) {            $connection->close();            return;        }        switch ($data->type) {            case 'login':                // 保存该用户的输送数据                $this->uidConnections[$uid] = $connection;                // $connection->send('发送成功');                break;            case 'send':                // 发送消息                // $this->sendMessageByUid($uid, $datas);                break;        }    }    /**     * 当连接建立时触发的回调函数     * @param $connection     */    public function onConnect($connection)    {        $connection->send('链接成功');    }    /**     * 当连接断开时触发的回调函数     * @param $connection     */    public function onClose($connection)    {        if(isset($connection->uid))        {            // 连接断开时删除映射            unset($this->uidConnections[$connection->uid]);        }    }    /**     * 当客户端的连接上发生错误时触发     * @param $connection     * @param $code     * @param $msg     */    public function onError($connection, $code, $msg)    {        echo "error $code $msg\n";    }    /**     * 每个进程启动     * @param $worker     */    public function onWorkerStart($worker)    {        // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符        $inner_text_worker = new \Workerman\Worker('text://0.0.0.0:2347');        // $inner_text_worker->reusePort=true;        $inner_text_worker->onMessage = function ($connection, $buffer) {            // $data数组格式,里面有uid,表示向那个uid的页面推送数据            $data = json_decode($buffer, true);            $uid = $data['uid'];            // 通过workerman,向uid的页面推送数据            $ret = $this->sendMessageByUid($uid, $buffer);            // 返回推送结果            $connection->send($ret ? 'ok' : 'fail');        };        // ## 执行监听 ##        $inner_text_worker->listen();        Timer::add(10, function()use($worker){            $time_now = time();            foreach($worker->connections as $connection) {                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间                if (empty($connection->lastMessageTime)) {                    $connection->lastMessageTime = $time_now;                    continue;                }                 $diff_time = $time_now - $connection->lastMessageTime;                 $msg = '距离上次通话已经过去'.$diff_time.'秒';                 $connection->send($msg);                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接                if ($time_now - $connection->lastMessageTime > $this->heartbeat_time) {                    $connection->close();                }            }        });    }    // 向所有验证的用户推送数据    public function broadcast($message)    {        foreach($this->uidConnections as $connection)        {            $connection->send($message);        }    }    // 针对uid推送数据    public function sendMessageByUid($uid, $message)    {        if(isset($this->uidConnections[$uid]))        {            $connection = $this->uidConnections[$uid];            $connection->send($message);            return true;        }        return false;    }}
 |