+ 1 年間 前
コミット
c079af5743

+ 16 - 0
application/api/controller/script/Index.php

@@ -49,4 +49,20 @@ class Index extends Api
             $this->error($e->getMessage());
         }
     }
+
+    // 获取任务状态
+    public function getTaskStatus()
+    {
+        try {
+            $number = $this->request->post('number');
+            if (!$number) $this->error('编号不能为空');
+            $row = Device::where('number', $number)->find();
+            if ($row) {
+                $this->success('获取成功', ['status' => $row['status']]);
+            }
+            $this->error('获取失败');
+        } catch (Exception $e) {
+            $this->error($e->getMessage());
+        }
+    }
 }

+ 138 - 0
application/push/controller/Worker.php

@@ -0,0 +1,138 @@
+<?php
+
+namespace app\push\controller;
+
+use think\Db;
+use think\worker\Server;
+use Workerman\Lib\Timer;
+
+class Worker extends Server
+{
+    protected $socket = 'websocket://0.0.0.0:2346';
+    protected $snConnections = [];
+    protected $heartbeat_time = '55';
+
+    /**
+     * 收到信息
+     * @param $connection
+     * @param $data
+     */
+    public function onMessage($connection, $datas)
+    {
+        $connection->lastMessageTime = time();
+        $data = json_decode($datas);
+        if (empty($data->uid)) {
+            $connection->close();
+            return;
+        }
+        $uid = 1;//这里的uid根据自己的情况去验证
+        if (empty($uid)) {
+            $connection->close();
+            return;
+        }
+        switch ($data->type) {
+            case 'login':
+                // 保存该用户的输送数据
+                $this->uidConnections[$uid] = $connection;
+                // $connection->send('发送成功');
+                break;
+            case 'send':
+                // 发送消息
+                // $this->sendMessageByUid($uid, $datas);
+                break;
+        }
+    }
+
+    /**
+     * 当连接建立时触发的回调函数
+     * @param $connection
+     */
+    public function onConnect($connection)
+    {
+        $connection->send('链接成功');
+    }
+
+    /**
+     * 当连接断开时触发的回调函数
+     * @param $connection
+     */
+    public function onClose($connection)
+    {
+        if(isset($connection->uid))
+        {
+            // 连接断开时删除映射
+            unset($this->uidConnections[$connection->uid]);
+        }
+    }
+
+    /**
+     * 当客户端的连接上发生错误时触发
+     * @param $connection
+     * @param $code
+     * @param $msg
+     */
+    public function onError($connection, $code, $msg)
+    {
+        echo "error $code $msg\n";
+    }
+
+    /**
+     * 每个进程启动
+     * @param $worker
+     */
+    public function onWorkerStart($worker)
+    {
+        // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
+        $inner_text_worker = new \Workerman\Worker('text://0.0.0.0:2347');
+        // $inner_text_worker->reusePort=true;
+        $inner_text_worker->onMessage = function ($connection, $buffer) {
+            // $data数组格式,里面有uid,表示向那个uid的页面推送数据
+            $data = json_decode($buffer, true);
+            $uid = $data['uid'];
+            // 通过workerman,向uid的页面推送数据
+            $ret = $this->sendMessageByUid($uid, $buffer);
+            // 返回推送结果
+            $connection->send($ret ? 'ok' : 'fail');
+        };
+        // ## 执行监听 ##
+        $inner_text_worker->listen();
+        Timer::add(10, function()use($worker){
+            $time_now = time();
+            foreach($worker->connections as $connection) {
+                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
+                if (empty($connection->lastMessageTime)) {
+                    $connection->lastMessageTime = $time_now;
+                    continue;
+                }
+                 $diff_time = $time_now - $connection->lastMessageTime;
+                 $msg = '距离上次通话已经过去'.$diff_time.'秒';
+                 $connection->send($msg);
+                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
+                if ($time_now - $connection->lastMessageTime > $this->heartbeat_time) {
+                    $connection->close();
+                }
+            }
+        });
+    }
+
+    // 向所有验证的用户推送数据
+    public function broadcast($message)
+    {
+        foreach($this->uidConnections as $connection)
+        {
+            $connection->send($message);
+        }
+    }
+
+    // 针对uid推送数据
+    public function sendMessageByUid($uid, $message)
+    {
+        if(isset($this->uidConnections[$uid]))
+        {
+            $connection = $this->uidConnections[$uid];
+            $connection->send($message);
+            return true;
+        }
+        return false;
+    }
+}

+ 3 - 1
composer.json

@@ -31,7 +31,9 @@
         "ext-pdo": "*",
         "ext-bcmath": "*",
         "txthinking/mailer": "^2.0",
-        "php-mqtt/client": "^0.3.0"
+        "php-mqtt/client": "^0.3.0",
+        "topthink/think-worker": "^1.0",
+        "workerman/workerman-for-win": "^3.5"
     },
     "config": {
         "preferred-install": "dist",

+ 6 - 0
server.php

@@ -0,0 +1,6 @@
+#!/usr/bin/env php
+<?php
+define('APP_PATH', __DIR__ . '/application/');
+define('BIND_MODULE','push/Worker');
+// 加载框架引导文件
+require __DIR__ . '/thinkphp/start.php';