diff --git a/servers/patch_analyzer_mcp/README.md b/servers/patch_analyzer_mcp/README.md new file mode 100644 index 0000000000000000000000000000000000000000..24d578829413e202287ee6b8ab8f9da1e57a5770 --- /dev/null +++ b/servers/patch_analyzer_mcp/README.md @@ -0,0 +1,92 @@ +# patch-analyzer-mcp + +#### 介绍 +补丁回合分析智能体,具有自动拉取上游社区代码生成补丁,分析补丁内容生成excel文件,读取审核后的excel文件按照模块粒度提交到目的仓并创建网页MR合并请求。 + +#### 限制 +目前只支持跟踪一个上游社区代码仓,不支持同时跟踪多个上游仓 + +#### 软件架构 +依赖python的fastmcp,gitpython包 +要求python>=3.10 + + +#### 安装教程 + +1. wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-aarch64.sh 下载conda管理工具 +2. conda create --name mcp-server python=3.10 +3. 进入conda环境后安装依赖 pip install fastmcp, gitpython +4. xxxx +5. xxxx + +#### 使用说明 + +mcp server服务端使用方式: +1. 修改src/assistant.conf文件 + +| 参数 | 配置说明 | +|----------------------|----------------------------------------------------| +| kernel_src_url | "kernel"软件的上游代码仓地址 | +| kernel_src_url_proxy | "kernel"软件的上游代码仓代理,影响excel的commit_id列| +| kernel_commitID | "kernel"软件的上游代码仓从指定commitID处开始分析 | +| kernel_src_branch | "kernel"软件的上游代码仓的分支 | +| kernel_dst_branch | "kernel"软件的目的代码仓的分支 | +| kernel_dst_url | "kernel"软件的上游代码仓地址 | +| kernel_project_url | 创建web MR请求的发送地址 | +| kernel_project_token | 创建web MR请求的密钥 | | +| kernel_mr_server | 创建web MR的服务器名(目前支持sangfor和gitee格式的MR)| + +特殊说明:如果要增加openssl软件的配置,复制新增上述字段并将替换"kernel"字段为"openssl"即可,如: +kernel_src_url -》 openssl_src_url +kernel_src_url_proxy -》 openssl_src_url_proxy +依次类推 + +2. python3 patch_assistant 拉起mcp server服务即可,默认会监听0.0.0.0:8100端口 + +--------------------------------------------------------------- + **mcp server客户端使用方式:** + +独立使用agent client客户端使用方式: +1. 修改src/assistant.conf文件 + +| 参数 | 配置说明 | +|----------------------|------------------------------------------------| +| api_key | llm api_key,若无填“EMPTY” | +| base_url | llm base_url | +| model_name | llm 模型名 | +| temperature | llm 模型温度,调节llm输出稳定性和创造性,0.1~0.3输出结果更加稳定 | +| top_p | llm top_p,调节llm输出稳定性,0.7~0.9输出结果更加稳定 | +| mcp_server_ip | mcp server服务端ip | +| mcp_server_port | mcp server服务端port | +| patch_excel_gen_path | 补丁分析结果excel生成的路径,文件名会是 软件-时间戳.xlsx | +| patch_excel_path | 补丁回合导入的excel文件完整路径信息 | +| sse_read_timeout | sse服务端无数据响应超时时间,单位:s,根据服务端最长同步代码块运行时长设置,可以适当调大 | + +按照本地llm模型或者线上llm模型设置模型相关参数,其他按照本地mcp server服务端,本地磁盘路径设置; +2. python client/mcp_client.py 拉起进程,根据程序提示交互输入执行的执行,agent按照software_list提供的列表提示选择软件进行操作,支持 **“软件补丁分析”、“补丁回合”** 两个指令; +3. 完成“软件补丁分析”后,将输出excel供架构师人工审核,需要回填excel中“确认合入”(仅可填 “ **是/否** ” )、“确认理由”,此两项为必填项,若空缺将导致补丁回合步骤失败; +4. 执行“补丁回合”前需要上传评审后的excel文件,按照步骤2执行,等待agent返回结果,若出现补丁冲突问题,需要人工检视修复; + + +客户端接入roo code使用方式: +1. 同样的方式修改client/client.conf文件; +2. python3 patch_assistant.py 拉起mcp server服务即可,默认会监听0.0.0.0:8100端口 +2. 配置client/mcp_config.json,在roo code里监听客户端8100端口; + +``` +{ + "mcpServers": { #标签名,固定 + "patch_analyse": { # mcp server名称 + "type": "streamable-http", # 连接mcp方式,固定 + "url": "http://0.0.0.0:8100/mcp", #mcp 客户端 url + "disabled": false, + "timeout": 3600, # 超时时间 + "alwaysAllow": [ + ] # 常开的接口,置空就可以 + } + } +} +``` + +3. 配置roo code中自定义prompt,看情况根据llm效果来; +4. roo code中进行问答; diff --git a/servers/patch_analyzer_mcp/mcp-rpm.yaml b/servers/patch_analyzer_mcp/mcp-rpm.yaml new file mode 100644 index 0000000000000000000000000000000000000000..3f907f0699ebfed63027354162e62874dc33295c --- /dev/null +++ b/servers/patch_analyzer_mcp/mcp-rpm.yaml @@ -0,0 +1,22 @@ +name: "patch_analyzer_mcp" +summary: "Package dependency analyzer for RPM/DNF/PIP" +description: | + A MCP server that provides tools to analyze package dependencies + for RPM, DNF and PIP packages, including dependency tree generation. + +dependencies: + system: + - python3 + - rpm + - dnf + - python3-pip + packages: + - fastmcp + - gitpython + +files: + required: + - mcp_config.json + - src/patch_assistant.py + optional: + - src/requirements.txt \ No newline at end of file diff --git a/servers/patch_analyzer_mcp/mcp_config.json b/servers/patch_analyzer_mcp/mcp_config.json new file mode 100644 index 0000000000000000000000000000000000000000..2719a10c12f3c08bf80ff7c1eda582d950400a65 --- /dev/null +++ b/servers/patch_analyzer_mcp/mcp_config.json @@ -0,0 +1,12 @@ +{ + "mcpServers": { + "patch_analyse": { + "type": "streamable-http", + "url": "http://0.0.0.0:8100/mcp", + "disabled": false, + "timeout": 3600, + "alwaysAllow": [ + ] + } + } +} diff --git a/servers/patch_analyzer_mcp/src/README.md b/servers/patch_analyzer_mcp/src/README.md new file mode 100644 index 0000000000000000000000000000000000000000..8f6e104d3143787c8d75c46a4f0c4f33649b425e --- /dev/null +++ b/servers/patch_analyzer_mcp/src/README.md @@ -0,0 +1,54 @@ +单http部署: rooCode(IDE)--http--patch_assistant(:8100) + +#### 使用说明 +1.修改配置文件assistant.conf配置项目 + +| 参数 | 配置说明 | +|----------------------|----------------------------------------------------| +| kernel_src_url | "kernel"软件的上游代码仓地址 | +| kernel_src_url_proxy | "kernel"软件的上游代码仓代理,影响excel的commit_id列| +| kernel_commitID | "kernel"软件的上游代码仓从指定commitID处开始分析 | +| kernel_src_branch | "kernel"软件的上游代码仓的分支 | +| kernel_dst_branch | "kernel"软件的目的代码仓的分支 | +| kernel_dst_url | "kernel"软件的上游代码仓地址 | +| kernel_project_url | 创建web MR请求的发送地址 | +| kernel_project_token | 创建web MR请求的密钥 | | +| kernel_mr_server | 创建web MR的服务器名(目前支持sangfor和gitee格式的MR)| +| api_key | llm api_key,若无填“EMPTY” | +| base_url | llm base_url | +| model_name | llm 模型名 | +| temperature | llm 模型温度,调节llm输出稳定性和创造性,0.1~0.3输出结果更加稳定 | +| top_p | llm top_p,调节llm输出稳定性,0.7~0.9输出结果更加稳定 | +| timeout | llm 模型调用超时时间 | +| max_workers | 调用llm的并发数 | +| patch_excel_gen_path | 补丁分析结果excel生成的路径,文件名会是 软件-时间戳.xlsx | +| judge_rules | 自定义的检视规则 | + +特殊说明:如果要增加openssl软件的配置,复制新增上述字段并将替换"kernel"字段为"openssl"即可,如: +kernel_src_url -》 openssl_src_url +kernel_src_url_proxy -》 openssl_src_url_proxy +依次类推 + +2.python3 patch_assistant.py启动服务,默认监听:8100端口 + +3.客户端接入roo code +``` +{ + "mcpServers": { #标签名,固定 + "patch_analyse": { # mcp server名称 + "type": "streamable-http", # 连接mcp方式,固定 + "url": "http://0.0.0.0:8100/mcp", #mcp 客户端 url + "headers": { + "Authorization": "Bearer sk-1234" #用户独立的token,配置这个值可以隔离用户间的参数 + }, + "disabled": false, + "timeout": 3600, # 超时时间 + "alwaysAllow": [ + ] # 常开的接口,置空就可以 + } + } +} +``` + +4. 配置roo code中自定义prompt,看情况根据llm效果来; +5. roo code中进行问答; diff --git a/servers/patch_analyzer_mcp/src/assistant.conf b/servers/patch_analyzer_mcp/src/assistant.conf new file mode 100644 index 0000000000000000000000000000000000000000..9dc9504ea850c775233e186038e31173b91b7c07 --- /dev/null +++ b/servers/patch_analyzer_mcp/src/assistant.conf @@ -0,0 +1,36 @@ +# tools配置 +kernel_src_url=https://gitee.com/openeuler/kernel.git + +kernel_src_url_proxy=https://proxy/openeuler/kernel.git + +kernel_commitID=860874bd1cca5142eb23d123a0aeac7ec9d73d75 + +kernel_last_commitID=7ccb8a2b6758a5711040e0d3f6152e4ffd3ebb35 + +kernel_src_branch=openEuler-25.03 + +kernel_dst_branch=personal/openEuler-25.03 + +kernel_dst_url=git@gitee.com:pandongang/kernel.git + +kernel_project_url=https://gitee.com/api/v5/repos/pandongang/kernel/pulls + +kernel_project_token=99ca60a1a66d62067838e70b5295bdac + +kernel_mr_server=gitee + +#llm参数 +api_key=sk-xxx +base_url=https://dashscope.aliyuncs.com/compatible-mode/v1 +model_name=qwen3-max-2025-09-23 +temperature=0.2 +top_p=0.8 +timeout=300 +max_workers=8 + +#excel信息 +#补丁分析excel输出路径 +patch_excel_gen_path=/tmp + +#代码分析回合规则 +judge_rules="1、是否是安全性修复,如果补丁修复了安全漏洞,通常需要回合,例如CVE漏洞、权限绕过、远程代码执行;2、严重缺陷修复,如果解决了系统崩溃、数据丢失、功能不可用的bug修复,通常需要回合;3、功能性增强或新特性,一般不回合;4、性能优化,一般不回合;5、本补丁修改自身不满足1、2回合条件,但是属于一个整体系列补丁中一环,暂时评估为需要回合,由人工最终评审,但是需要在判断理由中强调原因;6、如果存在前置补丁或后置补丁依赖,需要列出对应依赖补丁commit id" diff --git a/servers/patch_analyzer_mcp/src/mcp_config.json b/servers/patch_analyzer_mcp/src/mcp_config.json new file mode 100644 index 0000000000000000000000000000000000000000..1b8a41ef57a353ecba1c2f5f6e76d3b90d50085c --- /dev/null +++ b/servers/patch_analyzer_mcp/src/mcp_config.json @@ -0,0 +1,15 @@ +{ + "mcpServers": { + "patch_analyse": { + "type": "streamable-http", + "url": "http://0.0.0.0:8100/mcp", + "headers": { + "Authorization": "Bearer sk-1234" + }, + "disabled": false, + "timeout": 3600, + "alwaysAllow": [ + ] + } + } +} diff --git a/servers/patch_analyzer_mcp/src/patch_assistant.py b/servers/patch_analyzer_mcp/src/patch_assistant.py new file mode 100644 index 0000000000000000000000000000000000000000..b040707bbdd9187e715291932ca8c7a5956ea58f --- /dev/null +++ b/servers/patch_analyzer_mcp/src/patch_assistant.py @@ -0,0 +1,600 @@ +from datetime import datetime +import asyncio +from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED +import contextvars +import csv +import json +import os +import threading +import time +import re +from typing import Any, Dict, List, Optional + +from cachetools import LRUCache +from fastmcp import Client +from fastmcp.client.transports import StreamableHttpTransport +from mcp.server.fastmcp import FastMCP +from openai import OpenAI +from openai import BadRequestError +import openai +import pandas as pd +from pydantic import BaseModel, Field +from queue import Queue +import tools + +# 创建FastMCP实例,http方式 +mcp = FastMCP(instructions="生成补丁、解析补丁、回合补丁", host="0.0.0.0", port=8100) + +user_config_cache = LRUCache(maxsize=10000) +default_config ={} +client_config = {} +# 配置OpenAI客户端 openai v1.100.1 +client = OpenAI( + api_key="xxxxx", + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" +) + +class CSVWriterService: + """ + 分析结果逐步固化到csv中间文件 + """ + def __init__(self, filename, interval=300): + self.filename = filename + self.interval = interval + self._queue = Queue() + self._lock = threading.Lock() + self._start_writer_thread() + self._init_file() + + def _init_file(self): + if not os.path.exists(self.filename): + with open(self.filename, mode='w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + writer.writerow(["提交信息","提交时间","模块","改动说明","判断理由","合入策略","确认合入","确认理由","提交标题","补丁类型","commit_url","提交描述","差异","patch名"]) + + def _start_writer_thread(self): + def loop(): + while True: + self._flush_to_csv() + time.sleep(self.interval) + threading.Thread(target=loop, daemon=True).start() + + def _flush_to_csv(self): + batch = [] + while not self._queue.empty(): + batch.append(self._queue.get()) + + if not batch: + return + + with self._lock: + with open(self.filename, mode='a', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + for item in batch: + writer.writerow([item["提交信息"],item["提交时间"],item["模块"],item["改动说明"],item["判断理由"],item["合入策略"],item["确认合入"],item["确认理由"],item["提交标题"],item["补丁类型"],item["commit_url"],item["提交描述"],item["差异"],item["patch名"]]) + + def add_data(self, data): + self._queue.put(data) + + +def check_json_output(output: str) -> str: + """判断llm是否包含标签,检查json格式完整性""" + cleaned_output = re.sub(r".*?", "", output, flags=re.DOTALL) + cleaned_output = cleaned_output.lstrip() # 去除多余段首空白 + + # 检查剩余内容是否为JSON格式 + try: + llm_data = json.loads(cleaned_output) + return llm_data + except json.JSONDecodeError: + raise json.JSONDecodeError(f"数据不满足json格式", cleaned_output, 0) + + +def estimate_tokens(text: str) -> int: + """ + 粗略估算长文本转换token后长度,中文按照1token平均1.5字符,英文1token平均4字符 + """ + char_count = len(text) + token_len = 0 + zh_char = 0 + en_char = 0 + for char in text: + if '\u4e00' <= char <= '\u9fff': + zh_char += 1 + else: + en_char += 1 + token_len = int(zh_char / 1.5 + en_char / 4) + return token_len + +def call_vllm(service: CSVWriterService, software_name: str, system_prompt: str, query:str, idx: int): + patch_content = tools.read_patch(software_name, idx) + content = patch_content.get("提交描述", None) + diff = patch_content.get("差异", None) + patch_content["确认合入"] = "" + patch_content["确认理由"] = "" + query += json.dumps(content, ensure_ascii=False, indent=2) + diff_token = estimate_tokens(json.dumps(diff, ensure_ascii=False, indent=2)) + if diff_token < 8192: + query += json.dumps(diff, ensure_ascii=False, indent=2) + + #llm结果不满足预期将重试三次 + max_retries = 3 + retry_count = 0 + terminated = False + final_content = {} + final_content.update(patch_content) + while retry_count < max_retries and not terminated: + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": query} + ] + + try: + # 调用OpenAI模型 + response = client.chat.completions.create( + model=client_config.get("model_name"), + messages=messages, + temperature=float(client_config.get("temperature")), + top_p=float(client_config.get("top_p")), + timeout=int(client_config.get("timeout")) + ) + response_message = response.choices[0].message.content + except BadRequestError as e: + print(f"补丁{idx} llm分析报错: {e},正在进行第{retry_count}次重试...") + retry_count += 1 + continue + except Exception as e: + print(f"补丁{idx} llm分析报错: {e},正在进行第{retry_count}次重试...") + retry_count += 1 + continue + + try: + llm_data = check_json_output(response_message) + final_content.update(llm_data) + service.add_data(final_content) + print(f"分析补丁{idx}已完成") + terminated = True + except json.JSONDecodeError: + retry_count += 1 + print(f"补丁{idx}分析输出格式不正确,正在进行第{retry_count}次重试...") + + query += f",你输出的内容是:{response_message},格式不符合要求。\n" \ + f"请严格按照要求格式重新生成,确保输出是有效的纯JSON内容。" + + if not terminated: + service.add_data(final_content) + print(f"补丁{idx}经过{max_retries}次重试后仍无法解析,跳过这条数据分析") + + +def gen_patch_content(service: CSVWriterService, software_name: str, system_prompt: str, query: str, commit_id: str): + """分析补丁内容""" + patch_count = tools.get_patch_count(software_name, commit_id) + print(f"补丁总数: {patch_count},开始逐条分析") + + with ThreadPoolExecutor(max_workers=int(client_config.get("max_workers"))) as executor: + idx = 1 + futures = set() + + while idx <= patch_count or futures: + while len(futures) < int(client_config.get("max_workers")) and idx <= patch_count: + future = executor.submit(call_vllm, service, software_name, system_prompt, query, idx) + futures.add(future) + idx += 1 + + if futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + + # 处理已完成的任务 + for future in done: + try: + future.result(timeout=60) + except Exception as e: + print(f"分析补丁失败: {str(e)}") + + service._flush_to_csv() + print(f"所有补丁已分析完成!") + + + +def process_with_flow(software_name: str, patch_content: List[Dict]) -> str: + """补丁内容超长时由工作流驱动触发补丁回合动作""" + try: + result = tools.apply_patch(software_name, patch_content) + except Exception as e: + result = f"处理补丁时发生错误:{str(e)}" + + return result + + +def excel_to_json(excel_path: str, sheet_name: str = 0) -> List[Dict]: + """ + 将Excel文件转换为JSON格式(每行作为标题,每列作为数据条目) + + 参数: + excel_path: Excel文件路径 + sheet_name: 工作表名称或索引(默认0,即第一个工作表) + + 返回: + 转换后的JSON数据列表 + """ + try: + # 读取Excel文件,第一行为标题行 + df = pd.read_excel( + excel_path, + sheet_name=sheet_name, + header=0, + index_col=False + ) + + # 检查数据是否为空 + if df.empty: + raise ValueError("Excel文件中没有有效数据") + + # 转换为JSON格式列表(每条记录对应一行数据) + json_data = df.to_dict(orient="records") + # 删除'提交描述'和'差异'字段,占用token影响上下文长度 + fields_to_remove = ['提交描述', '差异'] + for item in json_data: + for field in fields_to_remove: + # 若字段存在则删除,避免KeyError + if field in item: + del item[field] + return json_data + + except FileNotFoundError: + raise FileNotFoundError(f"未找到Excel文件: {excel_path}") + except Exception as e: + raise Exception(f"转换失败: {str(e)}") + + +def load_config(config_filename: str = "assistant.conf"): + """ + 读取配置文件 + 参数: + config_filename: 配置文件名(默认"config.txt") + 返回: + 配置字典(key: 配置项名, value: 配置值(字符串类型)) + """ + script_path = os.path.abspath(__file__) + script_dir = os.path.dirname(script_path) + config_file_path = os.path.join(script_dir, config_filename) + try: + with open(config_file_path, "r", encoding="utf-8") as f: + for line_num, line in enumerate(f, start=1): + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + print(f"警告:配置文件第{line_num}行格式错误(缺少'='),已跳过该行:{line}") + continue + key, value = line.split("=", 1) + client_config[key.strip()] = value.strip() + + global client + client = OpenAI( + api_key=client_config.get("api_key"), + base_url=client_config.get("base_url") + ) + + global default_config + default_config = { + "patch_excel_gen_path": client_config.get("patch_excel_gen_path"), + "judge_rules": client_config.get("judge_rules") + } + user_config_cache["root"] = default_config + return + + except FileNotFoundError: + raise FileNotFoundError(f"未找到配置文件:{config_file_path}") + except Exception as e: + raise Exception(f"读取配置文件失败:{str(e)}") + + +def get_current_auth() -> str: + ctx = mcp.get_context() + request_context = ctx.request_context + token = request_context.request.headers.get("authorization") + if not token: + token = "root" + + return token + + +@mcp.tool() +def set_client_config(config_key: str, config_value: str): + """ + 设定指定配置参数的值,可以设置 + patch_excel_gen_path:补丁分析结果excel文件输出的根路径; + judge_rules:评审补丁回合规则 + 参数: + config_key: 键(必须为字符串) + config_value: 值(支持任意类型) + 返回: 设置值结果是否成功 + """ + if not isinstance(config_key, str): + print(f"键必须是字符串类型,当前传入 {type(config_key)}") + return { + "status": "done", + "message": "设置失败,键类型不匹配" + } + token = get_current_auth() + ctx = user_config_cache.get(token, {}) + if not ctx: + global default_config + new_ctx = default_config.copy() + new_ctx[config_key] = config_value + user_config_cache[token] = new_ctx + return { + "status": "done", + "message": "设置成功" + } + new_ctx = ctx.copy() + new_ctx[config_key] = config_value + user_config_cache[token] = new_ctx + + return { + "status": "done", + "message": "设置成功" + } + + +@mcp.tool() +def get_client_config(config_key: str): + """ + 获取系统预设或者用户更新后指定配置参数的值,可以获取到 + patch_excel_gen_path:补丁分析结果excel文件输出的根路径; + judge_rules:评审补丁回合规则 + 参数: + config_key: 要查询配置的键(必须为字符串) + 返回: 查询成功返回键对应的值,失败返回错误信息 + """ + if not isinstance(config_key, str): + print(f"键必须是字符串类型,当前传入 {type(config_key)}") + return { + "status": "done", + "message": "查询失败,键类型不匹配" + } + token = get_current_auth() + ctx = user_config_cache.get(token, {}) + if not ctx: + global default_config + value = default_config.get(config_key, None) + return { + "status": "done", + "message": value + } + value = ctx.get(config_key, None) + + return { + "status": "done", + "message": value + } + + +@mcp.tool() +def analyse_software_patch(software_name: str, commit_id: str): + """ + 通过用户输入的软件名、commit_id分析软件历史补丁详细内容,结果生成excel分析文档 + + 规则: + 需要提前设置的变量-> patch_excel_gen_path补丁分析结果excel文件输出的根路径; + + 参数: + software_name: 需要分析的软件名 + commit_id: 需要分析的软件起始补丁对应的提交信息commit id + + 返回: + 分析结果 + """ + + print("收到用户输入,接下来执行软件漏洞补丁分析...") + token = get_current_auth() + ctx = user_config_cache.get(token, {}) + judge_rules = ctx.get("judge_rules", None) + if judge_rules is None: + judge_rules = client_config.get("judge_rules") + system_prompt = f'''你是一个AI辅助补丁分析工具,可以结合提供的工具来帮助回答用户的问题。 + 请根据问题判断是否需要调用工具,如果需要,请选择合适的工具并正确指定参数。 + 工具调用结果返回后,请结合结果逐项分析给出最终回答,分析结果按照要求返回,所有的结果都是有用的,请不要遗漏。 + 如果不需要调用工具,可以直接回答问题。 + 你需要遵守以下规则: + 1、分析结果需要完整,不可以遗漏。 + 2、你给出的最终结果将按照纯json格式被解析,返回结果请严格按照以下json格式,不要包含任何格式(例如MarkDown```json),也不要添加解释。 + 以下是补丁分析举例: + 补丁分析输入:"From 860874bd1cca5142eb23d123a0aeac7ec9d73d75 Mon Sep 17 00:00:00 2001\nFrom: zhaolichang \n + Date: Fri, 21 Mar 2025 01:08:48 +0800\nSubject: [PATCH 1/4] PINCTRL: Fix the issue that CONFIG_PINCTRL_AMD do not\n + support m option\n\nhuawei inclusion\ncategory: bugfix\nbugzilla: https://gitee.com/src-openeuler/calamares/issues/IBS0LG\n + CVE: NA\n\nFix the issue that the CONFIG_PINCTRL_ADM configuration do not\n + support the 'm' (module) option, and change it to 'y' (built-in).\n\nFixes: cbba3eb02aa9 ("PINCTRL:ENABLE_CONFIG_PINCTRL_AMD")\n + Signed-off-by: zhaolichang \n---\n arch/arm64/configs/openeuler_defconfig | 1 +-\n + 1 files changed, 1 insertions(+), 1 deletions(-)\n\ndiff --git a/arch/arm64/configs/openeuler_defconfig b/arch/arm64/configs/openeuler_defconfig\n + index 531f5f04d8e8..39729694001b 100644\n--- a/arch/arm64/configs/openeuler_defconfig\n+++ b/arch/arm64/configs/openeuler_defconfig\n + @@ -3945,7 +3945,7 @@ CONFIG_PINMUX=y\n CONFIG_PINCONF=y\n CONFIG_GENERIC_PINCONF=y\n # CONFIG_DEBUG_PINCTRL is not set\n + -CONFIG_PINCTRL_AMD=m\n+CONFIG_PINCTRL_AMD=y\n # CONFIG_PINCTRL_CY8C95X0 is not set\n # CONFIG_PINCTRL_MCP23S08 is not set\n + # CONFIG_PINCTRL_MICROCHIP_SGPIO is not set\n--\n2.43.0" + 结果输出格式为: + {{ + "提交信息": "860874bd1cca5142eb23d123a0aeac7ec9d73d75", + "提交时间": "Fri, 21 Mar 2025 01:08:48 +0800", + "模块": "PINCTRL", + "改动说明": "修复 CONFIG_PINCTRL_AMD 配置不支持模块('m')选项的问题,将其修改为内置('y')方式", + "判断理由": "xxxx", + "合入策略": "是", + "提交标题": "PINCTRL: Fix the issue that CONFIG_PINCTRL_AMD do not support m option", + "补丁类型": "bugfix" + }} + 参数解释: + 提交信息--补丁信息中对应的补丁编号; + 提交时间--commit修改时间; + 模块--补丁修改涉及的文件所属模块,通常在提交标题中会显示; + 改动说明--补丁信息中对补丁修改点的描述,使用中文生成回答,计算机相关专有名词使用英文表述; + 判断理由--基于以下几点分析:{judge_rules}; + 合入策略--是/否,根据“判断理由”设置是否需要回合; + 提交标题--commit的标题,从补丁信息中提取,格式为“模块:修改内容”; + 补丁类型--根据合入策略分析结果代码的改动属于哪一类型; + ''' + + user_query = "帮我分析" + software_name + "的补丁,最终结果以json格式输出,补丁内容如下: " + + try: + now = datetime.now() + + patch_excel_gen_path = ctx.get("patch_excel_gen_path", None) + if patch_excel_gen_path is None: + patch_excel_gen_path = client_config.get("patch_excel_gen_path") + csv_file = patch_excel_gen_path + "/" + software_name + "-" + now.strftime("%Y%m%d%H%M%S") + ".csv" + service = CSVWriterService(csv_file, 300) + gen_patch_content(service, software_name, system_prompt, user_query, commit_id) + df = pd.read_csv(csv_file) + excel_file = patch_excel_gen_path + "/" + software_name + "-" + now.strftime("%Y%m%d%H%M%S") + ".xlsx" + df.to_excel(excel_file, index=False) + result = f"补丁分析文件已生成到:{excel_file}" + except Exception as e: + result = f"处理补丁时发生错误:{str(e)}" + + return { + "status": "done", + "message": result + } + + +@mcp.tool() +def apply_software_patch(software_name: str, patch_excel_path: str): + """ + 通过用户输入的软件名、excel评审文档路径,解析excel,回合软件补丁 + + 参数: + software_name: 需要分析的软件名 + commit_id: 软件补丁对应的提交信息commit id + + 返回: + 回合补丁是否成功 + """ + + print("收到用户输入,接下来执行软件patch回合...") + patch_list = excel_to_json(patch_excel_path) + filter_list = [item for item in patch_list if item.get("确认合入") == "是"] + if not filter_list: + return f"{patch_excel_path}文件导入后'确认合入'的补丁为空,请确认" + result = process_with_flow(software_name, filter_list) + + final_result = f"补丁回合处理结果:{result}" + return { + "status": "done", + "message": final_result + } + + +def get_patch_name_by_commit_id(json_data, commit_id) -> str: + """ + 从excel数据中根据commit id查找对应的patch name值,并处理文件名格式 + """ + for item in json_data: + if '提交信息' in item and 'patch名' in item: + if item['提交信息'] == commit_id: + patch_name = item['patch名'] + # 按第一个"-"进行截断 + if '-' in patch_name: + prefix_part = patch_name.split('-', 1)[0] + return prefix_part + + # 未找到匹配项时返回空字符串 + return "" + + +@mcp.tool() +def re_analyse_patch(software_name: str, commit_id: str, patch_excel_path: str): + """ + 通过用户输入的软件名、excel评审文档路径,指定的commit_id按照新规则重新分析单个commit id对应的补丁内容 + + 规则: + 需要提前设置的变量-> judge_rules: 用户自定义的补丁评审规则 + + 参数: + software_name: 需要分析的软件名 + commit_id: 软件补丁对应的提交信息commit id + patch_excel_path: 上一次完整补丁分析报告输出路径 + + 返回: + 单个补丁的分析内容 + """ + print("收到用户输入,接下来执行单条补丁重分析...") + patch_list = excel_to_json(patch_excel_path) + patch_idx = get_patch_name_by_commit_id(patch_list, commit_id) + if patch_idx == "": + return f"分析原文件{patch_excel_path}中找不到{commit_id}数据,请检查" + token = get_current_auth() + ctx = user_config_cache.get(token, {}) + judge_rules = ctx.get("judge_rules", None) + if judge_rules is None: + judge_rules = client_config.get("judge_rules") + system_prompt = f'''你是一个AI辅助补丁分析工具,可以结合提供的工具来帮助回答用户的问题。 + 请根据问题判断是否需要调用工具,如果需要,请选择合适的工具并正确指定参数。 + 工具调用结果返回后,请结合结果逐项分析给出最终回答,分析结果按照要求返回,所有的结果都是有用的,请不要遗漏。 + 如果不需要调用工具,可以直接回答问题。 + 你需要遵守以下规则: + 1、分析结果需要完整,不可以遗漏。 + 2、你给出的最终结果将按照纯json格式被解析,返回结果请严格按照以下json格式,不要包含任何格式(例如MarkDown```json),也不要添加解释。 + 以下是补丁分析举例: + 补丁分析输入:"From 860874bd1cca5142eb23d123a0aeac7ec9d73d75 Mon Sep 17 00:00:00 2001\nFrom: zhaolichang \n + Date: Fri, 21 Mar 2025 01:08:48 +0800\nSubject: [PATCH 1/4] PINCTRL: Fix the issue that CONFIG_PINCTRL_AMD do not\n + support m option\n\nhuawei inclusion\ncategory: bugfix\nbugzilla: https://gitee.com/src-openeuler/calamares/issues/IBS0LG\n + CVE: NA\n\nFix the issue that the CONFIG_PINCTRL_ADM configuration do not\n + support the 'm' (module) option, and change it to 'y' (built-in).\n\nFixes: cbba3eb02aa9 ("PINCTRL:ENABLE_CONFIG_PINCTRL_AMD")\n + Signed-off-by: zhaolichang \n---\n arch/arm64/configs/openeuler_defconfig | 1 +-\n + 1 files changed, 1 insertions(+), 1 deletions(-)\n\ndiff --git a/arch/arm64/configs/openeuler_defconfig b/arch/arm64/configs/openeuler_defconfig\n + index 531f5f04d8e8..39729694001b 100644\n--- a/arch/arm64/configs/openeuler_defconfig\n+++ b/arch/arm64/configs/openeuler_defconfig\n + @@ -3945,7 +3945,7 @@ CONFIG_PINMUX=y\n CONFIG_PINCONF=y\n CONFIG_GENERIC_PINCONF=y\n # CONFIG_DEBUG_PINCTRL is not set\n + -CONFIG_PINCTRL_AMD=m\n+CONFIG_PINCTRL_AMD=y\n # CONFIG_PINCTRL_CY8C95X0 is not set\n # CONFIG_PINCTRL_MCP23S08 is not set\n + # CONFIG_PINCTRL_MICROCHIP_SGPIO is not set\n--\n2.43.0" + 结果输出格式为: + {{ + "提交信息": "860874bd1cca5142eb23d123a0aeac7ec9d73d75", + "模块": "PINCTRL", + "改动说明": "修复 CONFIG_PINCTRL_AMD 配置不支持模块('m')选项的问题,将其修改为内置('y')方式", + "判断理由": "xxxx", + "合入策略": "是" + }} + 参数解释: + 提交信息--补丁信息中对应的补丁编号; + 改动说明--补丁信息中对补丁修改点的描述,使用中文生成回答,计算机相关专有名词使用英文表述; + 判断理由--基于以下几点分析:{judge_rules}; + 合入策略--是/否,根据“判断理由”设置是否需要回合; + ''' + + user_query = "帮我分析" + software_name + "的补丁,最终结果以json格式输出,补丁内容如下: " + try: + patch_content = tools.read_patch(software_name, int(patch_idx)) + content = patch_content.pop("提交描述", None) + user_query += json.dumps(content, ensure_ascii=False, indent=2) + diff = patch_content.pop("差异", None) + diff_token = estimate_tokens(json.dumps(diff, ensure_ascii=False, indent=2)) + if diff_token < 8192: + user_query += json.dumps(diff, ensure_ascii=False, indent=2) + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_query} + ] + + # 调用OpenAI模型 + response = client.chat.completions.create( + model=client_config.get("model_name"), + messages=messages, + temperature=float(client_config.get("temperature")), + top_p=float(client_config.get("top_p")), + timeout=int(client_config.get("timeout")) + ) + response_message = response.choices[0].message.content + result = response_message + except Exception as e: + result = f"处理补丁分析时发生错误:{str(e)}" + + return { + "status": "done", + "message": result + } + +if __name__ == "__main__": + load_config() + mcp.run("streamable-http") diff --git a/servers/patch_analyzer_mcp/src/tools.py b/servers/patch_analyzer_mcp/src/tools.py new file mode 100644 index 0000000000000000000000000000000000000000..374bc035218cd7279165448f2ead45bf3461db9c --- /dev/null +++ b/servers/patch_analyzer_mcp/src/tools.py @@ -0,0 +1,463 @@ +import os +import glob +import re +import subprocess +import json +import requests +from typing import Optional, Dict, List, Any +from git import Repo +from pathlib import Path + +def parse_config_line(line: str) -> Optional[tuple[str, str]]: + """解析配置文件中的一行,提取键值对""" + line = line.strip() + if not line or line.startswith('#'): # 跳过空行和注释 + return None + + if '=' in line: + key, value = line.split('=', 1) + return key.strip(), value.strip() + return None + +def read_config_values(file_path: str) -> Dict[str, str]: + """读取配置文件中的所有键值对""" + config = {} + try: + if not os.path.exists(file_path): + print(f"错误: 文件 '{file_path}' 不存在") + return config + + with open(file_path, 'r', encoding='utf-8') as file: + for line_num, line in enumerate(file, 1): + parsed = parse_config_line(line) + if parsed: + key, value = parsed + config[key] = value + except Exception as e: + print(f"读取文件时出错: {e}") + return config + +def get_value_by_key(file_path: str, key: str) -> Optional[str]: + """根据键获取配置文件中的值""" + config = read_config_values(file_path) + return config.get(key) + +def get_config(app_name: str): + config_file = os.path.dirname(__file__) + '/assistant.conf' + src_url = get_value_by_key(config_file, app_name + "_src_url") + src_url_proxy = get_value_by_key(config_file, app_name + "_src_url_proxy") + commitID = get_value_by_key(config_file, app_name + "_commitID") + src_branch_name = get_value_by_key(config_file, app_name + "_src_branch") + dst_branch_name = get_value_by_key(config_file, app_name + "_dst_branch") + dst_url = get_value_by_key(config_file, app_name + "_dst_url") + project_url = get_value_by_key(config_file, app_name + "_project_url") + project_token = get_value_by_key(config_file, app_name + "_project_token") + mr_server = get_value_by_key(config_file, app_name + "_mr_server") + + return src_url, src_url_proxy, commitID, src_branch_name, dst_branch_name, dst_url, project_url, project_token, mr_server + +def save_config(key, new_value): + """ + 修改配置文件中指定key对应的value值 + """ + config_file = os.path.dirname(__file__) + '/assistant.conf' + try: + # 读取文件内容 + with open(config_file, 'r') as file: + lines = file.readlines() + + # 查找并修改目标行 + modified = False + with open(config_file, 'w') as file: + for line in lines: + if line.startswith(f"{key}="): + file.write(f"{key}={new_value}\n") + modified = True + else: + file.write(line) + + if not modified: + with open(config_file, 'a') as file: + file.write(f"{key}={new_value}\n") + print(f"未找到键 '{key}',已添加新的键值对") + else: + print(f"成功将键 '{key}' 的值修改为 '{new_value}'") + + except FileNotFoundError: + print(f"错误:文件 '{config_file}' 不存在") + except Exception as e: + print(f"修改文件时发生错误:{str(e)}") + +def generate_app_patches(app_name: str, commit_id: str, save_path: str) -> str: + """ + 拉取代码仓指定分支的最新代码并从指定的commitID开始生成所有patch。 + """ + src_url, _, commitID, src_branch_name, _, _, _, _, _ = get_config(app_name) + if commit_id: # 如果用户指定了commit_id则以用户输入的值为基线 + commitID = commit_id + + local_repo_path = "/tmp/" + app_name + "-src" + try: + # 1. 克隆或更新仓库 + if not subprocess.run(["test", "-d", f"{local_repo_path}/.git"], capture_output=True).returncode == 0: + # 如果目录不存在或不是git仓库,则克隆 + subprocess.run(["git", "clone", src_url, local_repo_path], check=True) + else: + # 如果是git仓库,则拉取最新代码 + subprocess.run(["git", "-C", local_repo_path, "fetch", "origin"], check=True) + + # 下载代码或更新代码后都走一边切换代码分支逻辑 + subprocess.run(["git", "-C", local_repo_path, "checkout", src_branch_name], check=True) + subprocess.run(["git", "-C", local_repo_path, "pull", "origin", src_branch_name], check=True) + + # 2. 生成patch + cmd = ["git", "-C", local_repo_path, "format-patch"] + if commitID: + cmd.append(f"{commitID}..HEAD") + else: + cmd.append("--all") # 从第一个commit开始 + + cmd.extend(["--output-directory", save_path]) + subprocess.run(cmd, check=True, capture_output=True, text=True) + + # 3. 获取最新的commitID并写入文件 + result = subprocess.run(["git", "-C", local_repo_path, "rev-parse", "HEAD"], + stdout=subprocess.PIPE, text=True, check=True) + last_commit_id = result.stdout.strip() + save_config(app_name + "_last_commitID", last_commit_id) + + return f"成功从commit '{commitID or '第一个commit'}' 开始生成patch文件到 '{save_path}' 目录。" + + except subprocess.CalledProcessError as e: + return f"Git操作失败: {e.stderr}" + except Exception as e: + return f"生成patch时发生错误: {e}" + +def compress_patch(patch_content: str) -> str: + """分析补丁内容,提取MR提交信息""" + lines = patch_content.strip().split('\n') + if not lines: + return '' + + mr_lines = [] + in_mr_section = False + signature_pattern = r'^--\s*$' # 签名行通常以"-- "开头 + + # 找到第一个非空行作为开始 + start_line = 0 + for i, line in enumerate(lines): + if line.strip(): + start_line = i + break + + for i, line in enumerate(lines): + if i == start_line: + in_mr_section = True + mr_lines.append(line.strip()) + continue + + if in_mr_section: + if re.match(signature_pattern, line): + in_mr_section = False + break + if line.startswith('diff --git'): + in_mr_section = False + break + mr_lines.append(line.strip()) + + return '\n'.join(mr_lines) + +def get_diff(input_str): + marker = "Signed-off-by:" + marker_index = input_str.find(marker) + if marker_index == -1: + return "" + + line_end_index = input_str.find('\n', marker_index) + if line_end_index == -1: + return "" + + return input_str[line_end_index + 1:] + +def get_patch_count(app_name: str, commit_id: Optional[str] = None) -> int: + """ + 读取指定软件的补丁数量 + """ + directory = "/tmp/" + app_name + "-patchs" + subprocess.run(["rm", "-rf", directory], check=True) # 清理/tmp/app_name-patchs目录 + os.makedirs(directory, exist_ok=True) + generate_app_patches(app_name, commit_id, directory) + path = Path(directory) + return len([file for file in path.iterdir() if file.is_file()]) + +def find_patch_file(folder_path: str, number: int): + """ + 根据输入的编号查找对应命名的patch文件 + """ + if not os.path.exists(folder_path) or not os.path.isdir(folder_path): + raise ValueError(f"文件夹路径不存在或不是一个有效的目录: {folder_path}") + + target_prefix = f"{number:04d}-" + pattern = re.compile(rf"^{target_prefix}.*\.patch$") + + for filename in os.listdir(folder_path): + if pattern.match(filename): + return filename + + return None + +def concat_commit_url(app_name, commit_id): + """ + 将仓库URL和提交ID拼接成完整的提交链接 + """ + _, src_url_proxy, _, _, _, _, _, _, _ = get_config(app_name) + base_url = src_url_proxy.replace(f"/{app_name}.git", f"/{app_name}") + return f"{base_url}/commit/{commit_id}" + +def read_patch(app_name: str, patch_number: int) -> dict: + """ + 读取指定软件的补丁并按格式返回 + """ + try: + result = {} + directory = "/tmp/" + app_name + "-patchs" + + # 1. 查找补丁文件 + file_path = find_patch_file(directory, patch_number) + + # 2. **增加关键的返回值判断** + if file_path is None: + return {"读取补丁失败": f"在目录 '{directory}' 中未找到编号为 {patch_number} 的补丁文件。"} + + result["patch名"] = file_path + + # 3. 后续操作(只有在文件找到后才执行) + if not os.path.isabs(file_path): + full_path = os.path.join(directory, file_path) + else: + full_path = file_path + + with open(full_path, 'r', encoding='utf-8') as f: + data = f.read() + content = compress_patch(data) + lines = content.split('\n') + if lines: # 确保 lines 列表不为空 + first_line = lines[0].strip() + parts = first_line.split() + if len(parts) >= 2 and parts[0] == 'From': + result["提交信息"] = parts[1] + result["commit_url"] = concat_commit_url(app_name, parts[1]) + result["提交描述"] = content + result["差异"] = get_diff(data) + + except Exception as e: + # 保留通用异常捕获,以防其他未知错误 + return {"读取补丁失败": str(e)} + + return result + +def remove_braces(data, indent=0): + """递归处理JSON数据,去除大括号和中括号""" + result = [] + indent_str = " " * indent + + if isinstance(data, dict): + for key, value in data.items(): + value_str = remove_braces(value, indent + 1) + result.append(f"{key} {value_str}\n") + elif isinstance(data, list): + for i, item in enumerate(data): + item_str = remove_braces(item, indent + 1) + result.append(f"{item_str}") + else: + return str(data) + + return "\n".join(result) + +def get_patch_by_patch_name(patch_name, data): + for patch in data: + if patch.get("patch名") == patch_name: + return patch + return None + +def extract_patches(data): + filtered = [item for item in data if item.get("确认合入") == "是"] + modules = {} + for item in filtered: + module = item.get("模块") + patch_name = item.get("patch名") + if module not in modules: + modules[module] = [] + modules[module].append(patch_name) + + for module in modules: + modules[module].sort() + + sorted_modules = sorted(modules.items(), key=lambda x: min(x[1])) + result = [patches for module, patches in sorted_modules] + return result + +def create_local_new_branch_name(dst_branch_name: str, model: str) -> str: + """ + 根据给定的目标分支名称和模型名称创建新的本地分支名称 + """ + if 'release/' in dst_branch_name: + new_branch = dst_branch_name.replace('release/', 'personal/') + return f"{new_branch}/ai-patch_{model}" + elif 'feature/' in dst_branch_name: + new_branch = dst_branch_name.replace('feature/', 'personal/') + return f"{new_branch}/ai-patch_{model}" + elif 'personal/' in dst_branch_name: + return f"{dst_branch_name}_patch_{model}" + else: + return f"personal/{dst_branch_name}/ai-patch_{model}" + +def apply_patch(app_name: str, data: List[Dict]) -> str: + """ + 根据补丁分析内容将对应的patch合入到代码仓并提交至远端仓库 + """ + patch_name_list = extract_patches(data) + + results = "" + result_failed = "" + + repo_url, _, _, _, dst_branch_name, dst_url, project_url, project_token, mr_server = get_config(app_name) + + patchs_directory = "/tmp/" + app_name + "-patchs" + if not os.path.exists(patchs_directory): + os.makedirs(patchs_directory, exist_ok=True) + generate_app_patches(app_name, None, patchs_directory) + + local_dst_repo_path = "/tmp/" + app_name + "-dst" + try: + # 1. 克隆或更新目标代码仓库 + if not subprocess.run(["test", "-d", f"{local_dst_repo_path}/.git"], capture_output=True).returncode == 0: + subprocess.run(["git", "clone", dst_url, local_dst_repo_path], check=True) + else: + subprocess.run(["git", "-C", local_dst_repo_path, "fetch", "origin"], check=True) + + # 切换分支 + subprocess.run(["git", "-C", local_dst_repo_path, "checkout", dst_branch_name], check=True) + subprocess.run(["git", "-C", local_dst_repo_path, "pull", "origin", dst_branch_name], check=True) + + except subprocess.CalledProcessError as e: + return f"Git操作失败: {e.stderr}" + + original_cwd = os.getcwd() + try: + os.chdir(local_dst_repo_path) + + for patch_names in patch_name_list: + mr_message = {"补丁列表:": ["\r"], "改动分析:": ["\r"], "合入原因:": ["\r"]} + title = "" + title_flag = True + commit_flag = True + + for patch_name in patch_names: + patch_json = get_patch_by_patch_name(patch_name, data) + if title_flag: + title = patch_json.get("补丁类型") + "[" + patch_json.get("模块") +"] " + "智能补丁回合\n" + title_flag = False + + patch_path = os.path.join(patchs_directory, patch_name) + if not os.path.exists(patch_path): + result_failed += f"{patch_name} patch文件不存在\n" + break + + try: + # 使用git am应用patch + apply_result = subprocess.run(["git", "am", "--3way", patch_path], + capture_output=True, text=True, cwd=local_dst_repo_path) + + if apply_result.returncode == 0: + mr_message["补丁列表:"].append(patch_name[5:]) + mr_message["改动分析:"].append(f"{patch_name[5:]}
{patch_json.get('commit_url')}
{patch_json.get('改动说明')}
") + mr_message["合入原因:"].append(f"{patch_name[5:]}
{patch_json.get('确认理由')}
") + else: + subprocess.run(["git", "am", "--abort"], capture_output=True, text=True, cwd=local_dst_repo_path) + result_failed += f"{patch_json.get('提交信息')} patch冲突\n" + commit_flag = False + except Exception as e: + result_failed += f"{patch_json.get('提交信息')} patch冲突: {str(e)}\n" + commit_flag = False + break + + if commit_flag: + results += title + "合入成功\n" + new_branch_name = create_local_new_branch_name(dst_branch_name, patch_json.get("模块")) + + # 如果目标分支已存在,删除目标分支 + subprocess.run(["git", "branch", "-D", new_branch_name], capture_output=True, text=True, cwd=local_dst_repo_path) + # 创建本地模块粒度分支 + subprocess.run(["git", "branch", new_branch_name], check=True, capture_output=True, text=True, cwd=local_dst_repo_path) + # 切换进入本地模块粒度分支 + subprocess.run(["git", "checkout", "-f", new_branch_name], check=True, capture_output=True, text=True, cwd=local_dst_repo_path) + # 推送到远程 + subprocess.run(["git", "push", "--set-upstream", "origin", new_branch_name], check=True, capture_output=True, text=True, cwd=local_dst_repo_path) + + mr_url = create_mr(mr_server, project_url, project_token, new_branch_name, dst_branch_name, title, remove_braces(mr_message)) + results += "合并请求URL:" + mr_url + "\n" + + # 强制切换回本地分支 + subprocess.run(["git", "checkout", "-f", dst_branch_name], check=True, capture_output=True, text=True, cwd=local_dst_repo_path) + # 清理本地分支,准备下一次循环 + subprocess.run(["git", "reset", "--hard", "origin/" + dst_branch_name], check=True, capture_output=True, text=True, cwd=local_dst_repo_path) + else: + print(f"存在合并失败的补丁文件,该模块不能创建MR") + + except Exception as e: + return {"error": f"subprocess run时发生异常: {str(e)}"} + finally: + os.chdir(original_cwd) + + return results + result_failed + +def create_mr(mr_server: str, project_url: str, project_token: str, src_branch: str, tar_branch: str, title: str, description: str) -> str: + if mr_server == "sangfor": + return create_sangfor_mr(project_url, project_token, src_branch, tar_branch, title, description) + else: # default for gitee + return create_gitee_mr(project_url, project_token, src_branch, tar_branch, title, description) + +def create_sangfor_mr(project_url: str, project_token: str, src_branch: str, tar_branch: str, title: str, description: str) -> str: + """创建深信服远程模块分支的merge request到远程该工程分支 + """ + try: + mr_data = { + "source_branch": src_branch, + "target_branch": tar_branch, + "title": title, + "description": description, + "assignee_id": 27, #固定 + "remove_source_branch": True #固定 + } + headers = {'PRIVATE-TOKEN': project_token} + response = requests.post(project_url, json=mr_data, headers=headers, timeout=10) + response.raise_for_status() + return f"成功创建MR: " + response.json()["web_url"] + except requests.exceptions.RequestException as e: + return f"推送失败: {str(e)}" + except KeyError: + return f"推送失败: 响应中缺少web_url字段,响应内容: {response.text}" + +def create_gitee_mr(project_url: str, project_token: str, src_branch: str, tar_branch: str, title: str, description: str) -> str: + """创建gitee远程模块分支的merge request到远程该工程分支 + """ + try: + mr_data = { + "title": title, + "head": src_branch, + "base": tar_branch, + "body": description + } + headers = { + "Authorization": f"token {project_token}", + "Content-Type": "application/json" + } + response = requests.post(project_url, json=mr_data, headers=headers, timeout=10) + response.raise_for_status() + return f"成功创建MR: " + response.json()["html_url"] + except requests.exceptions.RequestException as e: + return f"推送失败: {str(e)}" + except KeyError: + return f"推送失败: 响应中缺少html_url字段,响应内容: {response.text}" +