AddAgreementPdf.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. <?php
  2. namespace app\common\command;
  3. use app\common\model\master_worker\MasterWorkerAgree;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use think\console\Command;
  8. use think\console\Input;
  9. use think\console\Output;
  10. use think\facade\Config;
  11. use think\facade\Log;
  12. /**
  13. * mq生成PDF协议队列
  14. * Class AddAgreementPdf
  15. * @package app\command
  16. */
  17. class AddAgreementPdf extends Command
  18. {
  19. protected function configure()
  20. {
  21. $this->setName('query_add_agreement')
  22. ->setDescription('mq生成PDF协议队列');
  23. }
  24. protected function execute(Input $input, Output $output)
  25. {
  26. $mq_config = Config::get('mq');
  27. // 连接到RabbitMQ服务
  28. $connection = new AMQPStreamConnection($mq_config['host'], $mq_config['port'], $mq_config['username'], $mq_config['password'], $mq_config['vhost']);
  29. $channel = $connection->channel();
  30. $channel->exchange_declare('add_agreement','topic',true,true,true,false,false);
  31. // 声明队列
  32. $queue = 'add_agreement';
  33. $channel->queue_declare($queue, true, true, false, true,false);
  34. //接收消息
  35. $callback = function($msg) {
  36. $param = json_decode($msg->body, true);
  37. self::addPdf($param);
  38. };
  39. $channel->basic_consume('add_agreement', '', false, true, false, false, $callback);
  40. while(count($channel->callbacks)) {
  41. $channel->wait();
  42. }
  43. $channel->close();
  44. $connection->close();
  45. }
  46. public static function notifyAgreement()
  47. {
  48. $mq_config = Config::get('mq');
  49. // 连接到RabbitMQ服务
  50. $connection = new AMQPStreamConnection($mq_config['host'], $mq_config['port'], $mq_config['username'], $mq_config['password'], $mq_config['vhost']);
  51. $channel = $connection->channel();
  52. $channel->exchange_declare('add_agreement','topic',true,true,true,false,false);
  53. // 声明队列
  54. $queue = 'add_agreement';
  55. $channel->queue_declare($queue, true, true, false, true,false);
  56. //接收消息
  57. $callback = function($msg) {
  58. $param = json_decode($msg->body, true);
  59. self::addPdf($param);
  60. };
  61. $channel->basic_consume('add_agreement', '', false, true, false, false, $callback);
  62. while(count($channel->callbacks)) {
  63. $channel->wait();
  64. }
  65. $channel->close();
  66. $connection->close();
  67. }
  68. public static function addPdf($param)
  69. {
  70. $code = $param['code'];
  71. $url = $param['url'];
  72. $pdf = '/'.$code.'.pdf';
  73. $path = 'uploads/agreement_pdf/'.date('Ymd');
  74. if(!file_exists($path)){
  75. mkdir ($path,0777,true);
  76. }
  77. $shell_ = 'wkhtmltopdf --page-height 297mm '.$url.' '.$path.$pdf;
  78. $output = [];
  79. $return_var = 0;
  80. \exec($shell_ . ' > /dev/null 2>&1', $output, $return_var);
  81. if ($return_var === 0) {
  82. $agreement = MasterWorkerAgree::where('code', $code)->findOrEmpty();
  83. $agreement->pdf_url = $path.$pdf;
  84. $agreement->save();
  85. } else {
  86. // 处理错误
  87. echo "Command failed with return code: $return_var";
  88. }
  89. }
  90. public static function sendMq($code,$url):bool
  91. {
  92. $mq_config = Config::get('mq');
  93. // 连接到RabbitMQ服务
  94. $connection = new AMQPStreamConnection($mq_config['host'], $mq_config['port'], $mq_config['username'], $mq_config['password'], $mq_config['vhost']);
  95. $channel = $connection->channel();
  96. $channel->exchange_declare('add_agreement','topic',true,true,true,false,false);
  97. // 声明队列
  98. $queue = 'add_agreement';
  99. $channel->queue_declare($queue, true, true, false, true,false);
  100. // 发送消息
  101. $data = [
  102. 'code'=>$code,
  103. 'url'=>$url
  104. ];
  105. $messageBody = json_encode($data,JSON_UNESCAPED_UNICODE);
  106. $message = new AMQPMessage($messageBody);
  107. $channel->basic_publish($message, '', $queue);
  108. $channel->close();
  109. $connection->close();
  110. return true;
  111. }
  112. }