Browse Source

处理任务队列

程旭源 1 year ago
parent
commit
8e36e95a18

+ 56 - 0
application/admin/command/MqttMessage.php

@@ -0,0 +1,56 @@
+<?php
+
+namespace app\admin\command;
+
+use app\admin\services\MqttMessageClient;
+use app\jobs\DeviceReportEvent;
+use \PhpMqtt\Client\MqttClient;
+use \PhpMqtt\Client\ConnectionSettings;
+use think\console\Command;
+use think\console\Input;
+use think\console\input\Option;
+use think\console\Output;
+use think\Env;
+use think\Queue;
+
+class MqttMessage extends Command
+{
+    protected $client = null;
+
+    public function __construct($name=null)
+    {
+        parent::__construct($name);
+        $this->client = MqttMessageClient::getInstance();
+    }
+
+    public function onMessage($topic, $message) {
+        var_dump($topic);
+        $jobHandlerClassName = DeviceReportEvent::class;
+        $jobQueueName  = "DeviceEventQueue";
+        $jobData = [
+            "topic" =>  $topic,
+            "message"   =>  $message
+        ];
+        $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
+        if( $isPushed !== false ){
+            echo '消息已发出';
+        }else{
+            echo '消息发送出错';
+        }
+    }
+    public function configure()
+    {
+        $this
+            ->setName('MqttMessage')
+            ->setDescription('读取MQTT消息');
+    }
+
+    public function execute(Input $input, Output $output)
+    {
+
+        $this->client->subscribe("/device/+/report", function ($topic, $message){
+            $this->onMessage($topic, $message);
+        });
+        $this->client->loop(true);
+    }
+}

+ 72 - 0
application/admin/services/MqttMessageClient.php

@@ -0,0 +1,72 @@
+<?php
+
+namespace app\admin\services;
+
+use PhpMqtt\Client\MqttClient;
+use PhpMqtt\Client\ConnectionSettings;
+use PhpMqtt\Client\Exceptions\ProtocolNotSupportedException;
+use think\Env;
+
+class MqttMessageClient
+{
+    protected static $instance = null;
+
+    protected $client = null;
+
+    /**
+     * 功能:实例化mqtt
+     * @throws ProtocolNotSupportedException
+     * @throws \PhpMqtt\Client\Exceptions\ConfigurationInvalidException
+     * @throws \PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException
+     */
+    protected function __construct()
+    {
+//        $protocol = Env::get("mqtt.protocol");
+        $clientId = Env::get("mqtt.client_id");
+        $hostname = Env::get("mqtt.hostname");
+        $port = Env::get("mqtt.port");
+        $username = Env::get("mqtt.username");
+        $password = Env::get("mqtt.password");
+
+
+        $mqtt = new \PhpMqtt\Client\MqttClient($hostname, $port, $clientId, MqttClient::MQTT_3_1_1);
+        $connectionSettings = (new ConnectionSettings());
+        if ($username !== null) {
+            $connectionSettings = $connectionSettings->setUsername($username);
+        }
+        if ($password !== null) {
+            $connectionSettings = $connectionSettings->setPassword($password);
+        }
+
+        $this->client = $mqtt;
+        $mqtt->connect($connectionSettings);
+    }
+
+    protected function __clone()
+    {
+
+    }
+
+    public static function getInstance()
+    {
+        if (static::$instance == null) {
+            static::$instance = new static();
+        }
+        return static::$instance;
+    }
+
+    public function subscribe($topic, $callback)
+    {
+        $this->client->subscribe($topic, $callback);
+    }
+
+    public function publish(string $topic, string $message, int $qos = 0, bool $retain = false)
+    {
+        $this->client->publish($topic, $message, $qos, $retain);
+    }
+
+    public function loop(bool $keepAlive = true): void
+    {
+        $this->client->loop($keepAlive);
+    }
+}

+ 1 - 0
application/command.php

@@ -17,4 +17,5 @@ return [
     'app\admin\command\Min',
     'app\admin\command\Addon',
     'app\admin\command\Api',
+    'app\admin\command\MqttMessage'
 ];

+ 1 - 1
application/extra/queue.php

@@ -1,6 +1,6 @@
 <?php
 return [
-    'connector'  => 'Redis',          // Redis 驱动
+    'connector'  => 'Sync',          // Redis 驱动
     'expire'     => 0,             // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
     'default'    => 'default',    // 默认的队列名称
     'host'       => '127.0.0.1',       // redis 主机ip

+ 58 - 0
application/jobs/DeviceReportEvent.php

@@ -0,0 +1,58 @@
+<?php
+
+namespace app\jobs;
+
+use app\admin\model\Device;
+use think\Exception;
+use think\queue\Job;
+
+class DeviceReportEvent
+{
+
+    public function fire(Job $job, $data)
+    {
+        // 拆解是哪个设备发送的消息
+        $topic = $data['topic'];
+
+        $topicArray = explode('/', $topic);
+        $device_id = $topicArray[2];
+        $reportData = json_decode($data['message'], true);
+        try {
+            $this->eventRouter($device_id, $reportData);
+        } catch (Exception $exception) {
+            // 发送错误,跳过, 这里可以记录一下日志
+        }
+        $job->delete();
+    }
+
+    protected function eventRouter($device_id, $data)
+    {
+        // 消息体结构应该如下:
+//        [
+//            "TYPE"  => "ONLINE", // 事件类型
+//            "DATA"  =>  [] // 事件参数
+//        ];
+
+        var_dump($data);
+
+        if (!isset($data['TYPE'])) return false;
+
+        switch ($data['TYPE']) {
+            case "ONLINE":
+                $this->online($device_id, $data['DATA']);
+                break;
+        }
+        return true;
+    }
+
+    protected function online($device_id, $data)
+    {
+        $device = Device::where("device_sn", $device_id)->get();
+        if (!$device) {
+            return false;
+        }
+
+        $device->status = 1;
+        $device->save();
+    }
+}

+ 3 - 3
composer.json

@@ -19,7 +19,7 @@
         "topthink/framework": "dev-master",
         "topthink/think-captcha": "^1.0",
         "topthink/think-installer": "^1.0.14",
-        "topthink/think-queue": "1.1.6",
+        "topthink/think-queue": "^3.0",
         "topthink/think-helper": "^1.0.7",
         "karsonzhang/fastadmin-addons": "~1.3.2",
         "overtrue/pinyin": "^3.0",
@@ -31,9 +31,9 @@
         "ext-pdo": "*",
         "ext-bcmath": "*",
         "txthinking/mailer": "^2.0",
-        "php-mqtt/client": "^0.3.0",
         "topthink/think-worker": "^1.0",
-        "workerman/workerman-for-win": "^3.5"
+        "workerman/workerman-for-win": "^3.5",
+        "php-mqtt/client": "^1.8"
     },
     "config": {
         "preferred-install": "dist",