基于RabbitMQ实现RPC
编辑前言
这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。 代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq
什么是RPC
RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。 我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。
RabbitMQ如何实现RPC
官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:
// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。
- contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
- replyTo 这个就是上面指定的回调队列。
- correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。
首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程: 实现RPC的具体工作流程:
- 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
- 这个消息是发送到指定的rpc_queue这个队列上面。
- 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
- 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。
RPC简单示例
我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下: 首先服务端:
/**
* RPC服务端
*
* @author yuanzhihao
* @since 2020/11/21
*/
public class RPCServer {
public static void main(String[] args) throws IOException, TimeoutException {
// 首先还是正常获得connection以及channel对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.108");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 定义一个rpc的队列
String queueName = "test_rpc";
channel.queueDeclare(queueName, false, false, false, null);
Object monitor = new Object();
// 具体的消费代码里面实现
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
// 提供一个大小写转换的方法
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("toUpperCase(" + message + ")");
response = toUpperCase(message);
} catch (RuntimeException e) {
System.out.println(e.toString());
} finally {
// 将响应传回replyTo队列
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
// 设置了手动应答 需要手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 执行完成会释放主线程的锁
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
// 监听"test_rpc"队列
channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));
// 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 提供一个大小写转换的方法
private static String toUpperCase(String msg) {
return msg.toUpperCase();
}
}
客户端:
/**
* RPC客户端
*
* @author yuanzhihao
* @since 2020/11/21
*/
public class RPCClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创建connection以及channel对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.108");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
try ( Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个队列
String queueName = "test_rpc";
// 请求消息中需要带一个唯一标识ID
String corrId = UUID.randomUUID().toString();
// 声明一个回调队列
String replayQueueName = channel.queueDeclare().getQueue();
// 将correlationId以及回调队列设置在消息的属性中
AMQP.BasicProperties properties = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replayQueueName)
.build();
// 具体消息内容
String msg = "hello rpc";
// 发送请求消息
channel.basicPublish("",queueName,properties,msg.getBytes());
// 设置一个阻塞队列 等待服务端的响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {
// 注意 这边根据correlationId进行下判断
if (message.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(message.getBody(), StandardCharsets.UTF_8));
}
}, consumerTag -> {});
// 获取响应结果
String take = response.take();
System.out.println("rpc result is "+ take);
channel.basicCancel(ctag);
}
}
}
执行代码,具体的客户端与服务端运行结果
通过Spring AMQP实现RPC
通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。 首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~
/**
* 主配置类
*
* @author yuanzhihao
* @since 2021/1/9
*/
@Configuration
public class RabbitMQConfig {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
// 注入connectionFactory对象
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.1.108:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
// 声明队列
@Bean
public Queue rpcQueue() {
return new Queue("test_rpc",false);
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
// 创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
// 消息监听器
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
// 监听的队列
container.setQueues(rpcQueue());
MessageListener messageListener = message -> {
String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("Receive a message message is {}", receiveMsg);
// 执行对应逻辑
String responseMsg = toUpperCase(receiveMsg);
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().
setCorrelationId(message.getMessageProperties().getCorrelationId()).
build();
// 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面
rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));
};
// 设置监听器
container.setMessageListener(messageListener);
return container;
}
// 提供一个大小写转换的方法
private String toUpperCase(String msg) {
return msg.toUpperCase();
}
}
客户端我采用test单元测试的形式
/**
* spring amqp rpc 测试类
*
* @author yuanzhihao
* @since 2021/1/9
*/
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private RabbitTemplate rabbitTemplate;
// 测试RPC客户端
@Test
public void testRpcClient() {
// 设置correlationId
String corrId = UUID.randomUUID().toString();
String msg = "hello rpc";
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();
// 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列
// 格式默认 "amq.rabbitmq.reply-to" 前缀
Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));
log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
具体实现可以看下代码的注释,注意这边我的上一篇博客写的基于RabbitMQ实现的RPC框架就是主要使用的Spring AMQP提供的这些方法来实现的。贴一下地址,帮忙点下Star呀~:https://github.com/yzh19961031/rabbitmq_rpc 代码执行结果: