文章内容
2017/7/20 9:05:46,作 者: 黄兵
RabbitMQ Ⅱ
生产者示例代码:
// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);
消费者示例代码:
//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");
5. RPC
RPC——Remote Procedure Call,远程过程调用。
那RabbitMQ如何进行远程调用呢?示意图如下:
第一步,主要是进行远程调用的客户端需要指定接收远程回调的队列,并申明消费者监听此队列。
第二步,远程调用的服务端除了要申明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。
远程调用客户端:
//申明唯一guid用来标识此次发送的远程调用请求
var correlationId = Guid.NewGuid().ToString();
//申明需要监听的回调队列
var replyQueue = channel.QueueDeclare().QueueName;
var properties = channel.CreateBasicProperties();
properties.ReplyTo = replyQueue;//指定回调队列
properties.CorrelationId = correlationId;//指定消息唯一标识
string number = args.Length > 0 ? args[0] : "30";
var body = Encoding.UTF8.GetBytes(number);
//发布消息
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
Console.WriteLine($"[*] Request fib({number})");
// //创建消费者用于处理消息回调(远程调用返回结果)
var callbackConsumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
callbackConsumer.Received += (model, ea) =>
{
//仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
if (ea.BasicProperties.CorrelationId == correlationId)
{
var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
Console.WriteLine($"[x]: {responseMsg}");
}
};
远程调用服务端:
//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
int n = int.Parse(message);
Console.WriteLine($"Receive request of Fib({n})");
int result = Fib(n);
//从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
var properties = ea.BasicProperties;
var replyProerties = channel.CreateBasicProperties();
replyProerties.CorrelationId = properties.CorrelationId;
//将远程调用结果发送到客户端监听的队列上
channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
//手动发回消息确认
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
6. 总结
基于上面的demo和对几种不同exchange路由机制的学习,我们发现RabbitMQ主要是涉及到以下几个核心概念:
- Publisher:生产者,消息的发送方。
- Connection:网络连接。
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。
- Exchange:交换器(路由器),负责消息的路由到相应队列。
- Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。
- Queue:队列,消息的缓冲存储区。
- Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。
- Broker:消息队列的服务器实体。
- Consumer:消费者,消息的接收方。
这次作为入门就讲到这里,下次我们来讲解下EventBus + RabbitMQ如何实现事件的分发。
本文转载自:『圣杰』 - RabbitMQ知多少
评论列表