本文摘抄自:https://www.cnblogs.com/alin-qu/p/8312874.html
php 操作RabbitMQ
基本流程图
如果exchange 没有绑定queue,则消息将会被丢弃如果创建exchange,queue,并且已经绑定了,则可以直接使用为了防止脚本出问题 可以配合supervisor
安装
从网站 https://packagist.org 搜索rabbitmq插件使用composer安装插件composer require php-amqplib/php-amqplib
使用
1.连接RabbitMQ服务器2.开始一个新的 channel3.新建一个exchange4.新建一个queue5.绑定queue和exchange6.发布一个消息7.建立一个消费者并注册一个回调函数8.监听数据
新建连接和channel
<?
php
require "./vendor/autoload.php"
;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$host = "192.168.110.134"
;
$port = 5672
;
$user = "test"
;
$pass = "test"
;
$vhost = "/"
;
try{
$connection =
new AMQPStreamConnection(
$host,
$port,
$user,
$pass,
$vhost);
}catch (
Exception $e){
echo 'Caught exception: ',
$e->getMessage(), "\n";
die;
}
$channel =
$connection->channel();
新建一个exchange
/*
name: $exchange
type: fanout
passive: false // don't check is an exchange with the same name exists
durable: false // the exchange won't survive server restarts
auto_delete: true //the exchange will be deleted once the channel is closed.
*/
try{
$name = 'example_direct_exchange'
;
$type = "direct"
;
$passive =
false;
$durable =
true;
$auto_delete =
true;
$channel->exchange_declare(
$name,
$type,
$passive,
$durable,
$auto_delete);
}catch (
Exception $e){
echo 'Caught exception: ',
$e->getMessage(), "\n";
die;
}
参数 name
exchange名称
参数 type
exchange类型
fanout 是广播类型的消息 会给所有绑定的queue发送数据
参数 passive
true
1.
如果exchange已存在 则直接连接 并且不检查配置 比如已存在的exchange是fanout,新需要建立的是direct,也不会报错;
2.
如果exchange不存在 则直接报错
false
1.
如果exchange不存在 则创建新的exchange
2.如果exchange已存在 则判断配置是否相同。如果配置不相同 则直接报错。比如已存在的exchange是fanout,新需要建立的是direct,会报错。
参数 auto_delete
true
当最后一个消费者取消订阅之后 exchange会被自动删除 一般用于临时exchange
新建一个queue
/*
name: $queue // should be unique in fanout exchange.
passive: false // don't check if a queue with the same name exists
durable: false // the queue will not survive server restarts
exclusive: false // the queue might be accessed by other channels
auto_delete: true //the queue will be deleted once the channel is closed.
*/
$queue1 = 'example_direct_queue_1'
;
$channel->queue_declare(
$queue1,
false,
true,
false,
false);
将queue和exchange绑定起来
$queue1 = 'example_direct_queue_1'
;
$exchange_name = 'example_direct_exchange'
;
$channel->queue_bind(
$queue1,
$exchange_name);
发布一个消息
$exchange_name = 'example_direct_exchange'
;
$messageBody =
array(
'example_direct_value'=>
date('Y-m-d H:i:s'),
);
$message =
new AMQPMessage(json_encode(
$messageBody));
$channel->basic_publish(
$message,
$exchange_name);
建立一个消费者并注册一个回调函数
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$consumerTag = 'consumer'
;
$queue = 'example_direct_queue_1'
;
$channel->basic_consume(
$queue, "",
false,
false,
false,
false,
function(
$msg){
$message = json_decode(
$msg->body,
true);
file_put_contents("./mq.log",
$message,
FILE_APPEND);
$msg->delivery_info['channel']->basic_ack(
$msg->delivery_info['delivery_tag'
]);
});
参数no_ack
true
消息只有在返回一个ack之后,才会被删除
false
消息被取出之后 会被立即删除
监听数据
try {
while (
count(
$channel->
callbacks)) {
$channel->
wait();
}
}
catch(\PhpAmqpLib\
Exception\AMQPTimeoutException
$e){
$channel->
close();
$channel->
close();
}
转载于:https://www.cnblogs.com/myworld2018/p/10655720.html
相关资源:JAVA上百实例源码以及开源项目