MqttMessage.php 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. <?php
  2. namespace app\admin\command;
  3. use app\admin\services\MqttMessageClient;
  4. use app\jobs\DeviceReportEvent;
  5. use \PhpMqtt\Client\MqttClient;
  6. use \PhpMqtt\Client\ConnectionSettings;
  7. use think\console\Command;
  8. use think\console\Input;
  9. use think\console\input\Option;
  10. use think\console\Output;
  11. use think\Env;
  12. use think\Queue;
  /**
   * Console command that subscribes to the "/device/+/report" topic filter
   * and hands every received message over to the "DeviceEventQueue" queue
   * as a DeviceReportEvent job.
   */
  class MqttMessage extends Command
  {
      /**
       * Client instance returned by MqttMessageClient::getInstance().
       */
      protected $client = null;

      /**
       * Initialises the command, then fetches the shared client instance.
       *
       * @param string|null $name Command name passed through to the parent constructor.
       */
      public function __construct($name=null)
      {
          parent::__construct($name);

          // Fetched after the parent constructor has run, as in the original ordering.
          $this->client = MqttMessageClient::getInstance();
      }

      /**
       * Handles one received message: dumps the topic, pushes a queue job
       * carrying the topic and message, and prints the push outcome.
       *
       * @param mixed $topic   Topic the message arrived on.
       * @param mixed $message Message body as delivered by the client.
       */
      public function onMessage($topic, $message)
      {
          var_dump($topic);

          $pushResult = Queue::push(
              DeviceReportEvent::class,
              [
                  "topic" => $topic,
                  "message" => $message,
              ],
              "DeviceEventQueue"
          );

          // Only a strict `false` counts as failure; any other return value is
          // reported as success ("message sent" vs. "error sending message").
          echo $pushResult === false ? '消息发送出错' : '消息已发出';
      }

      /**
       * Registers the command name and its description ("read MQTT messages").
       */
      public function configure()
      {
          $this->setName('MqttMessage');
          $this->setDescription('读取MQTT消息');
      }

      /**
       * Subscribes to the device report topics and starts the client loop.
       *
       * @param Input  $input  Console input (not used).
       * @param Output $output Console output (not used; status is written with echo).
       */
      public function execute(Input $input, Output $output)
      {
          $topicFilter = "/device/+/report";

          // Each delivery is forwarded unchanged to onMessage().
          $forwardToHandler = function ($receivedTopic, $receivedMessage) {
              $this->onMessage($receivedTopic, $receivedMessage);
          };

          $this->client->subscribe($topicFilter, $forwardToHandler);
          $this->client->loop(true);
      }
  }