# folkmq **Repository Path**: sagframe/folkmq ## Basic Information - **Project Name**: folkmq - **Description**: FolkMQ,一个新起的内存型消息队列。支持:发布、订阅、定时、ACK,重试、延时(大约 100_000 TPS) - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 64 - **Created**: 2023-11-25 - **Last Updated**: 2023-11-25 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README

FolkMQ

一个新起的内存型消息队列

Maven Apache 2 jdk-8 jdk-11 jdk-17 jdk-21
gitee star github star



## 简介 * 基于 [Socket.D 通讯应用协议](https://gitee.com/noear/socketd) 开发的内存型消息队列。俗称:民谣消息队列(FolkMQ) * 支持 发布、订阅、定时、ACK,重试、延时 * 没有 集群和持久化 * 大约 100_000 TPS(使用 MacBook pro 2020 款本机测试,单客户端发与收) ## 开发过程视频 * [[Socket.D 实战] 之录播手写 FolkMQ (1)](https://www.bilibili.com/video/BV1dj411j7PQ/) * 完成客户端功能实现 * [[Socket.D 实战] 之录播手写 FolkMQ (2)](https://www.bilibili.com/video/BV1EC4y177sb/) * 完成服务端功能实现 * 完成通讯测试 * [[Socket.D 实战] 之录播手写 FolkMQ (3)](https://www.bilibili.com/video/BV11v411c7kJ/) * 添加专用连接地址 * 添加异步订阅与发布 * 添加AK/SK鉴权 * [[Socket.D 实战] 之录播手写 FolkMQ (4)](https://www.bilibili.com/video/BV1oc41167DY/) * 添加订阅身份支持(支持:以实例订阅,以集群订阅) * [[Socket.D 实战] 之录播手写 FolkMQ (5)](https://www.bilibili.com/video/BV1zc41167Uj/) * 添加客户端消费的ACK机制支持 * [[Socket.D 实战] 之录播手写 FolkMQ (6)](https://www.bilibili.com/video/BV1pe411f7BX/) * 添加用户身份队列化 * 添加消息派发时序化 * [[Socket.D 实战] 之录播手写 FolkMQ (7)](https://www.bilibili.com/video/BV1iM411Z7cu/) * 添加服务端的ACK机制支持 * 添加消息重试机制 * [[Socket.D 实战] 之录播手写 FolkMQ (8)](https://www.bilibili.com/video/BV1j34y1w7x2/) * 完善ACK要制,确保客户端没有答复还能再发 * 取消会话查找 * [Socket.D 实战] 之录播手写 FolkMQ (9), 预告 * 优化代码, * 完成压测 100万 收发不丢消息 * 组织测试用例,构建单测 ## 应用示例 * maven ```xml org.noear folkmq 1.0.2 org.noear socketd-transport-java-tcp 2.0.20 ``` * server demo ```java public class ServerDemo { public static void main(String[] args) throws Exception { MqServer server = new MqServerImpl() .addAccess("folkmq", "YapLHTx19RlsEE16") .start(9393); } } ``` * client demo ```java public class ClientDemo1 { public static void main(String[] args) throws Exception { //客户端 MqClient client = new MqClientImpl("folkmq://127.0.0.1:9393?ak=folkmq&sk=YapLHTx19RlsEE16") .connect(); //订阅 client.subscribe("demo", "(ip or cluster-name)", message -> { System.out.println("ClientDemo1::" + message); }); //发布 client.publish("demo", "hi"); //发布,并指定5秒后派发 client.publish("demo", "hi", new Date(System.currentTimeMillis() + 5000)); for (int i = 0; i < 10; i++) { Thread.sleep(100); client.publish("demo", "hi"); } } } ``` ### 自动重试与延时策略 ```java public class MqNextTime { /** * 获取下次派发时间 * * @param messageHolder 消息 * */ public static long getNextTime(MqMessageHolder messageHolder) { switch (messageHolder.getDistributeCount()) { case 0: return 0; case 1: return System.currentTimeMillis() + 1000 * 5; //5s case 2: return System.currentTimeMillis() + 1000 * 30; //30s case 3: return System.currentTimeMillis() + 1000 * 60 * 3; //3m case 4: return System.currentTimeMillis() + 1000 * 60 * 9; //9m case 5: return System.currentTimeMillis() + 1000 * 60 * 15; //15m case 6: return System.currentTimeMillis() + 1000 * 60 * 30; //30m case 7: return System.currentTimeMillis() + 1000 * 60 * 60; //60m default: return System.currentTimeMillis() + 1000 * 60 * 60 * 2; //120m } } } ```