php 操作RabbitMQ

mac2022-06-30  70

本文摘抄自:https://www.cnblogs.com/alin-qu/p/8312874.html

php 操作RabbitMQ

 

基本流程图

如果exchange 没有绑定queue,则消息将会被丢弃如果创建exchange,queue,并且已经绑定了,则可以直接使用为了防止脚本出问题 可以配合supervisor

安装

从网站 https://packagist.org 搜索rabbitmq插件使用composer安装插件composer require php-amqplib/php-amqplib

使用

1.连接RabbitMQ服务器2.开始一个新的 channel3.新建一个exchange4.新建一个queue5.绑定queue和exchange6.发布一个消息7.建立一个消费者并注册一个回调函数8.监听数据

新建连接和channel

<?php require "./vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $host = "192.168.110.134"; $port = 5672; $user = "test"; $pass = "test"; $vhost = "/"; try{ $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost); }catch (Exception $e){ echo 'Caught exception: ', $e->getMessage(), "\n";die; } $channel = $connection->channel();

新建一个exchange

/* name: $exchange type: fanout passive: false // don't check is an exchange with the same name exists durable: false // the exchange won't survive server restarts auto_delete: true //the exchange will be deleted once the channel is closed. */ try{ $name = 'example_direct_exchange'; $type = "direct"; $passive = false; $durable = true; $auto_delete = true; $channel->exchange_declare($name, $type, $passive, $durable, $auto_delete); }catch (Exception $e){ echo 'Caught exception: ', $e->getMessage(), "\n";die; }

参数 name

exchange名称

参数 type

exchange类型 fanout 是广播类型的消息 会给所有绑定的queue发送数据

参数 passive

true 1.如果exchange已存在 则直接连接 并且不检查配置 比如已存在的exchange是fanout,新需要建立的是direct,也不会报错; 2.如果exchange不存在 则直接报错 false 1.如果exchange不存在 则创建新的exchange 2.如果exchange已存在 则判断配置是否相同。如果配置不相同 则直接报错。比如已存在的exchange是fanout,新需要建立的是direct,会报错。

参数 auto_delete

true 当最后一个消费者取消订阅之后 exchange会被自动删除 一般用于临时exchange

新建一个queue

/* name: $queue // should be unique in fanout exchange. passive: false // don't check if a queue with the same name exists durable: false // the queue will not survive server restarts exclusive: false // the queue might be accessed by other channels auto_delete: true //the queue will be deleted once the channel is closed. */ $queue1 = 'example_direct_queue_1'; $channel->queue_declare($queue1, false, true, false, false);

将queue和exchange绑定起来

$queue1 = 'example_direct_queue_1'; $exchange_name = 'example_direct_exchange'; $channel->queue_bind($queue1, $exchange_name);

发布一个消息

$exchange_name = 'example_direct_exchange'; $messageBody = array( 'example_direct_value'=>date('Y-m-d H:i:s'), ); $message = new AMQPMessage(json_encode($messageBody)); $channel->basic_publish($message, $exchange_name);

建立一个消费者并注册一个回调函数

/* queue: Queue from where to get the messages consumer_tag: Consumer identifier no_local: Don't receive messages published by this consumer. no_ack: Tells the server if the consumer will acknowledge the messages. exclusive: Request exclusive consumer access, meaning only this consumer can access the queue nowait: callback: A PHP Callback */ $consumerTag = 'consumer'; $queue = 'example_direct_queue_1'; $channel->basic_consume($queue, "", false, false, false, false,function($msg){ $message = json_decode($msg->body, true); file_put_contents("./mq.log", $message,FILE_APPEND); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); });

参数no_ack

true 消息只有在返回一个ack之后,才会被删除 false 消息被取出之后 会被立即删除

监听数据

try { while (count($channel->callbacks)) { $channel->wait(); } } catch(\PhpAmqpLib\Exception\AMQPTimeoutException $e){ $channel->close(); $channel->close(); }

 

转载于:https://www.cnblogs.com/myworld2018/p/10655720.html

相关资源:JAVA上百实例源码以及开源项目
最新回复(0)