diff --git a/release-assistant/javcra/application/modifypart/modifyentrance.py b/release-assistant/javcra/application/modifypart/modifyentrance.py index 155b6c09e56d9f1c8189abf47cae85e1edf6e746..34041f3494383e0be3ee08032f28c51b49b351c1 100644 --- a/release-assistant/javcra/application/modifypart/modifyentrance.py +++ b/release-assistant/javcra/application/modifypart/modifyentrance.py @@ -14,10 +14,12 @@ Description: modify entrance """ import re + +from javcra.api.gitee_api import Issue from javcra.libs.log import logger -class Operation: +class Operation(Issue): """ md operation for release issue description """ @@ -235,3 +237,304 @@ class Operation: final_lines = self.modify_block_lines(issue_body_lines, block_lines, block_start_idx, block_end_idx) return "".join(final_lines) + + def add_for_specific_block(self, body_str, issues, table_head, block_name): + """ + add info in specific block + + Args: + body_str: str, issue body + issues: issues to be add + table_head: list, table head + block_name: block name + + Returns: + processed issue body str + """ + + if not body_str: + raise ValueError("no content of release issue body, failed to add.") + + issues_dict = dict() + issues_info_list = list() + + # If the block is "requires", then get the md format str directly, like "|bluez|接口变更|" + if "requires" in block_name: + requires_md_str = self.convert_md_table_format(table_head, issues) + if requires_md_str: + issues_info_list.append(requires_md_str) + issues_dict = {"requires_str": requires_md_str} + else: + # for other blocks, get detail issue info according to each issue id, then get the md format str + # like "|#I41R53:CVE-2021-36222|krb5|已完成|7.5|1.18.2|否|" + for issue_id in issues: + single_issue_info = self.get_single_issue_info(issue_id, block_name) + if single_issue_info: + issues_info_list.append(single_issue_info) + issue_info = self.convert_md_table_format(table_head, single_issue_info) + issues_dict.setdefault(issue_id, issue_info) + + # if all the info to be add are empty + if not issues_info_list: + raise ValueError("failed to add, please check whether the issues to be added exists.") + + return self.get_new_body_lines( + body_str, append_info=issues_dict, start_flag=block_name, end_flag="\n" + ) + + def delete_for_specific_block(self, body_str, issues, block_name): + """ + delete info in specific block + + Args: + body_str: str, issue body + issues: issues to be delete + block_name:block name + + Returns: + processed issue body str + """ + if not body_str: + raise ValueError("no content of release issue body, failed to delete.") + + res_str = body_str + # delete each issue and then get new issue body lines + for issue_id in issues: + res_str = self.get_new_body_lines( + res_str, delete_issue=issue_id, start_flag=block_name, end_flag="\n" + ) + return res_str + + @staticmethod + def __get_score(body_str): + """ + get the score of cve + + Args: + body_str: cve issue body str + + Returns: + str: score value or no score + """ + # to match openEuler评分 for cve + euler_score_pattern = re.compile("openEuler评分.*?(?P[0-9\.]+)", flags=re.S) + euler_res = euler_score_pattern.search(body_str) + + if euler_res: + return euler_res["euler_score"] + else: + # to match BaseScore for cve + base_score_pattern = re.compile("BaseScore[::](?P[0-9\.]+)") + base_score = base_score_pattern.search(body_str) + return base_score["base_score"] if base_score else "no score info" + + def __is_abi_change(self, body_str): + """ + Parsing whether the abi has changed + + Args: + body_str: cve issue body + + Returns: + "是" or "否" + """ + # to match whether the abi has changed of specific branch + abi_content_pattern = re.compile("修复是否涉及abi变化.*?(?P.*)[\\n$]", flags=re.S) + abi_res = abi_content_pattern.search(body_str) + + if not abi_res: + logger.error("The abi pattern did not match the info") + return "否" + + abi_info = abi_res["abi"] + branch = self.get_update_issue_branch() + if not branch: + return "否" + + for line in abi_info.splitlines(): + if branch in line and "是" in line: + return "是" + return "否" + + def get_single_issue_info(self, issue_id, block_name): + """ + get singe issue info for specific block + + Args: + block_name: name of block + issue_id: issue id + + Returns: + list: issue info list + """ + issue_content = self.get_issue_info(issue_number=issue_id) + if not issue_content: + logger.error("can not get the content of issue {}, perhaps this issue does not exist.".format(issue_id)) + return [] + + repository = issue_content.get("repository", {}) + # for all the block, get the dict of repository and status for the issue + issue_info = { + "仓库": repository.get("name", "无仓库信息"), + "status": issue_content.get("issue_state", "无状态信息") + } + + block_names_list = ["## 2、bugfix", "# 3、安装、自编译问题", "# 4、遗留问题"] + if block_name in block_names_list: + issue_info["issue"] = "#" + issue_id + + if "遗留" in block_name: + issue_info["type"] = issue_content.get("issue_type", "无type信息") + issue_info["status"] = "遗留" + + elif "CVE" in block_name: + issue_body = self.get_issue_body(issue_id) + if not issue_body: + logger.error("empty issue body for {}, can not get the info for {} block.".format(issue_id, block_name)) + return [] + + version_pattern = re.compile("漏洞归属的版本[::](?P.*)") + version = version_pattern.search(issue_body) + issue_info["CVE"] = "#" + issue_id + issue_info["score"] = self.__get_score(issue_body) + issue_info["version"] = version["version"] if version else "no version info" + issue_info["abi是否变化"] = self.__is_abi_change(issue_body) + + return [issue_info] + + def operate_for_specific_block(self, table_head, block_name, table_body=None, prefix="", operate="init", + body_str=None, issues=None): + """ + Process init, add, delete operations for specific block + + Args: + table_head: list, table head + block_name: str, block name like ""## 1、CVE"" + table_body: table_body of specific part for init, like [{..},{..},..]. + prefix: prefix of block, like "修复了bugfix xxx个" + operate: init, add, delete + body_str: issue body, str + issues: issue id, list + + Raises: + ValueError: not allowed operate + + Returns: + processed release issue body str + """ + if not table_body: + table_body = [] + + if operate == "init": + return self.init_md_table(table_head, table_body, block_name, prefix) + elif operate == "add": + return self.add_for_specific_block(body_str, issues, table_head, block_name) + elif operate == "delete": + return self.delete_for_specific_block(body_str, issues, block_name) + else: + raise ValueError( + "not allowed 'operate' value,expected in ['init','add','delete'],but given {}".format(operate) + ) + + def init(self): + """ + init specific block + + Returns: + init str + """ + return self.get_new_issue_body(operate="init") + + def get_new_issue_body(self, operate="init", body_str=None, issues=None): + raise NotImplementedError + + +class BugFixIssue(Operation): + def __init__(self, repo, token, issue_num): + super().__init__(repo, token, issue_num) + + def get_new_issue_body(self, operate="init", body_str=None, issues=None): + """ + get new issue body for bugfix block operation + + Args: + operate: operate str. Defaults to "init".expected [init,add,delete] + body_str: gitee issue body str. + issues: issue id list. + + Returns: + str: new issue body str + """ + if not issues: + issues = [] + + table_head = ["issue", "仓库", "status"] + block_name = "## 2、bugfix" + bugfix_list = [] + bugfix_prefix = "修复bugfix {}个".format(len(bugfix_list)) + + return self.operate_for_specific_block( + table_head, + block_name, + prefix=bugfix_prefix, + operate=operate, + table_body=bugfix_list, + body_str=body_str, + issues=issues, + ) + + +class InstallBuildIssue(Operation): + def __init__(self, repo, token, issue_num): + super().__init__(repo, token, issue_num) + + def get_new_issue_body(self, operate="init", body_str=None, issues=None): + """ + get new issue body for install build block operation + + Args: + operate: operate str. expected [init,add,delete] + body_str: gitee issue body str. + issues: issue id list. + + Returns: + new issue body str + """ + table_head = ["issue", "仓库", "status"] + block_name = "# 3、安装、自编译问题" + + return self.operate_for_specific_block( + table_head, + block_name, + operate=operate, + body_str=body_str, + issues=issues + ) + + +class RemainIssue(Operation): + def __init__(self, repo, token, issue_num): + super().__init__(repo, token, issue_num) + + def get_new_issue_body(self, operate="init", body_str=None, issues=None): + """ + get new issue body for remain block operation + + Args: + operate: operate str. expected [init,add,delete] + body_str: gitee issue body str. + issues: issue id list. + + Returns: + str: new issue body str + """ + t_header = ["issue", "仓库", "status", "type"] + block_name = "# 4、遗留问题" + + return self.operate_for_specific_block( + t_header, + block_name, + operate=operate, + body_str=body_str, + issues=issues + )