client = MqttMessageClient::getInstance(); } public function onMessage($topic, $message) { var_dump($topic); $jobHandlerClassName = DeviceReportEvent::class; $jobQueueName = "DeviceEventQueue"; $jobData = [ "topic" => $topic, "message" => $message ]; $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); if( $isPushed !== false ){ echo '消息已发出'; }else{ echo '消息发送出错'; } } public function configure() { $this ->setName('MqttMessage') ->setDescription('读取MQTT消息'); } public function execute(Input $input, Output $output) { $this->client->subscribe("/device/+/report", function ($topic, $message){ $this->onMessage($topic, $message); }); $this->client->loop(true); } }