# ignite-spring-project **Repository Path**: harrik/ignite-spring-project ## Basic Information - **Project Name**: ignite-spring-project - **Description**: 基于ignite的分布式框架,开箱即用,具有RPC,分布式广播,分布式消息,分布式计算,分布式缓存等功能 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 6 - **Created**: 2022-08-31 - **Last Updated**: 2022-08-31 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ignite-spring-boot-starter ========================== ## 项目初衷 这个框架是专门为中小型项目打造的微服务框架,底层基于apache ignite,特点是可以做到不依赖外部中间件,实现 `RPC服务`,`分布式缓存`,`分布式计算`,`分布式消息`等功能特性 框架也基于ignite的集群管理,实现了基于集群组的颗粒度的服务调用,即针对集群组的调用 * JDK和Spring boot版本

jdk版本为`1.8`

Spring boot 版本要求`1.5.3`以上

## 框架说明 服务中启动的 Spring boot 应用同时启动了ignite的server和client模式,注入到了Spring容器中 ``` @Autowired @Qualifier("igniteClient") private Ignite igniteClient; @Autowired @Qualifier("igniteServer") private Ignite igniteServer; ``` 因此你可以无缝地使用框架没有封装的ignite功能,更多的ignite的功能,请参考中文官网 (https://www.ignite-service.cn/doc/java/) ## 目录 * [RPC服务](#RPC) * [分布式消息](#Message) * [分布式广播](#BroadCast) * [分布式计算](#Computer) ## quick-start ### 构建 ``` cd ignite-spring-boot-starter mvn clean install ``` ### 构建一个基于ignite的 spring boot 项目 * 添加依赖: ```xml com.github.kong.spring.boot ignite-spring-boot-starter 1.0 ``` * 在application-yml添加ignite的相关配置信息,样例配置如下: * zookeeper 发现 ```yml #zookeeper发现 ignite-cluster: name: hello_client_1 #节点名称 role: client des: 测试服务 zookeeperUrl: 192.168.56.100:2181 localAddress: 127.0.0.1 localPort: 47600 ``` * 动态ip发现 ```yml ignite-cluster: name: hello_server #节点名称 role: server des: 测试服务端 multicast-group: 224.0.1.111 #组播地址 localAddress: 127.0.0.1 localPort: 48600 ``` * 为了开发方便,如果Spring boot Appliction 类的不在包名`com.github.kong`目录下,接下来在Spring Boot Application的上添加`@ComponentScan("com.github.kong.*")`,这样idea可以通过看到一些Bean是否已经注入了,当然也可以不添加,框架也有写扫描注入 ```java @SpringBootApplication @ComponentScan("com.github.kong.*") public class HelloWorldServerApplication { public static void main(String[] args) { SpringApplication.run(HelloWorldServerApplication.class); } } ``` ## RPC服务的创建与消费 ### 发布服务基于ignite的RPC服务 * 编写你的ignite服务,需要添加要发布的服务实现上添加`@IgniteRpcService`注解,继承`IgniteService`. * `HelloWorld` 是定义的接口 ```java @Service @IgniteRpcService(des = "这是一个例子") public class HelloWorldService extends IgniteService implements HelloWorld { } ``` * `@IgniteRpcService` 注解的定义 ```java /** * 服务提供者注解 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface IgniteRpcService { /** * @return */ String version() default "1.0"; //接口描述 String des() default ""; //单个节点部署的实例数 int maxPerNodeCount() default 1; //整个集群部署的最大实例数,0:无限制 int total() default 0; } ``` * 启动你的Spring Boot应用,观察控制台,可以看到ignite启动相关信息. ### 调用已经发布的RPC服务 * Spring boot 应用配置同上,唯一不同的是,需要更改配置 ```yml #zookeeper发现 ignite-cluster: name: hello_client_1 #节点名称 (必须在集群中唯一) ``` * 通过`@IgniteRpcReference`注入需要使用的interface. ```java @Controller public class HelloWorldController { @IgniteRpcReference private HelloWorld helloWorldService; @RequestMapping("/helloworld") @ResponseBody public String test(){ return helloWorldService.sayHello("kong"); } } ``` * 调用不同版本的RPC服务 ```java @IgniteRpcReference(version = "1.1") private HelloWorld helloWorldService; ``` * `@IgniteRpcReference` 注解的定义 ```java /** * 网格服务注入注解 */ @Target({ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface IgniteRpcReference { String version() default "1.0"; //默认使用负载均衡 boolean isLoadbalance() default true; //默认不设超时 long timeout() default 0; } ``` ## 分布式消息 分布式消息是基于内存的消息订阅系统,如果需要持久化请使用外部的消息系统 ### 定义话题消费者 ```java @Service @IgniteMessageListener(topic = "hello",isBroadcast = false) public class HelloWorldMessage implements IgniteMessageRecevicer { @Override public boolean apply(UUID uuid, MessageModel messageModel) { System.out.println(messageModel); return true; } } ``` * `@IgniteMessageListener` 注解的定义 ```java /** * 服务提供者注解 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface IgniteMessageListener { //消息主题 String topic(); //消息描述 String des() default ""; //是否针对集群内的所有节点(是否允许重复消费) boolean isBroadcast() default true; } ``` * `MessageModel` 是一个消息封装,发送消息时必须用它来发送 ### 发送话题消息 ```java @Controller @RequestMapping("/message") public class MessageController { @Autowired private IgniteMessageSender sender; @RequestMapping("/sayHello") @ResponseBody public String test(){ sender.toRemote("hello", new MessageModel<>("1212")); return "1212"; } } ``` * `IgniteMessageSender`是框架注入的Bean,可以直接引用 ## 分布式广播 分布式广播是指:对集群组的所有节点发送消息,然后获取所有节点返回的结果,原来是基于ignite的分布式闭包利用反射机制调用spring容器内Bean的方法 ### 发送一个分布式广播 ```java @Controller @RequestMapping("/broadcast") public class BroadcastController { @Autowired private BroadcastServiceExecutor broadcastServiceExecutor; @RequestMapping("/sayHello") @ResponseBody public List test(){ return (List) broadcastServiceExecutor.broadcast("server", TestBroadService.class,"sayHello","12123"); } } ``` * `BroadcastServiceExecutor`是框架注入的Bean,可以直接引用 * `broadcast` 方法提供3个方法定义 ```java /** * 向其他集群广播 * @param targetRole 集群标识 * @param targetClass api类 * @param methodName 方法名称 * @param args 参数 * @return */ public List broadcast(String targetRole,Class targetClass,String methodName,Object... args){...} /** * 向远端集群广播消息 * @param targetClass * @param methodName * @param args * @return */ public List broadcastRemote(Class targetClass,String methodName,Object... args){...} /** * 向集群内广播消息 * @param targetClass * @param methodName * @param args * @return */ public List broadcastLocal(Class targetClass,String methodName,Object... args){...} ``` ## 分布式计算 分布式计算允许用户执行基于内存的Map-Reduce任务 * 创建 `Map-Reduce` 任务 ,需继承`ComputeTaskSplitAdapter`(import org.apache.ignite.compute.ComputeTaskSplitAdapter),泛型 * T:入参,R:返回类型 ```java //字数统计测试 @Service public class MapExampleCharacterCountTask extends ComputeTaskSplitAdapter,Integer> { @Nullable @Override public Integer reduce(List results) throws IgniteException { return results.stream().mapToInt(ComputeJobResult::getData).sum(); } @Override protected Collection split(int gridSize, List arg) throws IgniteException { LinkedList jobs = new LinkedList(); List> list = CollectionUtils.split(arg,10000); for(final List words : list){ jobs.add(new ComputeJobAdapter() { @Override public Object execute() throws IgniteException { int i = 0; for(String s : words){ i = i + s.length(); } return i; } }); } return jobs; } } ``` * 执行 `Map-Reduce` 任务 ```java @Controller @RequestMapping("/mr") public class MRTestController { @Autowired private MapReduceTaskExecutor,Integer> mapReduceTaskExecutor; @RequestMapping("/test") @ResponseBody public Object test(){ try { List records = new ArrayList<>(); // 创建CSV读对象 CsvReader csvReader = new CsvReader(new FileInputStream("D:\\data\\cs2.csv"), Charset.forName("GBK")); while (csvReader.readRecord()){ // 读一整行 records.add(csvReader.getRawRecord()); } List bigRecords = new ArrayList<>(); for(int i = 0; i < 5; i++){ bigRecords.addAll(records); } return mapReduceTaskExecutor.execute(MapExampleCharacterCountTask.class,bigRecords); } catch (IOException e) { e.printStackTrace(); } return ""; } } ``` * `MapReduceTaskExecutor`是框架注入的Bean,可以直接引用 ## 集群管理api 框架注入了`IgniteManager`这个bean,可以实现以下功能 ```java public interface IgniteManager { /** * 获取节点列表 * * @return */ List list(); /** * 获取节点的详细信息 * * @param nodeId * @return */ ClusterMetrics info(String nodeId); /** * 获取微服务的基本信息 * * @return */ List servieInfos(); /** * 集群消息信息 * @return */ List messagInfos(); } ``` * 使用`@Autowired` 注入即可 ```java @Controller @RequestMapping("/admin") public class AdminCotroller { @Autowired private IgniteManager igniteManager; @RequestMapping("/nodes") @ResponseBody public List nodes(){ return igniteManager.list(); } @RequestMapping("/nodeInfo/{id}") @ResponseBody public ClusterMetrics info(@PathVariable("id") String id){ return igniteManager.info(id); } @RequestMapping("/services") @ResponseBody public List services(){ return igniteManager.servieInfos(); } } ```