# concurrent-data-process **Repository Path**: xu-jing-ming/concurrent-data-process ## Basic Information - **Project Name**: concurrent-data-process - **Description**: 多线程同时处理数据,不同线程之间数据存储使用堆外内存 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2023-07-11 - **Last Updated**: 2023-12-18 ## Categories & Tags **Categories**: Uncategorized **Tags**: Java ## README # concurrent-data-process #### 介绍 在我开发的时候经常想使用Flink来处理一些数据,奈何公司或者项目没有Flink集群, 也尝试过在项目中使用Flink的本地模式,但是这样的代码肯定会被领导打回,于是我就想自己写一个类Flink的并发处理框架。 于是就实现了一版多线程的数据处理代码。 多线程同时处理数据,不同线程之间数据存储使用堆外内存,编码类似于Flink。 目前只实现了基本的能力,有兴趣的小伙伴可以一起把它慢慢完善. ### 说明 有问题、或者有特定的场景需求可以加作者微信 xs15335194440,或者提交您的代码,大家一起探讨,感谢您的意见。 #### 软件架构 借鉴flink架构逻辑实现线程级的类flink框架,代码逻辑图如下 ![输入图片说明](代码逻辑图.png "屏幕截图.png") #### 安装教程 代码直接复制到自己的项目中即可 #### 使用说明 1. 但并行度运行 ``` AsyncExecutor executor = AsyncExecutor.getInstance(); executor.fromElement("aa", "bb", "cc") .map((MapFunction) value -> String.join(",", value, "1")) .filter((FilterFunction) value -> value.startsWith("bb")) .flatMap( (FlatMapFunction) (value, out) -> out.collect(value)) .sink((SinkFunction) System.out::println); executor.execute(); ``` 2.多并行度运行 ``` AsyncExecutor executor = AsyncExecutor.getInstance(); executor.fromElement( new Person("test1", 24, "南京", Arrays.asList("1355346554433", "1355346554433")), new Person("test2", 25, "上海", Arrays.asList("13553465544", "1355346554ee4")), new Person("test3", 26, "北京", Arrays.asList("13553465544d33", "13553465544"))) .map((MapFunction) value -> { if (value.getAddr().equals("南京")) { value.setProvince("江苏省"); } return value; }).parallelism(2) .filter((FilterFunction) value -> value.getName().equals("test1")) .parallelism(2) .sink((SinkFunction) System.out::println); executor.execute(); ``` 每次运行都能够打印出每个阶段的数据输入和输出情况 ``` Source-423a8219-c41d-4bd6-9c45-d5ea0ee7280f task: input lines: 3, input data size: 300 Map-a994f9ff-eb8c-4cdd-aecc-586c8cba3181 task: input lines: 3, input data size: 300 Filter-2011c74a-8aed-4753-9106-1fe920ca1c72 task: input lines: 1, input data size: 99 Filter-2011c74a-8aed-4753-9106-1fe920ca1c72 task: input lines: 2, input data size: 208 Sink-6fe38668-1f95-488b-bf86-def1c8fbfac8 task: input lines: 1, input data size: 108 ```