Worker.php 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. <?php
  2. namespace app\push\controller;
  3. use think\Db;
  4. use think\worker\Server;
  5. use Workerman\Lib\Timer;
  6. class Worker extends Server
  7. {
  8. protected $socket = 'websocket://0.0.0.0:2346';
  9. protected $snConnections = [];
  10. protected $heartbeat_time = '55';
  11. /**
  12. * 收到信息
  13. * @param $connection
  14. * @param $data
  15. */
  16. public function onMessage($connection, $datas)
  17. {
  18. $connection->lastMessageTime = time();
  19. $data = json_decode($datas);
  20. if (empty($data->uid)) {
  21. $connection->close();
  22. return;
  23. }
  24. $uid = 1;//这里的uid根据自己的情况去验证
  25. if (empty($uid)) {
  26. $connection->close();
  27. return;
  28. }
  29. switch ($data->type) {
  30. case 'login':
  31. // 保存该用户的输送数据
  32. $this->uidConnections[$uid] = $connection;
  33. // $connection->send('发送成功');
  34. break;
  35. case 'send':
  36. // 发送消息
  37. // $this->sendMessageByUid($uid, $datas);
  38. break;
  39. }
  40. }
  41. /**
  42. * 当连接建立时触发的回调函数
  43. * @param $connection
  44. */
  45. public function onConnect($connection)
  46. {
  47. $connection->send('链接成功');
  48. }
  49. /**
  50. * 当连接断开时触发的回调函数
  51. * @param $connection
  52. */
  53. public function onClose($connection)
  54. {
  55. if(isset($connection->uid))
  56. {
  57. // 连接断开时删除映射
  58. unset($this->uidConnections[$connection->uid]);
  59. }
  60. }
  61. /**
  62. * 当客户端的连接上发生错误时触发
  63. * @param $connection
  64. * @param $code
  65. * @param $msg
  66. */
  67. public function onError($connection, $code, $msg)
  68. {
  69. echo "error $code $msg\n";
  70. }
  71. /**
  72. * 每个进程启动
  73. * @param $worker
  74. */
  75. public function onWorkerStart($worker)
  76. {
  77. // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
  78. $inner_text_worker = new \Workerman\Worker('text://0.0.0.0:2347');
  79. // $inner_text_worker->reusePort=true;
  80. $inner_text_worker->onMessage = function ($connection, $buffer) {
  81. // $data数组格式,里面有uid,表示向那个uid的页面推送数据
  82. $data = json_decode($buffer, true);
  83. $uid = $data['uid'];
  84. // 通过workerman,向uid的页面推送数据
  85. $ret = $this->sendMessageByUid($uid, $buffer);
  86. // 返回推送结果
  87. $connection->send($ret ? 'ok' : 'fail');
  88. };
  89. // ## 执行监听 ##
  90. $inner_text_worker->listen();
  91. Timer::add(10, function()use($worker){
  92. $time_now = time();
  93. foreach($worker->connections as $connection) {
  94. // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
  95. if (empty($connection->lastMessageTime)) {
  96. $connection->lastMessageTime = $time_now;
  97. continue;
  98. }
  99. $diff_time = $time_now - $connection->lastMessageTime;
  100. $msg = '距离上次通话已经过去'.$diff_time.'秒';
  101. $connection->send($msg);
  102. // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
  103. if ($time_now - $connection->lastMessageTime > $this->heartbeat_time) {
  104. $connection->close();
  105. }
  106. }
  107. });
  108. }
  109. // 向所有验证的用户推送数据
  110. public function broadcast($message)
  111. {
  112. foreach($this->uidConnections as $connection)
  113. {
  114. $connection->send($message);
  115. }
  116. }
  117. // 针对uid推送数据
  118. public function sendMessageByUid($uid, $message)
  119. {
  120. if(isset($this->uidConnections[$uid]))
  121. {
  122. $connection = $this->uidConnections[$uid];
  123. $connection->send($message);
  124. return true;
  125. }
  126. return false;
  127. }
  128. }