消息发送方(生产者)如何知道消息是否真正地到达了 RabbitMQ(服务器) 生产者将信道设置成 confmn 确认)模式,一旦信道进入 confmn 模式,所有在该信道上面发布的消息都会被指派个唯一的 ID(1开启),一旦消息被投递到所有匹配的队列之后,RabbitMQ 会发送一个确认 CBasic.Ack) 给生产者(包含消息的唯一 ID) ,这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。
实现步骤
生产者将信道设置成confirm (确认)模式所有在开启确认模式信道上面发布的消息都会被指派一个唯一的ID(从l开始递增)一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认CBasic.Ack) 给生产者(包含消息的唯一ID),如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后才发出。将信道置为 publisher confirm 模式
channel.confirmSelect();判断是否发送成功
if (!channel.waitForConfirms()) { System.out.println( "send message failed" ) ; }else { //do something }代码
public class Send { //队列名 private final static String QUEUE_NAME = "queue2"; //路由器名 private final static String EXCHANGE_NAME = "exchange"; //绑定键 private final static String BINDING_KEY = "exchange"; //路由键 private final static String ROUTING_KEY = "exchange"; private static Connection connection =null; private static Channel channel = null; public static void main(String[] args) { try{ // 获取到连接以及mq通道 connection = ConnectionUtil.getConnection(); // 从连接中创建通道 channel = connection.createChannel(); //声明了一个direct 类型的交换器 channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null); // 声明优先级队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //将路由与队列绑定,再为绑定的路径赋值一个绑定键 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,BINDING_KEY); // 将信道置为 publisher confirm 模式 channel.confirmSelect(); //发送数据 for (int i=0;i<100;i++){ String message = "Hello World!"+i; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); if (!channel.waitForConfirms()) { System.out.println( "send message failed" ) ; } } }catch (Exception e){ e.printStackTrace(); }finally { //关闭通道和连接 try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }