# requestmerge
**Repository Path**: ckwangPrivate/requestmerge
## Basic Information
- **Project Name**: requestmerge
- **Description**: 一个高性能的Spring Boot Starter,支持HTTP和Dubbo请求合并处理,通过AOP切面和自定义注解实现,集成了多种合并算法,有效减少数据库查询和远程调用次数,提升系统性能
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 8
- **Forks**: 0
- **Created**: 2025-11-08
- **Last Updated**: 2025-11-10
## Categories & Tags
**Categories**: spring-boot-ext
**Tags**: Java, SpringBoot
## README
# Request Merge Spring Boot Starter
一个高性能的Spring Boot Starter,支持HTTP和Dubbo请求合并处理,通过AOP切面和自定义注解实现,集成了多种合并算法,有效减少数据库查询和远程调用次数,提升系统性能。
无请求合并的示意图

请求合并的示意图

## 功能特性
- 支持HTTP和Dubbo请求合并处理
- 基于AOP切面实现,使用简单
- 多种合并算法:滑动窗口、漏斗、立即合并
- 支持动态分组和SpEL表达式
- 可配置的超时和异常处理
- 易于集成到现有Spring Boot项目
- 丰富的使用场景示例
- 完整的测试套件,使用JUnit 5确保功能稳定性
- 支持复杂参数分组和条件合并
## 项目结构
项目采用模块化结构设计,包含以下主要模块:
### 1. request-merge-parent
- 父级POM,定义项目的基本配置和依赖管理
- 管理所有子模块的版本和通用配置
- 集成最新的测试框架支持(JUnit 5)
### 2. request-merge-core
- 核心功能模块,包含所有算法实现和核心组件
- 包含注解、AOP切面、合并算法、配置类等基础功能
- 提供扩展接口,支持自定义批量处理器实现
### 3. request-merge-demo
- 演示模块,展示如何使用框架
- 包含HTTP和Dubbo服务示例
- 完整的测试套件,使用JUnit 5确保功能稳定性
## 快速开始
### 1. 添加依赖
在Maven项目中添加依赖:
```xml
com.wangbin.requestmerge
request-merge-spring-boot-starter
1.0.1
```
### 2. 配置属性
在`application.properties`或`application.yml`中配置相关属性:
```yaml
request:
merge:
enabled: true
default-window-size: 100
default-time-unit: MILLISECONDS
default-max-merge-count: 100
default-algorithm-type: SLIDING_WINDOW
default-timeout: 500
default-timeout-unit: MILLISECONDS
support-http: true
support-dubbo: true
enable-detail-log: false
```
### 3. 使用示例
#### HTTP请求合并 - 基本示例
```java
import com.wangbin.requestmerge.annotation.RequestMerge;
import com.wangbin.requestmerge.enums.MergeAlgorithmType;
@RestController
@RequestMapping("/api/users")
public class UserController {
@Autowired
private UserService userService;
@GetMapping("/{id}")
@RequestMerge(
algorithmType = MergeAlgorithmType.SLIDING_WINDOW,
windowSize = 50,
maxMergeCount = 50,
groupExpression = "#id",
timeout = 300
)
public User getUserById(@PathVariable Long id) {
return userService.getUserById(id);
}
}
```
#### HTTP请求合并 - 高级示例
```java
import com.wangbin.requestmerge.annotation.RequestMerge;
import com.wangbin.requestmerge.enums.MergeAlgorithmType;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/advanced/users")
public class AdvancedRequestMergeController {
@Autowired
private UserService userService;
/**
* 滑动窗口合并 - 适合高频调用场景
*/
@GetMapping("/sliding/{id}")
@RequestMerge(
algorithmType = MergeAlgorithmType.SLIDING_WINDOW,
windowSize = 100,
maxMergeCount = 30,
groupExpression = "#id",
timeout = 500
)
public User getUserWithSlidingWindow(@PathVariable Long id) {
return userService.getUserById(id);
}
/**
* 漏斗算法合并 - 适合控制请求速率场景
*/
@GetMapping("/leaky/{id}")
@RequestMerge(
algorithmType = MergeAlgorithmType.LEAKY_BUCKET,
windowSize = 150,
maxMergeCount = 50,
groupKey = "leakyBucketGroup"
)
public User getUserWithLeakyBucket(@PathVariable Long id) {
return userService.getUserById(id);
}
/**
* 即时合并 - 适合高频低延迟场景
*/
@GetMapping("/immediate/{id}")
@RequestMerge(
algorithmType = MergeAlgorithmType.IMMEDIATE_MERGE,
maxMergeCount = 100,
groupExpression = "#id"
)
public User getUserWithImmediateMerge(@PathVariable Long id) {
return userService.getUserById(id);
}
/**
* 多参数分组 - 复杂业务场景示例
*/
@GetMapping("/multi-param/{id}")
@RequestMerge(
algorithmType = MergeAlgorithmType.SLIDING_WINDOW,
windowSize = 80,
groupExpression = "#id + '-' + #region",
maxMergeCount = 40
)
public User getUserWithRegion(@PathVariable Long id, @RequestParam String region) {
return userService.getUserByIdAndRegion(id, region);
}
}
#### Dubbo服务请求合并
在Dubbo服务中,需要在接口或实现类上添加`@RequestMerge`注解,并实现批量处理方法:
**1. 定义Dubbo服务接口** - 多种合并场景
```java
import com.wangbin.requestmerge.annotation.RequestMerge;
import com.wangbin.requestmerge.enums.MergeAlgorithmType;
import java.util.*;
import java.util.concurrent.TimeUnit;
public interface DubboUserService {
/**
* 基本请求合并 - 滑动窗口算法
*/
@RequestMerge(
windowSize = 100,
timeUnit = TimeUnit.MILLISECONDS,
algorithmType = MergeAlgorithmType.SLIDING_WINDOW,
groupKey = "dubboUserService"
)
String getUserName(Long userId);
/**
* 批量获取用户名,用于请求合并的批量处理方法
*/
Map batchGetUserNames(List userIds);
/**
* 多参数请求合并 - 支持按参数类型分组
*/
@RequestMerge(
algorithmType = MergeAlgorithmType.SLIDING_WINDOW,
windowSize = 80,
maxMergeCount = 40,
groupExpression = "#infoType",
timeout = 200
)
String getUserInfo(Long userId, String infoType);
/**
* 多参数批量处理方法
*/
Map> batchGetUserInfos(List userIds, List infoTypes);
/**
* 即时合并 - 适用于高频低延迟场景
*/
@RequestMerge(
algorithmType = MergeAlgorithmType.IMMEDIATE_MERGE,
maxMergeCount = 100
)
String getUserDetails(Long userId);
/**
* 即时合并对应的批量方法
*/
Map batchGetUserDetails(List userIds);
/**
* 漏斗算法合并 - 适用于需要控制处理速率的场景
*/
@RequestMerge(
algorithmType = MergeAlgorithmType.LEAKY_BUCKET,
windowSize = 150,
maxMergeCount = 50,
timeout = 1000
)
String getUserExtendedInfo(Long userId);
/**
* 漏斗算法对应的批量方法
*/
Map batchGetUserExtendedInfos(List userIds);
/**
* 复杂参数分组 - 多参数条件下的分组策略
*/
@RequestMerge(
algorithmType = MergeAlgorithmType.SLIDING_WINDOW,
windowSize = 120,
maxMergeCount = 30,
groupExpression = "#region + '-' + #priority"
)
String getUserByRegionAndPriority(Long userId, String region, Integer priority);
/**
* 复杂参数分组对应的批量方法
*/
Map batchGetUsersByRegionAndPriority(List userIds, String region, Integer priority);
/**
* 高频访问场景 - 小窗口和高合并数量
*/
@RequestMerge(
algorithmType = MergeAlgorithmType.SLIDING_WINDOW,
windowSize = 50,
maxMergeCount = 100,
timeout = 150
)
String getHighFreqUserInfo(Long userId);
/**
* 高频访问对应的批量方法
*/
Map batchGetHighFreqUserInfos(List userIds);
}
```
**2. 实现Dubbo服务**
```java
import org.apache.dubbo.config.annotation.DubboService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@DubboService(version = "1.0.0")
public class DubboUserServiceImpl implements DubboUserService {
private static final Map USER_MAP = new HashMap<>();
static {
// 模拟用户数据
USER_MAP.put(1L, "用户1");
USER_MAP.put(2L, "用户2");
USER_MAP.put(3L, "用户3");
}
@Override
public String getUserName(Long userId) {
// 模拟网络延迟
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return USER_MAP.getOrDefault(userId, "未知用户");
}
@Override
public Map batchGetUserNames(List userIds) {
// 模拟网络延迟
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Map resultMap = new HashMap<>();
for (Long userId : userIds) {
resultMap.put(userId, USER_MAP.getOrDefault(userId, "未知用户"));
}
return resultMap;
}
}
```
**3. Dubbo客户端调用**
```java
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@Component
public class DubboUserServiceClient {
@DubboReference(version = "1.0.0")
private DubboUserService dubboUserService;
/**
* 调用单个用户查询
*/
public String callSingleUser(Long userId) {
return dubboUserService.getUserName(userId);
}
/**
* 测试并发调用,验证请求合并效果
*/
public Map testConcurrentCalls(List userIds) {
List> futures = userIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> {
return dubboUserService.getUserName(id);
}))
.collect(Collectors.toList());
Map result = new HashMap<>();
for (int i = 0; i < userIds.size(); i++) {
try {
result.put(userIds.get(i), futures.get(i).get());
} catch (Exception e) {
result.put(userIds.get(i), "错误: " + e.getMessage());
}
}
return result;
}
}
```
### 4. 批量处理器实现
要实现真正的批量处理,需要提供批量处理函数。可以通过以下方式:
1. **全局批量处理器**:
```java
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class GlobalBatchProcessor {
@Autowired
private UserRepository userRepository;
public Map batchGetUsers(Map userIdMap) {
// 批量查询逻辑
List ids = new ArrayList<>(userIdMap.values());
List users = userRepository.findAllById(ids);
// 转换为map,使用原始请求ID作为key
Map result = new HashMap<>();
for (Map.Entry entry : userIdMap.entrySet()) {
for (User user : users) {
if (Objects.equals(user.getId(), entry.getValue())) {
result.put(entry.getKey(), user);
break;
}
}
}
return result;
}
}
```
2. **使用自定义实现**:
在具体业务中,需要重写或扩展`RequestMergeAspect`类,提供特定的批量处理逻辑。
## 配置说明
### @RequestMerge注解参数
| 参数名 | 类型 | 默认值 | 说明 |
|-------|------|-------|------|
| algorithmType | MergeAlgorithmType | SLIDING_WINDOW | 合并算法类型 |
| windowSize | int | 100 | 窗口大小(毫秒) |
| maxMergeCount | int | 100 | 最大合并数量 |
| groupKey | String | "default" | 分组键 |
| groupExpression | String | "" | 分组表达式(SpEL) |
| timeout | long | 500 | 超时时间 |
| timeoutUnit | TimeUnit | MILLISECONDS | 超时时间单位 |
| timeUnit | TimeUnit | MILLISECONDS | 窗口时间单位 |
| ignoreException | boolean | false | 是否忽略异常 |
### 合并算法类型
- **SLIDING_WINDOW**:滑动窗口算法,在指定时间窗口内合并请求
- **LEAKY_BUCKET**:漏斗算法,按固定速率处理合并的请求
- **IMMEDIATE_MERGE**:立即合并算法,不等待时间窗口
## 最佳实践
1. **合理设置窗口大小**:窗口太小无法有效合并,太大会增加延迟,通常建议50-200毫秒
- 高频访问场景:50-100ms
- 普通业务场景:100-150ms
- 复杂处理场景:150-200ms
2. **选择合适的合并算法**:
- 滑动窗口(SLIDING_WINDOW):通用场景,平衡延迟和合并效果
- 漏斗算法(LEAKY_BUCKET):适合需要控制请求速率的场景
- 即时合并(IMMEDIATE_MERGE):适合高频低延迟场景
3. **使用合适的分组策略**:根据业务特性设置有效的分组键
- 按用户ID分组:`groupExpression = "#userId"`
- 按业务类型分组:`groupExpression = "#businessType"`
- 组合条件分组:`groupExpression = "#region + '-' + #priority"`
4. **实现高效的批量处理逻辑**:批量处理是性能提升的关键
- 使用IN查询替代多次单条查询
- 使用批量API减少网络交互
- 优化批量处理的内存使用
5. **合理设置超时时间**:避免过长的等待时间
- 普通请求:300-500ms
- 高频请求:150-300ms
- 复杂处理:500-1000ms
6. **监控合并效果**:添加日志和监控,评估合并效果
- 记录请求合并率
- 统计处理时间变化
- 监控系统负载变化
7. **进行充分测试**:在生产环境前进行压力测试
- 模拟高并发场景
- 测试不同参数配置的性能
- 验证异常场景的处理
8. **考虑异步处理**:对于非实时性要求高的场景,可以结合异步处理进一步提升性能
- 使用CompletableFuture进行异步合并
- 实现异步回调机制
9. **针对不同场景优化配置**:
- 高频访问接口:小窗口、高合并数量、即时合并
- 复杂业务接口:较大窗口、合理合并数量、滑动窗口
- 限流场景:漏斗算法、合理的窗口大小
10. **参数调优方法**:
- 先使用保守配置(小窗口、低合并数)
- 收集实际运行数据
- 逐步调整优化性能
## 注意事项
1. 请求合并可能会引入一定的延迟,需要在性能和延迟间做权衡
2. 合并的请求应该是可以批量处理的相似请求
3. 对于实时性要求高的请求,不建议使用合并
4. 在生产环境中,建议先进行充分的测试和性能评估
5. 确保批量处理方法的线程安全性
6. 异常处理策略需要明确,设置ignoreException参数时需谨慎
7. 定期检查合并效果,根据业务量变化调整配置参数
## 测试支持
项目已完全迁移至JUnit 5测试框架,包含全面的单元测试和集成测试,确保功能稳定性和代码质量。测试覆盖了:
- HTTP请求合并功能
- Dubbo服务请求合并功能
- 各种合并算法的正确性
- 并发场景下的处理能力
- 异常情况处理
## 性能提升
使用请求合并可以显著减少数据库查询和网络调用次数,在高并发场景下性能提升明显:
- 减少90%以上的重复调用
- 提高系统吞吐量2-5倍
- 降低数据库和服务端负载
- 减少网络连接开销