# streamLearn_2 **Repository Path**: xinyue0331/stream-learn_2 ## Basic Information - **Project Name**: streamLearn_2 - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 29 - **Created**: 2024-12-16 - **Last Updated**: 2024-12-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # StreamLearn drawing 本仓库专注于流数据学习算法,旨在提供一种高效的处理流式数据、应对数据分布变化并提升学习性能的解决方案。仓库中的算法能够实时处理持续到来的数据流,适应数据的动态变化,并通过优化调度算法提高资源利用效率和任务完成的时效性。该仓库整合了五个子课题中的多个流式数据处理算法,主要内容如下: ## 算法介绍 - **GDRO & WGDRO**:课题一的GDRO算法及其加权版本,实现同质噪声下吞吐量相同和不同情况下的学习。 - **MERO & WMERO**:课题一的MERO算法及其加权版本,实现异质噪声下吞吐量相同和不同情况下的学习。 - **无监督分类**:课题二中的矩阵略图近似优化算法,针对带噪声的CIFAR-10图像分类任务。 - **SAFC**:课题三中的增量学习算法,分为准备阶段(P阶段)和适应阶段(A阶段)。 - **流数据调度**:课题四中的流数据学习算法调度模拟环境,优化流数据任务的资源利用率和时效性。 - **类别增量学习**:课题五中的增量学习算法,支持动态添加类别。 ## 文件结构 - **Algorithm**:每个算法单独放在文件夹或Python文件中(如GDRO、WGDRO、MERO等)。 - **Dataset**:每个数据集单独放在文件夹或Python文件中(如CIFAR-10)。 - **Tests**:提供测试入口,展示如何调用算法。 ## 使用方法 每个算法都包含一个测试入口,作为算法调用的示例。 --- ## StreamAlgorithm Stream learn算法主要需要实现以下接口,具体实现参考StreamLearn/Base/DeepModelMixin.py 注意以下几点 1. 每个算法单独为一个文件夹或者python文件,放在Algorithm文件夹下面(实现StreamAlgorithm)。 2. 每个数据集单独作为一个文件夹或者python文件,放在Dataset文件夹(实现StreamDataset)。 3. 每个算法需要提供一个测试入口,作为算法调用的样例。放在tests文件夹。 4. 提供对应的代码说明,可以参考下面的“流数据分布鲁棒学习算法” 5. 数据集中的数据如果不太大(1MB以下)可以将数据放在Dataset/data文件夹下,新建一个项目对应的子目录。如果数据集较大,应当在国内的数据平台(例如百度网盘)上传并在README中提供对应的链接和使用方法。 ```python class StreamAlgorithm(ABC): @abstractmethod def stream_fit(self, data_batch): # 数据流式到来,算法对一个数据batch进行学习 pass @abstractmethod def stream_evaluate(self, data_batch): # 数据流式到来,算法对一个数据batch进行评测 pass def train_and_evaluate_stream_algorithm(): alg = Algorithm() data = StreamDataset() for data_batch in iter(dataset): # 在流式场景下,首先对无标记样本进行预测,然后获得样本的标记并进行训练 # 例如在推荐系统中,首先为用户推荐item,然后根据用户是否点击给样本打上标记 metrics.append(alg.stream_evaluate(data_batch)) alg.stream_fit(data_batch) print(metrics) ``` --- ## 课题一:流数据分布鲁棒学习算法 ### 1.1 分布鲁棒学习概况 针对多源异质流数据分布场景,提出了对分布变化鲁棒的算法。具体而言,针对多数据分布吞吐量相同和不同的两种情况,分别设计了 GDRO 算法和加权 GDRO (WGDRO) 算法;进一步地,我们针对异质噪声下的分布鲁棒学习提出了两种超额损失算法,分别为 MERO 算法和加权 MERO (WMERO) 算法。最后,建立了理论最优的样本复杂度、并通过实验验证了这四种方法的有效性。 数据集:CIFAR-10 源数据处理参考Github([Link][https://github.com/kuangliu/pytorch-cifar/tree/master]),预处理后数据 ([Link][https://pan.baidu.com/s/1kkgJsbhnLkShwlFncTIh3w?pwd=rne7],提取码:rne7),文件下载后解压到 `stream-learn\StreamLearn\Dataset\cifar10_1` 目录下。 数据预处理方法:根据每幅图片的HSV直方图,对图片进行聚类,类别数取为10。 ### 1.2 分布鲁棒学习算法 本算法包含`多分布数据集构造`,`GDRO 算法和 WGDRO 算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/CIFAR10_Dataset.py - StreamLearn/Algorithm/GDRO/GDRO.py - StreamLearn/Algorithm/GDRO/WGDRO.py - StreamLearn/tests/test_GDRO.py 我们可以分别调用 GDRO 算法 和 WGDRO 算法的测试函数,在 CIFAR10_Dataset 上进行数据性能测试: ```python # GDRO算法 和 WGDRO算法 的测试代码 if args.dataset_mode == 'balance': train_and_evaluate_stream_GDRO(args) elif args.dataset_mode == 'imbalance': train_and_evaluate_stream_WGDRO(args) ``` 首先,我们按照以下方式构造流式数据集CIFAR10_Dataset,其中参数 `args.dataset_mode='balance' `表明流数据吞吐量相同场景;`args.dataset_mode='imbalance'` 表明吞吐量不同的场景。 ```python dataset = CIFAR10_Dataset(args) ``` 其次,我们对算法进行初始化,算法使用 ResNet18 神经网络: ```python alg = GDRO(args) / alg = WGDRO(args) ``` 最后,分别对两个模型进行性能测试,完整代码如下: ```python # GDRO算法测试 def train_and_evaluate_stream_GDRO(args): dataset = CIFAR10_Dataset(args) alg = GDRO(args) metrics = [] for i in range(args.run_time): data_batch = dataset.sample_m() metrics.append(alg.stream_evaluate(data_batch)) alg.stream_fit(data_batch) print('GDRO:', metrics) ``` ```python # WGDRO算法测试 def train_and_evaluate_stream_WGDRO(args): metrics = [] dataset = CIFAR10_Dataset(args) args.get_n = dataset.get_n() alg = WGDRO(args) for i in range(min(alg.get_t(),args.run_time)): data_batch1 = dataset.sample_pr_avail() data_batch2 = dataset.sample_pr_avail() data_batch = [data_batch1, data_batch2] metrics.append(alg.stream_evaluate(data_batch)) alg.stream_fit(data_batch) print('WGDRO:', metrics) ``` ### 1.3 分布鲁棒超额损失优化算法 本算法包含`多分布数据集构造`,`MERO 算法和 WMERO 算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/CIFAR10_Dataset.py - StreamLearn/Algorithm/MERO/MERO.py - StreamLearn/Algorithm/MERO/WMERO.py - StreamLearn/tests/test_MERO.py 我们可以分别调用 MERO 算法和 WMERO 算法的测试函数,在 CIFAR10_Dataset 上进行数据性能测试: ```python # MERO 算法和 WMERO 算法的测试代码 if args.dataset_mode == 'balance': train_and_evaluate_stream_MERO(args) elif args.dataset_mode == 'imbalance': train_and_evaluate_stream_WMERO(args) ``` 我们按照与1.2节相同的做法构造流式数据集CIFAR10_Dataset,其中参数 `args.dataset_mode='balance' `表明流数据吞吐量相同场景;`args.dataset_mode='imbalance'` 表明吞吐量不同的场景。 ```python dataset = CIFAR10_Dataset(args) ``` 接着,我们对算法进行初始化,算法使用 ResNet18 神经网络: ```python alg = MERO(args) / alg = WMERO(args) ``` 最后,分别对两个模型进行性能测试,完整代码如下: ```python # MERO 算法测试 def train_and_evaluate_stream_MERO(args): alg = MERO(args) metrics = [] dataset = CIFAR10_Dataset(args) for i in range(args.run_time): data_batch = dataset.sample_m() metrics.append(alg.stream_evaluate(data_batch)) alg.stream_fit(data_batch) print('MERO:', metrics) ``` ```python # WMERO 算法测试 def train_and_evaluate_stream_WMERO(args): metrics = [] dataset = CIFAR10_Dataset(args) args.get_n = dataset.get_n() alg = WMERO(args) for i in range(min(alg.get_t(),args.run_time)): data_batch1 = dataset.sample_pr_avail() data_batch2 = dataset.sample_pr_avail() data_batch = [data_batch1, data_batch2] metrics.append(alg.stream_evaluate(data_batch)) alg.stream_fit(data_batch) print('WMERO:', metrics) ``` ## 课题二 ### 2.1 高吞吐率图流的动态图神经网络 该算法主要包含`数据集`,`动态图神经网络实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/DTDGsDataset.py - StreamLearn/Algorithm/DGNN.py - StreamLearn/tests/test_DecoupledDGNN.py 算子应用实例见 StreamLearn/tests/test_DecoupledDGNN.py 文件,使用方法为: ```bash python StreamLearn/tests/test_DecoupledDGNN.py --data NAME_OF_DATA --checkpoint PATH_TO_CHECKPOINT ``` 首先,获取算法所需数据集:Dataset文件夹中包含了数据下载和预处理的代码,用户可以根据需要修改其中的数据存储路径。 ```python from StreamLearn.Dataset.DTDGsDataset import DTDGsDataset dataset = DTDGsDataset(args.data) ``` 其次,按照以下方式获取动态图节点的时序表示: ```python from StreamLearn.Algorithm.DecoupledDGNN.graph_embs import GraphEmbs gen_embs = GraphEmbs(dataset.path, args.data, args.rmax, args.alpha) gen_embs.load_and_process_data ``` 最后,调用DGNN算法进行训练并测试: ```python from StreamLearn.Algorithm.DecoupledDGNN.DGNN import DGNN execute = DGNN(args) execute.train() ``` ### 2.2 滑动窗口上的最优矩阵略图算子 该算子主要包含`数据集`,`DSFD算子实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/FDDataset.py - StreamLearn/Algorithm/SlidingWindowFD/DSFD.py - StreamLearn/tests/test_SWFD.py 算子应用实例见 StreamLearn/tests/test_SWFD.py 文件,使用方法为: ```bash PYTHONPATH=. python StreamLearn/tests/test_SWFD.py ``` 该实例分别实现了PAMAP向量流滑动窗口上的矩阵低秩近似任务。 PAMAP数据集下载地址:[百度网盘](https://pan.baidu.com/s/1TMSQ5Plm1E1b_jqCXkjW1A?pwd=esja),提取码: esja。 首先需要初始化长度为$N$的滑动窗口下的矩阵略图对象(向量维度为$d$,向量范数上界为$R$,空间开销为$O(ld/\beta)$,误差为$\beta/l$): ```python swfd = SeqDSFD(N, R, d, l, beta=1.0) ``` 对于向量流中每次到来的向量`data`,更新矩阵略图 ```python swfd.fit(data) ``` 查询当前滑动窗口组成的矩阵在某单位向量上的投影的模长,其中`direction`为该单位向量 ```python predict = swfd.predict(direction) ``` ### 2.3 矩阵略图优化的在线强化学习(Sketched Linear Bandit)算子 该算子主要包含`数据集`,`SLB算子实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/CIFAR10_Dataset.py - StreamLearn/Algorithm/SlidingWindowFD/LinearBandit.py - StreamLearn/Algorithm/SlidingWindowFD/SketchedBandit.py - StreamLearn/tests/test_SWFD.py 该实例实现了应用在CIFAR 10数据集上的带噪声标签的分类任务,CIFAR 10 数据集构造使用方法与 [1.2 分布鲁棒学习算法](#1.2) 相同。 算子应用实例见 StreamLearn/tests/test_SWFD.py 文件,使用方法为: ```bash PYTHONPATH=. python StreamLearn/tests/test_SWFD.py ``` 首先,我们按照以下方式构造流式数据集CIFAR10_Dataset,其中参数 `args.dataset_mode='balance' `表明流数据吞吐量相同场景;`args.dataset_mode='imbalance'` 表明吞吐量不同的场景。 ```python dataset = CIFAR10_Dataset(args) ``` 其次,我们对算法进行初始化,算法使用 Sketched Linear Bandit 强化学习算子: ```python alg = SketchedBandit(args) ``` 最后,分别对两个模型进行性能测试,完整代码如下: ```python metrics = [] for i in trange(args.run_time): data_batch = dataset.sample_m() metrics.append(alg.stream_evaluate(data_batch)) alg.stream_fit(data_batch) print('SketchedBandit CIFAR10 dataset: ', metrics) ``` ### 2.4 基于采样的分布式环境下的元素估计算法 实现了基于采样的、分布式流数据环境下、亚线性通信量的元素估计(NDV)算法。详见[https://gitee.com/yinhanyan/ndv_-estimation_in_distributed_environment](https://gitee.com/yinhanyan/ndv_-estimation_in_distributed_environment)。 ## 课题三 ### 3.1 流数据分布自适应学习算法 该算法主要包含`分布偏移数据集构造`,`ODS算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/TTADataset.py - StreamLearn/Algorithm/TTA/ODS.py - StreamLearn/tests/test_ODS.py 首先,测试代码的配置文件为:StreamLearn/Config/ODS.py,用户可修改测试文件进行不同复合数据分布变化的测试。 数据集下载地址:[CIFAR10-C](https://zenodo.org/records/2535967),训练参数下载地址:[百度网盘](https://pan.baidu.com/s/1mxADnKpv73X-Tu1uR8fkXg),提取码: qaj2。 其次,按照以下方式构造流式包含复合数据分布变化(包含协变量分布和标记分布偏移)的 CIFAR10 数据集。 ```python import StreamLearn.Dataset.TTADataset as datasets dataset = datasets.CIFAR10CB( root=args.stream.dataset_dir, batch_size=args.stream.batch_size, severities=args.stream.severities, corruptions=args.stream.corruptions, bind_class=args.stream.bind_class, bind_ratio=args.stream.bind_ratio, seed=args.seed, ) ``` 其中,`root`为数据根目录、`batch_size`控制数据流批大小、`severities`与`corruptions`控制协变量分布偏移的程度与类型、`bind_class`与`bind_ratio`控制标记分布变化的类别与比例、`seed`为数据集生成随机种子。 继而,调用 ODS 算法,复用已训练完毕的模型在复合分布偏移的数据流中进行自适应学习。 ```python args.method.model = net estimator = ODS.ODS(args.method) ``` 其中,`net`保存了深度学习模型,`args.method`中保存了 ODS 算法所需的超参数。 最后,将数据流中的样本输入算法中进行预测。 ```python # ODS算法测试 pred = estimator.predict(X).detach().cpu() ``` 测试具体代码详见 StreamLearn/tests/test_ODS.py 文件,使用方法为: ```bash cd stream-learn python StreamLearn/tests/test_ODS.py --data PATH_TO_DATA --checkpoint PATH_TO_CHECKPOINT ``` ### 3.2 SAFC:同时增强特征和类的增量学习 该算法主要包括增量数据集构造,SAFC两种变体SAFC_D和SAFC_ID算法实现,性能测试三部分,相关代码参见目录: - StreamLearn/Algorithm/Algorithm_SAFC/Read_to_Python.py - StreamLearn/Algorithm/Algorithm_SAFC/SAFC.py - StreamLearn/Algorithm/Algorithm_SAFC/predict.py - StreamLearn/Algorithm/Algorithm_SAFC/SAFC_Stream_Funcs.py 首先,获取算法所需数据集: 地址: 通过百度网盘分享的文件:SAFC_datasets_CIFAR10.zip 链接:https://pan.baidu.com/s/1xtZjSxIIEMnUwoM7VXCzkQ 提取码:nudt 说明: 本实验需要用到的数据集是CIFAR10,本例将data_batch_1.mat至data_batch_4.mat中的数据作为第一阶段数据;data_batch_2.mat中的数据作为第二阶段的数据;test_batch.mat中的数据是测试数据 然后,根据代码配置文件StreamLearn/Algorithm/Algorithm_SAFC/request_import.py安装库完成环境配置 其次,调用SAFC算法进行训练: SAFC训练基于第一阶段复用的SVM模型,因此先训练SVM,并保存训练好的SVM模型: ```python print('####eval####') print("begin svm1 training!") print('####eval####') svm1 = SVC(probability=True,kernel="linear",decision_function_shape='ovo') svm1.fit(data_s1.tolist(), label_s1_vec.tolist()) dump(svm1, save_dir+'/svm1.model') print('####eval####') print("end svm1 training!") print('####eval####') # 复用SVM,训练SAFC_D,并保存训练后的模型: print('####eval####') print("begin SAFC_D training!") print('####eval####') w_ours1 = SAFC_D(np.mat(svm1._get_coef()), np.transpose(data_s2), label_s2, alpha_best1, beta_best1, eta) dump(w_ours1, save_dir+'/SAFC_D.model') # 复用SVM,训练SAFC_ID,并保存训练后的模型: print('####eval####') print("begin SAFC_ID training!") print('####eval####') w_ours2 = SAFC_ID(np.mat(svm1._get_coef()),np.transpose(data_s2),label_s2,alpha_best2,beta_best2,eta) dump(w_ours2, save_dir+'/SAFC_ID.model') # 其中,alpha_best1, beta_best1, alpha_best2, beta_best2, eta是超参数 ``` 最后,分别对两个模型进行性能测试: ```python # SAFC_D: print('####eval####') print("begin SAFC_D testing!") print('####eval####') pred1, acc_ours1, auc_ours1, f1wei_ours1, f1macro_ours1, f1micro_ours1 = Predict(w_ours1,np.transpose(test_data),test_label) print('####eval####') print("begin SAFC_D evaluation!") print('####eval####') Acc_ours1.append(acc_ours1) AUC_ours1.append(auc_ours1) F1_weight_ours1.append(f1wei_ours1) F1_macro_ours1.append(f1macro_ours1) F1_micro_ours1.append(f1micro_ours1) # SAFC_ID: print('####eval####') print("begin SAFC_ID testing!") print('####eval####') pred2,acc_ours2,auc_ours2,f1wei_ours2,f1macro_ours2,f1micro_ours2 = Predict(w_ours2,np.transpose(test_data),test_label) print('####eval####') print("begin SAFC_ID evaluation!") print('####eval####') Acc_ours2.append(acc_ours2) AUC_ours2.append(auc_ours2) F1_weight_ours2.append(f1wei_ours2) F1_macro_ours2.append(f1macro_ours2) F1_micro_ours2.append(f1micro_ours2) ``` 在test文件中,按照不同的数据流场景,提供两个main文件 - StreamLearn/tests/tests_SAFC/CIFAR10_SAFC_DandSAFC_ID_allFunc_main.py 对应一次性读取两阶段数据并完成测试 - StreamLearn/tests/tests_SAFC/CIFAR10_SAFC_DandSAFC_ID_useFunc_main.py 对应分批次读取两阶段数据,存储模型,读取测试数据,调用模型并完成测试 ModelsIN用于存放训练获得的分类器 ## 课题四 ### 4.1 分布式交互学习流数据存储管理系统 GEAR GEAR是一个以GPU为中心、针对现代高性能服务器GPU硬件特性和高性能网络链接设计优化的分布式流数据存储管理系统,以支持基于大规模流数据的交互式学习。 该系统主要包含 `GEAR系统实现`和`最简样例运行代码`两部分 ,相关代码参见目录 * StreamLearn/Algorithm/GEAR * StreamLearn/tests/GEAR **样例运行** 首先,参考`StreamLearn/Algorithm/GEAR/README.md`文档中搭建基本的PyTorch-GPU运行环境,然后运行在目录下运行`pip install`命令以编译安装GEAR系统到Python环境: ```shell cd StreamLearn/Algorithm/GEAR/ pip install -r requirements. pip install . ``` 其次,参考运行`StreamLearn/tests/GEAR/offline/single-node/create.py`以下载并转换`hopper`数据集,该最小数据集会被存放于`/tmp/gear/checkpoints/example_shared_dataset.pt`的默认路径下(可通过`--data_path`参数指定存放路径)。 ```shell cd StreamLearn/tests/GEAR/offline/single-node/ python create.py --data_path /tmp/gear/checkpoints/example_shared_dataset.pt ``` 之后,运行`StreamLearn/tests/GEAR/offline/single-node/run.sh`脚本以快速运行样例程序。 **和现有分布式训练工作流集成** 如果希望在DeepSpeed分布式训练工作流集成调用GEAR,请参考`StreamLearn/tests/GEAR/offline/single-node/main.py`中的`setup`方法进行环境初始化和数据层加载: ```python def setup(): ... # 初始化DeepSpeed分布式环境 deepspeed.init_distributed(dist_init_required=True) # GEAR数据层运行时加载 gear.init() # GEAR数据层定义 # 如此处GEAR从离线数据集中记载 offline_loader_params = { "data_path": args.data_path, "mpu": None, "batch_size": 32, "sampling_method": "Uniform", "patterns": [ { "name": "observations", "pad": "tail", # Literal["head", "tail"] "offset": -1000, "length": 1000, # fetch entire sequence for a trajectory no longer than 1000 steps }, { "name": "actions", "pad": "tail", # Literal["head", "tail"] "offset": -1000, "length": 1000, }, ], } mpu = ModelParallelismUnit(device=torch.device("cuda")) mpu.build_ds_mpu() offline_loader_params["mpu"] = mpu offline_loader_params["attach"] = int(os.environ["LOCAL_RANK"]) != 0 loader = gear.loader.OfflineLoader.create(**offline_loader_params) ``` 其后在模型训练/推理过程中,GEAR数据层的交互方式参考`StreamLearn/tests/GEAR/offline/single-node/models/mlp/funcs.py`文件中`train_step`方法: ```python def train_step( loader, model, optimizer, step_id: int, tensorboard_writer: Union[SummaryWriter, None], ) ... # GEAR对外暴露使用逻辑和普通pytorch dataloader高度相似 timesteps, data_batch = next(loader) # 从data_batch中提取数据域 obs = data_batch[0] act = data_batch[1] # 消耗/使用数据 ... ``` ### 4.2 知识融合流数据生成式学习模型 AbdGen AbdGen基于反绎学习框架,利用逻辑推理结合生成式神经网络模型,支持知识融合条件下的流数据分布拟合与生成规则学习。 *代码路径*: Algorithm/AbdGen *运行环境*: 神经生成式模型与逻辑推理模块分别基于PyTorch及Swi-Prolog框架。 *执行训练*: 运行根目录下“main.py"文件。 需根据"main.py"下的parser arguments设计正确的文件路径。 模型训练参数在"exp_config/rule_learning_config.py"下进行设置。 ### 4.3 支撑轻量交互的长序列理解与知识融合的大模型框架 ReLLa **安装依赖** ~~~python pip install -r requirments.txt ~~~ **数据处理** 你可以通过[这里](https://drive.google.com/drive/folders/1av6mZpk0ThmkOKy5Y_dUnsLRdRK8oBjQ?usp=sharing)使用处理好的数据,其中包括原始数据以及检索增强后的数据,。数据集构成:全量测试集,采样后的训练集。Ml-1m/Ml-25m/BookCrossing三个数据集对应的历史序列长度分别为30/30/60。 或者你可以通过提供的处理脚本自行处理。处理脚本可以参考[data_preprocess](./data_preprocess/)中[BookCrossing](http://www2.informatik.uni-freiburg.de/~cziegler/BX/), [MovieLens-1M](https://grouplens.org/datasets/movielens/1m/), [MovieLens-25M](https://grouplens.org/datasets/movielens/25m/)对应的jupyter notebook。 **语义表征生成** 首先,利用LLM编码物品信息作为物品的语义表征,后续将被用于进行语义相似度计算以及相似物品检索。 ```python python get_semantic_embed.py --model_path XXX --data_set BookCrossing/ml-1m/ml-25m --pooling average ``` **近邻检索** 接着,利用上一步得到的语义表征计算物品相似度进行检索,并预存相似物品id。 - BookCrossing ```python python topK_relevant_BookCrossing.py ``` - MovieLens-1M ```python python topK_relevant_ml1m.py ``` - MovieLens-25M ~~~python python topK_relevant_ml25m.py ~~~ **文本数据构造** 然后,将处理好的id型推荐数据根据定义的提示模板转换成文本型数据。 ~~~python python data2json.py --K 5 --temp_type simple --set test --dataset ml-1m ~~~ 生成的部分文本样例可以参考[./data/ml-1m/proc_data/data/test/test_5_simple.json](./data/ml-1m/proc_data/data/test/test_5_simple.json)。 **混合训练集构建** 作为数据增强,我们提出了构建同时包含原始数据和检索增强的数据的混合训练集。 ```python python training_set_construction.py --K 5 ``` **样例运行** 你需要在脚本中指定模型权重路径,以及LoRA adapter的存储路径。 **推理** ~~~python python scripts/script_inference.py --K 5 --dataset ml-1m --temp_type simple ~~~ **微调** ~~~python python scripts/script_finetune.py --dataset ml-1m --K 5 --train_size 64 --train_type simple --test_type simple --epochs 5 --lr 1e-3 --total_batch_size 64 ~~~ ## 课题五:流数据增量学习算法 该算法主要包含`增量学习数据集构造`,`MEMO算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/CILDataset.py - StreamLearn/Algorithm/ClassIncrementalLearning/MEMO.py - StreamLearn/tests/test_stream_cil.py 首先,按照以下方式构造流式数据集CIFAR100。 ```python dataset = CIFAR100_CIL(root='/home/anony/DATASETS', download=True, nb_tasks=nb_tasks) ``` 其次,加载StreamCILAlgorithm,调用流式训练、预测、评估的接口,其内核为MEMO算法 ```python args = parser.parse_args() alg = StreamCILAlgorithm(args.alg) ``` 最后,遍历nb_tasks,在流数据上依次进行预测、评估、训练 ```python predictions = alg.stream_predict(X=dataset._test_data) accuracy = alg.stream_evaluate(dataset, predictions) alg.stream_fit(dataset) ```