using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
namespace SampleStack.RabbitMQ.Producer
{
class Program
{
static void Main(
string[] args)
{
for (
var i =
0; i <
1000; i++
)
{
using (
var connection =
new ConnectionFactory() { HostName =
"localhost" }.CreateConnection())
using (
var channel =
connection.CreateModel())
{
channel.QueueDeclare(queue: "work_queue", durable:
true, exclusive:
false, autoDelete:
false, arguments:
null);
// durable: true -> 队列持久性
var properties =
channel.CreateBasicProperties();
properties.Persistent =
true;
// Persistent = true -> 消息持久性
var message =
i.ToString();
Console.WriteLine(message);
channel.BasicPublish(exchange: "", routingKey:
"work_queue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));
}
Thread.Sleep(1000);
}
}
}
}
// Install-Package RabbitMQ.Client
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace SampleStack.RabbitMQ.Consumer
{
class Program
{
static void Main(
string[] args)
{
using (
var connection =
new ConnectionFactory() { HostName =
"localhost" }.CreateConnection())
using (
var channel =
connection.CreateModel())
{
channel.QueueDeclare(queue: "work_queue", durable:
true, exclusive:
false, autoDelete:
false, arguments:
null);
// durable: true -> 队列持久性
channel.BasicQos(prefetchSize: 0, prefetchCount:
1,
global:
false);
// 告知RabbitMQ,在未收到当前Worker消息确认信号前,不再分发给消息,确保公平调度
var consumer =
new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var message =
Encoding.UTF8.GetString(e.Body);
Console.WriteLine(message);
Thread.Sleep(1000);
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
// 手动发送消息确认信号
};
channel.BasicConsume(queue: "work_queue", autoAck:
false, consumer: consumer);
// autoAck: false
Console.ReadKey();
}
}
}
}
// Install-Package RabbitMQ.Client
转载于:https://www.cnblogs.com/xiaowangzhi/p/11450799.html
相关资源:JAVA上百实例源码以及开源项目