# data-stream **Repository Path**: callisto_x/data-stream ## Basic Information - **Project Name**: data-stream - **Description**: 一个java开发的流式数据处理框架,需要与springboot集成使用。 每个stream为一个数据处理单元,各个stream 可自由串联组合,形成一条链式处理流,完成特定的功能。 框架特点: 1.功能模块化,高度可复用 2.灵活易扩展 3.不改代码情况下,通过配置即可实现不同功能 3.1提高系统吞吐量 3.2现有stream灵活 - **Primary Language**: Java - **License**: GPL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-12-02 - **Last Updated**: 2025-02-21 ## Categories & Tags **Categories**: Uncategorized **Tags**: Java, SpringBoot, Stream ## README #data-stream 简介 data-stream是基于责任链设计模式开发一个流式数据处理框架,每个stream为一个数据处理单元,各个stream 可自由串联组合,形成一条链式处理流,完成特定的功能。 框架特点: 1.功能模块化,高度可复用 2.灵活易扩展 3.不改代码情况下,通过配置即可实现不同功能 3.1提高系统吞吐量 3.2现有stream灵活组合,实现新功能 3.3可实现功能自由拆分与合并 3.4其他待发掘玩法 4.轻松实现横向/纵向扩容 ##stream类型 stream分为3种类型 : ###输入流 INPUT :输入流,无上游,可以指定下游,最后初始化(执行init方法),最先销毁(执行destroy方法),一般为生产数据方(自己产生或从外部获取数据) 通过在类上标注@InputDataStream 注解表示或使用@DataStream(type = StreamType.INPUT) ###普通流 STREAM: 普通流,可以指定上下游,在INPUT之前初始化,销毁顺序在OUTPUT之后,从上游获取数据,处理之后发给下游 通过在类上标注@DataStream注解表示 ###输出流 OUTPUT:输出流,有上游,无下游,最先初始化(执行init方法),最后销毁(执行destroy方法),会将数据发到外部 通过在类上标注@OutputDataStream 注解表示或使用@DataStream(type = StreamType.OUTPUT) ###注:stream 设置为多例,以便生成多份实例,达到stream功能复用的目的 #配置使用 ##1.引入依赖 ```xml cn.data.stream data-stream xxx ``` ##2.在启动类添加@EnableDataStreamConfig来启用data-stream 如果自己实现了stream 则在@EnableDataStreamConfig注解中通过values指定需要扫描的stream所在的包路径,支持多个包 ##配置方式 ###注解方式 ```java @InputDataStream @Output(outputs = {"aStream"}) //@InputDataStream(outputs = {"aStream"}) 等效方式 public class AInput extends AbstractStream{} @DataStream(value = "aStream", outputs = {"aOutstream"}) public class MidStream extends AbstractStream{} @OutputDataStream(value="aOutstream") public class OutStreamTest extends AbstractStream{} ``` 通过以上代码建立了aInput---->aStream---->aOutstream 的数据处理流程 ###XML配置方式 通过xml配置方式可以实现更复杂的数据处理流程,配置文件示例如下: ```xml dingdingCall dingdingCall manualCall shunt ``` 通过以上配置,建立了 ``` pushMsg------->dingdingCall------>manualCall 的数据处理流程 pushMsg2---┘ └--->shunt ``` pushMsg和pushMsg2 都是testInputStream的实例 manualCall和shunt 都是lastStream的实例 ##注:目前xml方式和注解方式不会同时生效,两个都存在的情况下,只有xml方式会生效 #内置stream 内置stream是提供一些通用的功能组件,来实现特定功能,如数据分发、数据处理时耗时计、并发、同步等功能 内置组件都在cn.data.stream.stream.inner包下 ##数据分发stream 功能:根据配置的分发规则,将数据转发给符合条件的下游stream ```ini ${name}.nextStream1=condition1 ${name}.nextStream2=condition2 ``` ${name}为分发stream的名称,condition1为分发条件,分发条件为aviator 表达式,aviator 使用参考:https://blog.csdn.net/weixin_48029654/article/details/118870862 配置示例: ```ini dispatch.nextStream1=status==1 ||status ==2 dispatch.nextStream2=order.riskControlStatus=0 ``` 以上配置可实现: status=1或者status=2的数据转发到nextStream1 order.riskControlStatus=0的数据转发到nextStream2 当数据不满足任何分发条件,则不会往下分发 #data-stream流代码自定义开发 1.新建一个类 继承 AbstractStream抽象类或者实现IComponent接口,并根据需要标记合适的DataStream注解 2.如果有stream自身要用的配置文件需要读取,重写init()方法,通过 PropertyResolver config读取配置 3.在process(Object data)方法中实现主业务,如果需要将数据发往下游,则调用super.process(),可将消息发往下游; 如果是实现IComponent接口,则需要根据是否需要将消息发往下游,拿到outputs列表,遍历发送