之前我们都是单向发送消息,客户端发送消息给服务端,那么问题来了,我现在发一个命令给远程客户端,让它去执行,执行之后的结果,我想把这个结果再返回。这个模型叫什么呐,这种模型叫RPC=>remote procedure call。
怎么返回这个消息呢?
答:就server 端和客户端既是消费者,又是生产者。
想要深入了解RabbitMQ:猛击这里
二、模型图 三、逻辑代码
注:
我是想不阻塞,而是想每过一段时间,就过来检查一下,就不能用start_consumer,而是用connection.process_data_evevts(),它是不阻塞的,如果收到消息就收到,收不到消息也返回,就继续往下执行。reply_to就是想让服务器执行完命令之后,把结果返回到这个queue里面。在while self.respose is None中的代码我可以不做time.sleep,我这边可以发消息给服务器端,这个消息不一定按顺序发给服务器端,如果不做self.corr_id == props.correlation_id的验证,那数据就可能对不上了。import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="rpc_queue")# 声明RPC请求队列 def fib(n): # 数据处理方法 "斐波那契数列" if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1)+fib(n-2) # 对RPC请求队列中的请求进行处理 def on_request(ch,method,props,body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) # 调用数据处理方法 # 将处理结果(响应)发送到回调队列 ch.basic_publish(exchange="", routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=\ props.correlation_id),#props的是客户端的发过来的信息,这边把correlation_id返回给客户端做验证 body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) # 负载均衡,同一时刻发送给该服务器的请求不超过一个 channel.basic_qos(prefetch_count=1) # 服务器订阅RPC请求队列,当队列中有请求时,将调用`on_request`方法处理请求 channel.basic_consume(on_request,queue="rpc_queue") print(" [x] Awaiting RPC requests") channel.start_consuming()
注:props.reply_to,这个就是客户端返回过来的queue。
问:如果客户端和服务用的是同一个queue,会有什么影响?
答:如果客户端也发到rpc_queue中,那么客户端就会收到自己的消息,就会形成一个死循坏,把自己给玩死了。
转载于:https://www.cnblogs.com/xiangjun555/articles/7921022.html
相关资源:JAVA上百实例源码以及开源项目