MqttMessage.php 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. <?php
  2. namespace app\admin\command;
  3. use app\admin\services\MqttMessageClient;
  4. use app\jobs\DeviceReportEvent;
  5. use think\console\Command;
  6. use think\console\Input;
  7. use think\console\Output;
  8. use think\Queue;
  /**
   * Console command that listens for device report messages over MQTT and
   * forwards each one to the job queue.
   *
   * Every message received on the "/device/+/report" topic filter is pushed
   * to the "DeviceEventQueue" queue with DeviceReportEvent as the job handler.
   */
  class MqttMessage extends Command
  {
      /**
       * MQTT client used for subscribing and for running the receive loop.
       *
       * Holds whatever MqttMessageClient::getInstance() returns.
       */
      protected $client = null;

      /**
       * Creates the command and obtains the MQTT client instance.
       *
       * NOTE(review): the client is fetched at construction time, not when the
       * command runs. If getInstance() opens a network connection, that happens
       * whenever this command object is created - confirm this is intended.
       *
       * @param string|null $name Command name passed through to the parent constructor.
       */
      public function __construct($name = null)
      {
          parent::__construct($name);
          $this->client = MqttMessageClient::getInstance();
      }

      /**
       * Handles one received MQTT message by pushing it onto the job queue.
       *
       * Writes exactly one status line per message to standard output. A queue
       * failure (a false return or a thrown exception) is reported and then
       * swallowed, so a single bad push does not stop the subscriber.
       *
       * @param mixed $topic   Topic the message arrived on (presumably a string - confirm against the client).
       * @param mixed $message Message payload, forwarded to the job unchanged.
       *
       * @return void
       */
      public function onMessage($topic, $message)
      {
          $jobData = [
              "topic" => $topic,
              "message" => $message,
          ];

          try {
              $isPushed = Queue::push(DeviceReportEvent::class, $jobData, "DeviceEventQueue");
          } catch (\Exception $e) {
              // Report the error instead of letting it escape the subscription
              // callback, which would otherwise abort the receive loop.
              echo "[{$topic}] " . '消息发送出错' . ': ' . $e->getMessage() . PHP_EOL;

              return;
          }

          if ($isPushed !== false) {
              echo "[{$topic}] " . '消息已发出' . PHP_EOL;
          } else {
              echo "[{$topic}] " . '消息发送出错' . PHP_EOL;
          }
      }

      /**
       * Registers the command name and description with the console.
       */
      public function configure()
      {
          $this->setName('MqttMessage');
          $this->setDescription('读取MQTT消息');
      }

      /**
       * Subscribes to the device report topics and starts the client loop.
       *
       * NOTE(review): loop(true) presumably blocks and keeps dispatching
       * incoming messages to the callback - confirm against MqttMessageClient.
       *
       * @param Input  $input  Console input (unused).
       * @param Output $output Console output (unused; status lines are echoed by onMessage()).
       */
      public function execute(Input $input, Output $output)
      {
          // "+" in the topic filter is the MQTT single-level wildcard.
          $this->client->subscribe("/device/+/report", function ($topic, $message) {
              $this->onMessage($topic, $message);
          });

          $this->client->loop(true);
      }
  }