Springboot整合RocketMq
环境配置和说明
这部分我们看下SpringBoot如何快速集成RocketMQ。
在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.2.2后,基本就用不了了。
maven的配置
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
yaml配置信息
# setting rocketmq info
rocketmq:
name-server: 192.168.230.133:9876
producer:
group: springBootGroup
配合Controller的生产者
@RestController
@RequestMapping("/MQTest")
public class RocketMqController{
private final String topic = "TestTopic";
@Resource
private RocketMQTemplate rocketMqTemplate;
/**
* 发送普通消息
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String sendMessage(String message){
this.rocketMqTemplate.convertAndSend(topic,message);
return "消息发送完成";
}
/**
* 发送事务消息
* @param message
* @return
* @throws InterruptedException
*/
@GetMapping("/sendTransactionMessage")
public String sendTransactionMessage(String message) throws InterruptedException {
this.sendMessageInTransactionimpl(topic,message);
return "消息发送完成";
}
public void sendMessageInTransactionimpl(String topic,String msg) throws InterruptedException {
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
//尝试在Header中加入一些自定义的属性。
Message<String> message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID,"TransID_"+i)
//发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。
.setHeader(RocketMQHeaders.TAGS,tags[i % tags.length])
//MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。
.setHeader("MyProp","MyProp_"+i)
.build();
String destination =topic+":"+tags[i % tags.length];
//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
SendResult sendResult = rocketMqTemplate.sendMessageInTransaction(destination, message,destination);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
}}
消费者监听器
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : "+ message);
}
}
@RocketMQMessageListener
里面的属性随着版本的更新而变化,里面的key会经常发送改变。同时它是消费者的核心,所有的功能可以在此配置注解,例如:消息过滤可以由里面的selectorType属性和selectorExpression来定制
/**
* 控制消费模式,可选择并发或有序接收消息。
* 消息有序消费还是并发消费则由consumeMode属性定制。
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* 控制消息模式,如果希望所有订阅者都接收消息,广播是一个不错的选择。
* 消费者是集群部署还是广播部署由messageModel属性定制。
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
事务消息监听器
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId,msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = "+msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
String tags = message.getTags();
if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
//延迟检查的时间间隔
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = "+originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
if(StringUtils.contains(tags,"TagC")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagD")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
注意事项
1.message包的改变
在使用rabbitmq的原生api的时候 我们创建的message 导入的包是rocketmq包自己的message
import org.apache.rocketmq.common.message.Message;
但是集合了springboot之后的我们可以发现 massage 已经变成了springboot的自己的message了
this.rocketMqTemplate.convertAndSend(topic,message);
ctrl+click
convertAndSend and ctrl+click
convertAndSend
Message<?> message = doConvert(payload, headers, postProcessor);
package org.springframework.messaging;
2.tag的"消失"
在集成的springboot的rocketmq中 tag 被放到了Header
里面,同时把tag和topic放到了一起,以:
的形式拼接
String destination =topic+":"+tags[i % tags.length];
SpringCloudStrem整合RocketMQ
版本迭代慢,文档少,可自行查看
评论区