# requestmerge **Repository Path**: ckwangPrivate/requestmerge ## Basic Information - **Project Name**: requestmerge - **Description**: 一个高性能的Spring Boot Starter,支持HTTP和Dubbo请求合并处理,通过AOP切面和自定义注解实现,集成了多种合并算法,有效减少数据库查询和远程调用次数,提升系统性能 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 8 - **Forks**: 0 - **Created**: 2025-11-08 - **Last Updated**: 2025-11-10 ## Categories & Tags **Categories**: spring-boot-ext **Tags**: Java, SpringBoot ## README # Request Merge Spring Boot Starter 一个高性能的Spring Boot Starter,支持HTTP和Dubbo请求合并处理,通过AOP切面和自定义注解实现,集成了多种合并算法,有效减少数据库查询和远程调用次数,提升系统性能。 无请求合并的示意图 ![输入图片说明](imgimage.png) 请求合并的示意图 ![输入图片说明](imgimage.png) ## 功能特性 - 支持HTTP和Dubbo请求合并处理 - 基于AOP切面实现,使用简单 - 多种合并算法:滑动窗口、漏斗、立即合并 - 支持动态分组和SpEL表达式 - 可配置的超时和异常处理 - 易于集成到现有Spring Boot项目 - 丰富的使用场景示例 - 完整的测试套件,使用JUnit 5确保功能稳定性 - 支持复杂参数分组和条件合并 ## 项目结构 项目采用模块化结构设计,包含以下主要模块: ### 1. request-merge-parent - 父级POM,定义项目的基本配置和依赖管理 - 管理所有子模块的版本和通用配置 - 集成最新的测试框架支持(JUnit 5) ### 2. request-merge-core - 核心功能模块,包含所有算法实现和核心组件 - 包含注解、AOP切面、合并算法、配置类等基础功能 - 提供扩展接口,支持自定义批量处理器实现 ### 3. request-merge-demo - 演示模块,展示如何使用框架 - 包含HTTP和Dubbo服务示例 - 完整的测试套件,使用JUnit 5确保功能稳定性 ## 快速开始 ### 1. 添加依赖 在Maven项目中添加依赖: ```xml com.wangbin.requestmerge request-merge-spring-boot-starter 1.0.1 ``` ### 2. 配置属性 在`application.properties`或`application.yml`中配置相关属性: ```yaml request: merge: enabled: true default-window-size: 100 default-time-unit: MILLISECONDS default-max-merge-count: 100 default-algorithm-type: SLIDING_WINDOW default-timeout: 500 default-timeout-unit: MILLISECONDS support-http: true support-dubbo: true enable-detail-log: false ``` ### 3. 使用示例 #### HTTP请求合并 - 基本示例 ```java import com.wangbin.requestmerge.annotation.RequestMerge; import com.wangbin.requestmerge.enums.MergeAlgorithmType; @RestController @RequestMapping("/api/users") public class UserController { @Autowired private UserService userService; @GetMapping("/{id}") @RequestMerge( algorithmType = MergeAlgorithmType.SLIDING_WINDOW, windowSize = 50, maxMergeCount = 50, groupExpression = "#id", timeout = 300 ) public User getUserById(@PathVariable Long id) { return userService.getUserById(id); } } ``` #### HTTP请求合并 - 高级示例 ```java import com.wangbin.requestmerge.annotation.RequestMerge; import com.wangbin.requestmerge.enums.MergeAlgorithmType; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/api/advanced/users") public class AdvancedRequestMergeController { @Autowired private UserService userService; /** * 滑动窗口合并 - 适合高频调用场景 */ @GetMapping("/sliding/{id}") @RequestMerge( algorithmType = MergeAlgorithmType.SLIDING_WINDOW, windowSize = 100, maxMergeCount = 30, groupExpression = "#id", timeout = 500 ) public User getUserWithSlidingWindow(@PathVariable Long id) { return userService.getUserById(id); } /** * 漏斗算法合并 - 适合控制请求速率场景 */ @GetMapping("/leaky/{id}") @RequestMerge( algorithmType = MergeAlgorithmType.LEAKY_BUCKET, windowSize = 150, maxMergeCount = 50, groupKey = "leakyBucketGroup" ) public User getUserWithLeakyBucket(@PathVariable Long id) { return userService.getUserById(id); } /** * 即时合并 - 适合高频低延迟场景 */ @GetMapping("/immediate/{id}") @RequestMerge( algorithmType = MergeAlgorithmType.IMMEDIATE_MERGE, maxMergeCount = 100, groupExpression = "#id" ) public User getUserWithImmediateMerge(@PathVariable Long id) { return userService.getUserById(id); } /** * 多参数分组 - 复杂业务场景示例 */ @GetMapping("/multi-param/{id}") @RequestMerge( algorithmType = MergeAlgorithmType.SLIDING_WINDOW, windowSize = 80, groupExpression = "#id + '-' + #region", maxMergeCount = 40 ) public User getUserWithRegion(@PathVariable Long id, @RequestParam String region) { return userService.getUserByIdAndRegion(id, region); } } #### Dubbo服务请求合并 在Dubbo服务中,需要在接口或实现类上添加`@RequestMerge`注解,并实现批量处理方法: **1. 定义Dubbo服务接口** - 多种合并场景 ```java import com.wangbin.requestmerge.annotation.RequestMerge; import com.wangbin.requestmerge.enums.MergeAlgorithmType; import java.util.*; import java.util.concurrent.TimeUnit; public interface DubboUserService { /** * 基本请求合并 - 滑动窗口算法 */ @RequestMerge( windowSize = 100, timeUnit = TimeUnit.MILLISECONDS, algorithmType = MergeAlgorithmType.SLIDING_WINDOW, groupKey = "dubboUserService" ) String getUserName(Long userId); /** * 批量获取用户名,用于请求合并的批量处理方法 */ Map batchGetUserNames(List userIds); /** * 多参数请求合并 - 支持按参数类型分组 */ @RequestMerge( algorithmType = MergeAlgorithmType.SLIDING_WINDOW, windowSize = 80, maxMergeCount = 40, groupExpression = "#infoType", timeout = 200 ) String getUserInfo(Long userId, String infoType); /** * 多参数批量处理方法 */ Map> batchGetUserInfos(List userIds, List infoTypes); /** * 即时合并 - 适用于高频低延迟场景 */ @RequestMerge( algorithmType = MergeAlgorithmType.IMMEDIATE_MERGE, maxMergeCount = 100 ) String getUserDetails(Long userId); /** * 即时合并对应的批量方法 */ Map batchGetUserDetails(List userIds); /** * 漏斗算法合并 - 适用于需要控制处理速率的场景 */ @RequestMerge( algorithmType = MergeAlgorithmType.LEAKY_BUCKET, windowSize = 150, maxMergeCount = 50, timeout = 1000 ) String getUserExtendedInfo(Long userId); /** * 漏斗算法对应的批量方法 */ Map batchGetUserExtendedInfos(List userIds); /** * 复杂参数分组 - 多参数条件下的分组策略 */ @RequestMerge( algorithmType = MergeAlgorithmType.SLIDING_WINDOW, windowSize = 120, maxMergeCount = 30, groupExpression = "#region + '-' + #priority" ) String getUserByRegionAndPriority(Long userId, String region, Integer priority); /** * 复杂参数分组对应的批量方法 */ Map batchGetUsersByRegionAndPriority(List userIds, String region, Integer priority); /** * 高频访问场景 - 小窗口和高合并数量 */ @RequestMerge( algorithmType = MergeAlgorithmType.SLIDING_WINDOW, windowSize = 50, maxMergeCount = 100, timeout = 150 ) String getHighFreqUserInfo(Long userId); /** * 高频访问对应的批量方法 */ Map batchGetHighFreqUserInfos(List userIds); } ``` **2. 实现Dubbo服务** ```java import org.apache.dubbo.config.annotation.DubboService; import java.util.HashMap; import java.util.List; import java.util.Map; @DubboService(version = "1.0.0") public class DubboUserServiceImpl implements DubboUserService { private static final Map USER_MAP = new HashMap<>(); static { // 模拟用户数据 USER_MAP.put(1L, "用户1"); USER_MAP.put(2L, "用户2"); USER_MAP.put(3L, "用户3"); } @Override public String getUserName(Long userId) { // 模拟网络延迟 try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return USER_MAP.getOrDefault(userId, "未知用户"); } @Override public Map batchGetUserNames(List userIds) { // 模拟网络延迟 try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } Map resultMap = new HashMap<>(); for (Long userId : userIds) { resultMap.put(userId, USER_MAP.getOrDefault(userId, "未知用户")); } return resultMap; } } ``` **3. Dubbo客户端调用** ```java import org.apache.dubbo.config.annotation.DubboReference; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @Component public class DubboUserServiceClient { @DubboReference(version = "1.0.0") private DubboUserService dubboUserService; /** * 调用单个用户查询 */ public String callSingleUser(Long userId) { return dubboUserService.getUserName(userId); } /** * 测试并发调用,验证请求合并效果 */ public Map testConcurrentCalls(List userIds) { List> futures = userIds.stream() .map(id -> CompletableFuture.supplyAsync(() -> { return dubboUserService.getUserName(id); })) .collect(Collectors.toList()); Map result = new HashMap<>(); for (int i = 0; i < userIds.size(); i++) { try { result.put(userIds.get(i), futures.get(i).get()); } catch (Exception e) { result.put(userIds.get(i), "错误: " + e.getMessage()); } } return result; } } ``` ### 4. 批量处理器实现 要实现真正的批量处理,需要提供批量处理函数。可以通过以下方式: 1. **全局批量处理器**: ```java import org.springframework.stereotype.Component; import java.util.*; @Component public class GlobalBatchProcessor { @Autowired private UserRepository userRepository; public Map batchGetUsers(Map userIdMap) { // 批量查询逻辑 List ids = new ArrayList<>(userIdMap.values()); List users = userRepository.findAllById(ids); // 转换为map,使用原始请求ID作为key Map result = new HashMap<>(); for (Map.Entry entry : userIdMap.entrySet()) { for (User user : users) { if (Objects.equals(user.getId(), entry.getValue())) { result.put(entry.getKey(), user); break; } } } return result; } } ``` 2. **使用自定义实现**: 在具体业务中,需要重写或扩展`RequestMergeAspect`类,提供特定的批量处理逻辑。 ## 配置说明 ### @RequestMerge注解参数 | 参数名 | 类型 | 默认值 | 说明 | |-------|------|-------|------| | algorithmType | MergeAlgorithmType | SLIDING_WINDOW | 合并算法类型 | | windowSize | int | 100 | 窗口大小(毫秒) | | maxMergeCount | int | 100 | 最大合并数量 | | groupKey | String | "default" | 分组键 | | groupExpression | String | "" | 分组表达式(SpEL) | | timeout | long | 500 | 超时时间 | | timeoutUnit | TimeUnit | MILLISECONDS | 超时时间单位 | | timeUnit | TimeUnit | MILLISECONDS | 窗口时间单位 | | ignoreException | boolean | false | 是否忽略异常 | ### 合并算法类型 - **SLIDING_WINDOW**:滑动窗口算法,在指定时间窗口内合并请求 - **LEAKY_BUCKET**:漏斗算法,按固定速率处理合并的请求 - **IMMEDIATE_MERGE**:立即合并算法,不等待时间窗口 ## 最佳实践 1. **合理设置窗口大小**:窗口太小无法有效合并,太大会增加延迟,通常建议50-200毫秒 - 高频访问场景:50-100ms - 普通业务场景:100-150ms - 复杂处理场景:150-200ms 2. **选择合适的合并算法**: - 滑动窗口(SLIDING_WINDOW):通用场景,平衡延迟和合并效果 - 漏斗算法(LEAKY_BUCKET):适合需要控制请求速率的场景 - 即时合并(IMMEDIATE_MERGE):适合高频低延迟场景 3. **使用合适的分组策略**:根据业务特性设置有效的分组键 - 按用户ID分组:`groupExpression = "#userId"` - 按业务类型分组:`groupExpression = "#businessType"` - 组合条件分组:`groupExpression = "#region + '-' + #priority"` 4. **实现高效的批量处理逻辑**:批量处理是性能提升的关键 - 使用IN查询替代多次单条查询 - 使用批量API减少网络交互 - 优化批量处理的内存使用 5. **合理设置超时时间**:避免过长的等待时间 - 普通请求:300-500ms - 高频请求:150-300ms - 复杂处理:500-1000ms 6. **监控合并效果**:添加日志和监控,评估合并效果 - 记录请求合并率 - 统计处理时间变化 - 监控系统负载变化 7. **进行充分测试**:在生产环境前进行压力测试 - 模拟高并发场景 - 测试不同参数配置的性能 - 验证异常场景的处理 8. **考虑异步处理**:对于非实时性要求高的场景,可以结合异步处理进一步提升性能 - 使用CompletableFuture进行异步合并 - 实现异步回调机制 9. **针对不同场景优化配置**: - 高频访问接口:小窗口、高合并数量、即时合并 - 复杂业务接口:较大窗口、合理合并数量、滑动窗口 - 限流场景:漏斗算法、合理的窗口大小 10. **参数调优方法**: - 先使用保守配置(小窗口、低合并数) - 收集实际运行数据 - 逐步调整优化性能 ## 注意事项 1. 请求合并可能会引入一定的延迟,需要在性能和延迟间做权衡 2. 合并的请求应该是可以批量处理的相似请求 3. 对于实时性要求高的请求,不建议使用合并 4. 在生产环境中,建议先进行充分的测试和性能评估 5. 确保批量处理方法的线程安全性 6. 异常处理策略需要明确,设置ignoreException参数时需谨慎 7. 定期检查合并效果,根据业务量变化调整配置参数 ## 测试支持 项目已完全迁移至JUnit 5测试框架,包含全面的单元测试和集成测试,确保功能稳定性和代码质量。测试覆盖了: - HTTP请求合并功能 - Dubbo服务请求合并功能 - 各种合并算法的正确性 - 并发场景下的处理能力 - 异常情况处理 ## 性能提升 使用请求合并可以显著减少数据库查询和网络调用次数,在高并发场景下性能提升明显: - 减少90%以上的重复调用 - 提高系统吞吐量2-5倍 - 降低数据库和服务端负载 - 减少网络连接开销