# rocketmq-example
**Repository Path**: hhjiesen/rocketmq-example
## Basic Information
- **Project Name**: rocketmq-example
- **Description**: RocketMQ部署及spring boot集成示例
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 3
- **Forks**: 3
- **Created**: 2020-12-14
- **Last Updated**: 2025-07-15
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Apache RocketMQ 部署/开发指南
* [官方开发者指南](https://github.com/apache/rocketmq/tree/master/docs/cn)
* [官方docker部署指南](https://github.com/apache/rocketmq-docker)
# 本示例部署笔记
[RokectMQ for docker部署](./RokectmqForDocker.md)
# spring boot 集成 RocketMQ
## 1. 添加maven依赖
```xml
com.alibaba.cloud
spring-cloud-starter-stream-rocketmq
2.2.3.RELEASE
```
## 2. 添加配置
```properties
# rocketmq nameserver地址
rocketmq.name-server: 192.168.1.71:9876;192.168.1.76:9876;192.168.1.77:9876
# 生产组名称
rocketmq.producer.group: ${spring.application.name}
```
## 3. 生产者-RocketMQTemplate示例
[RocketMQTemplateController](src/main/java/com/ofwiki/dm/rocketmq/controller/RocketMQTemplateController.java)
```java
@RestController
@RequestMapping("rocketmq_test")
public class RocketMQTemplateController {
private final Logger logger = LoggerFactory.getLogger(RocketMQTemplateController.class);
@Autowired
private RocketMQTemplate rocketMQTemplate;
//
// 消息生产者,消息消费者,生产者组,消费者组,顺序消息,主题,标签,Broker Server,Name Server等基本概念
// 参见:https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md
//
//
//
// @ApiOperation("1.同步发送消息,可以立马获取到结果")
@GetMapping("sync_send")
public Result syncSend(String msg,String topic,String tags) {
msg += DateUtils.format(LocalDateTime.now());
Message message = MessageBuilder
// 消息内容,泛型
.withPayload(msg)
// key:可通过key查询消息轨迹,如消息被谁消费,定位消息丢失问题。由于是哈希索引,须保证key尽可能唯一
.setHeader(MessageConst.PROPERTY_KEYS, UUID.randomUUID().toString())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
// topic :主题,消息必须发送到topic
// tags : 标签,可以根据不同业务目的在同一主题下设置不同标签,消费者可以根据Tag实现对不同的不同消费逻辑
// tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个
// destination: 的格式为topicName:tagName
String destination = topic + ":" + tags;
SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
logger.info("同步发送消息完成,发送结果:{}", sendResult);
return Result.success(sendResult);
}
// @ApiOperation("2.异步发送消息,发送结果异步返回")
@GetMapping("async_send")
public Result asyncSend(String msg,String topic,String tags) {
msg += DateUtils.format(LocalDateTime.now());
Message message = MessageBuilder
// 消息内容,泛型
.withPayload(msg)
// key:可通过key查询消息轨迹,如消息被谁消费,定位消息丢失问题。由于是哈希索引,须保证key尽可能唯一
.setHeader(MessageConst.PROPERTY_KEYS, UUID.randomUUID().toString())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
// topic :主题,消息必须发送到topic
// tags : 标签,可以根据不同业务目的在同一主题下设置不同标签,消费者可以根据Tag实现对不同的不同消费逻辑
// tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个
// destination: 的格式为topicName:tagName
String destination = topic + ":" + tags;
rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
logger.info("异步发送消息成功,result:" + sendResult);
}
@Override
public void onException(Throwable e) {
// 做一些数据补偿或其它处理
logger.error("异步发送消息失败,msg:"+e.getMessage(), e);
}
});
return Result.success();
}
// @ApiOperation("3.异步发送消息,没有返回结果,不能确保消息是否发送成功,性能最好")
@GetMapping("send_one_way")
public Result sendOneway(String msg,String topic,String tags) {
msg += DateUtils.format(LocalDateTime.now());
Message message = MessageBuilder
// 消息内容,泛型
.withPayload(msg)
// key:可通过key查询消息轨迹,如消息被谁消费,定位消息丢失问题。由于是哈希索引,须保证key尽可能唯一
.setHeader(MessageConst.PROPERTY_KEYS, UUID.randomUUID().toString())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
// topic :主题,消息必须发送到topic
// tags : 标签,可以根据不同业务目的在同一主题下设置不同标签,消费者可以根据Tag实现对不同的不同消费逻辑
// tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个
// destination: 的格式为topicName:tagName
String destination = topic + ":" + tags;
logger.info("异步发送消息,destination:{},message:{}", destination, message);
rocketMQTemplate.sendOneWay(destination, message);
return Result.success();
}
// @ApiOperation("4.发送顺序消息")
@GetMapping("send_send_orderly")
public Result syncSendOrderly(String msg,String topic,String tags,String hashKey) {
msg += DateUtils.format(LocalDateTime.now());
Message message = MessageBuilder
// 消息内容,泛型
.withPayload(msg)
// key:可通过key查询消息轨迹,如消息被谁消费,定位消息丢失问题。由于是哈希索引,须保证key尽可能唯一
.setHeader(MessageConst.PROPERTY_KEYS, UUID.randomUUID().toString())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
// topic :主题,消息必须发送到topic
// tags : 标签,可以根据不同业务目的在同一主题下设置不同标签,消费者可以根据Tag实现对不同的不同消费逻辑
// tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个
// destination: 的格式为topicName:tagName
String destination = topic + ":" + tags;
// hashKey: 根据此hash键分配消息到队列,orderId, productId ...
// 1.mysql binlog同步依赖严格顺序执行sql
// 2.订单产生了3条消息,分别是订单创建、付款、完成。给用户发送订单状态提醒就得严格按照这个顺序进行消费。避免状态颠倒混乱
// 根据hashKey将同一组消息分配到相同的队列,然后消费端顺序消费队列就能保存消息的消费顺序
// 消费端消费模式也必需是顺序模式 ConsumeMode.ORDERLY
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, hashKey);
logger.info("发送顺序消息,发送结果:{}", sendResult);
return Result.success(sendResult);
}
// @ApiOperation("5.发送延时消息")
@GetMapping("send_delay_time")
public Result syncSendDelayTime(String msg,String topic,String tags) {
msg += DateUtils.format(LocalDateTime.now());
Message message = MessageBuilder
// 消息内容,泛型
.withPayload(msg)
// key:可通过key查询消息轨迹,如消息被谁消费,定位消息丢失问题。由于是哈希索引,须保证key尽可能唯一
.setHeader(MessageConst.PROPERTY_KEYS, UUID.randomUUID().toString())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
// topic :主题,消息必须发送到topic
// tags : 标签,可以根据不同业务目的在同一主题下设置不同标签,消费者可以根据Tag实现对不同的不同消费逻辑
// tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个
// destination: 的格式为topicName:tagName
String destination = topic + ":" + tags;
// 场景:比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
// 延时消息的使用限制:
// 1. 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关
// 2. 默认18个等级对应的时长:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SendResult sendResult = rocketMQTemplate.syncSend(
destination, message,
// 发送超时毫秒数
2000,
//设置延时等级3,这个消息将在10s之后发送
3
);
logger.info("发送延时消息,发送结果:{}", sendResult);
return Result.success(sendResult);
}
// @ApiOperation("6.发送事务消息")
@GetMapping("send_batch")
public Result syncBatch(String msg,String topic,String tags) {
msg += DateUtils.format(LocalDateTime.now());
Message message = MessageBuilder
// 消息内容,泛型
.withPayload(msg)
// key:可通过key查询消息轨迹,如消息被谁消费,定位消息丢失问题。由于是哈希索引,须保证key尽可能唯一
.setHeader(MessageConst.PROPERTY_KEYS, UUID.randomUUID().toString())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
.setHeader("transId",UUID.randomUUID().toString())
.build();
// topic :主题,消息必须发送到topic
// tags : 标签,可以根据不同业务目的在同一主题下设置不同标签,消费者可以根据Tag实现对不同的不同消费逻辑
// tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个
// destination: 的格式为topicName:tagName
String destination = topic + ":" + tags;
// 同步得到本地事务执行结果
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("tx_producer_group",destination, message, msg);
logger.info("2.发送事务消息,发送结果:{}", sendResult);
return Result.success(sendResult);
}
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public static class SyncProducerListener implements RocketMQLocalTransactionListener {
private final Logger logger = LoggerFactory.getLogger(SyncProducerListener.class);
private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object data) {
String key = String.valueOf(message.getHeaders().get("transId"));
localTrans.put(key, RocketMQLocalTransactionState.UNKNOWN);
try {
// 执行本地事务,如:userService.save(data),这个方法是同步执行的
// 模拟本地事务耗时5s
Thread.sleep(5000);
if (new Random().nextInt() % 2 == 0) {
throw new Exception("本地事务异常");
}
logger.info("1.【本地业务执行完毕】 msg:{}, Object:{}", message, data);
localTrans.put(key, RocketMQLocalTransactionState.COMMIT);
} catch (Exception e) {
e.printStackTrace();
if (new Random().nextInt() % 2 == 0) {
throw new RuntimeException("没有办法确认本地事务状态,message key:" + key);
}
logger.error("1.【执行本地业务异常】 exception message:{}", e.getMessage());
localTrans.put(key, RocketMQLocalTransactionState.ROLLBACK);
}
return localTrans.get(message.getHeaders().getId());
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String key = String.valueOf(message.getHeaders().get("transId"));
RocketMQLocalTransactionState state = localTrans.get(key);
logger.info("3.【执行检查任务】:message key{},trans stats:{},",key,state);
if (state != null) {
return state;
}
logger.info("4.【执行检查任务】状态为空,默认提交事务,message key:{}",key);
return RocketMQLocalTransactionState.COMMIT;
}
}
}
```
## 消费者-`@RocketMQMessageListener`示例
[@RocketMQMessageListener示例](src/main/java/com/ofwiki/dm/rocketmq/consumer/OrderSuccessListener.java)
```java
@Component
@RocketMQMessageListener(
// 同消费组的,主题和标签必须一致,否则新启动的实例会覆盖原有实例,可能会出消息丢失的情况
consumerGroup = "orderSuccessGroup",
// 消费主题
topic = "shop_order_topic",
// 消费标签,可以不指定或指定一到多个,多个用 || 连接
selectorExpression = "order.status.success || order.status.complete",
// 消费模式:
// 1.顺序消费:一个线程消费一个队列,从而保证队列的消费有顺序的
// 2.并行消费:多个线程消费相同的队列,没有顺序保证,但消费消费快 - 默认
consumeMode = ConsumeMode.ORDERLY
// 最大线程数
// ,consumeThreadMax = 64
// 消息模式:
// 1.集群消费: 相同Consumer Group的每个Consumer实例平均分摊消息。
// 2.广播消费: 相同Consumer Group的每个Consumer实例都接收全量的消息。
// ,messageModel = MessageModel.CLUSTERING
)
public class OrderSuccessListener implements RocketMQListener {
private Logger logger = LoggerFactory.getLogger(OrderSuccessListener.class);
@Override
public void onMessage(String message) {
logger.info("订单已success:" + message);
}
}
```