# message_bus **Repository Path**: liudegui/message_bus ## Basic Information - **Project Name**: message_bus - **Description**: 一个消息总线模块:当收到等待的消息时,调用回调函数;当在规定时间内没收到时,调用超时响应函数。 - **Primary Language**: C++ - **License**: MIT - **Default Branch**: optimze - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 48 - **Forks**: 23 - **Created**: 2015-06-11 - **Last Updated**: 2025-07-28 ## Categories & Tags **Categories**: utils **Tags**: None ## README ## 1. 设计思路 在设计该消息总线系统时,我们的核心目标是通过简单的接口实现复杂的功能,同时确保系统的性能和可靠性。为了实现这一目标,我们将把系统分成多个模块,分别处理消息的发布与订阅、超时检查和任务调度。 ### 1.1 关键类设计 - **MessageBus**:负责管理消息的发布和订阅,支持消息回调处理及超时管理。 - **PeriodicTaskScheduler**:负责定期检查超时情况和执行周期性任务。 - **SubscriptionItem**:表示订阅项,包含回调函数、超时设置等信息。 ### 1.2 类图 在此处,我们使用类图来展示系统中各个类及其之间的关系。这样可以帮助我们清晰地理解各个模块如何协作。 ```mermaid classDiagram class MessageBus { +publishMessage(messageId: int, messageContent: string, additionalData: int) +checkAndHandleTimeouts() +subscribeToMessage(item: SubscriptionItem) +clearAllSubscriptions() +stop() +start() } class PeriodicTaskScheduler { +startTask(intervalMs: int, task: function) +stop() +isStopped(): bool } class SubscriptionItem { +messageCallback: function +timeoutCallback: function +timeoutIntervalMilliseconds: int +timeoutTimestampMicroseconds: long +subscribedMessageIds: list +subscriptionType: SubscriptionType } class SubscriptionType { <> ALWAYS_SUBSCRIBE ONCE_SUBSCRIBE } MessageBus "1" --> "1..*" PeriodicTaskScheduler : manages MessageBus "1" --> "1..*" SubscriptionItem : manages MessageBus "1" --> "1..*" SubscriptionType : uses ``` ### 1.3 时序图 时序图展示了在 `publishMessage` 函数执行时,消息如何流动并与订阅者和任务调度器进行交互。 ```mermaid sequenceDiagram participant User participant MessageBus participant SubscriptionItem participant TaskScheduler User ->> MessageBus: publishMessage(messageId, messageContent) MessageBus ->> MessageBus: Lock callbackMapMutex MessageBus ->> MessageBus: Lock timeoutCallbackListMutex MessageBus ->> SubscriptionItem: Call messageCallback MessageBus ->> MessageBus: Handle timeout if exists MessageBus ->> TaskScheduler: checkAndHandleTimeouts() TaskScheduler ->> MessageBus: checkTimeouts() MessageBus ->> SubscriptionItem: Call timeoutCallback if timeout occurs MessageBus ->> User: Done ``` ### 1.4 流程图 流程图描述了系统的运行流程,包括消息发布、超时检查等关键步骤。 ```mermaid flowchart TD A[Start] --> B[MessageBus Created] B --> C[Start Task Scheduler] C --> D[Publish Message] D --> E{Message ID Exists?} E -- Yes --> F[Call Message Callback] E -- No --> G[Do Nothing] F --> H{Once Subscribe?} H -- Yes --> I[Unsubscribe After One Time] H -- No --> J[Keep Subscribed] G --> K[Timeout Check] K --> L{Timeout Reached?} L -- Yes --> M[Call Timeout Callback] L -- No --> N[End] M --> N J --> N I --> N K --> N subgraph Timeout Check Loop direction TB K --> L L --> M end ``` ## 2. 代码结构与设计 ### 2.1 消息回调与订阅项 为了灵活处理消息和超时回调,我们使用了 `std::function` 来定义回调函数,并通过结构体 `SubscriptionItem` 存储相关信息。 ```cpp using MessageCallback = std::function& messageContent, std::int32_t additionalData)>; using TimeoutCallback = std::function; struct SubscriptionItem { MessageCallback messageCallback = nullptr; // 消息回调函数 TimeoutCallback timeoutCallback = nullptr; // 超时回调函数 std::int32_t timeoutIntervalMilliseconds = 1000; // 超时时间间隔,单位:毫秒 std::int64_t timeoutTimestampMicroseconds = 0; // 超时戳,单位:微秒 std::vector subscribedMessageIds; // 订阅的消息ID SubscriptionType subscriptionType = SubscriptionType::ALWAYS_SUBSCRIBE; // 订阅类型 }; ``` 每个 `SubscriptionItem` 包含一个消息回调函数(`messageCallback`)、一个超时回调函数(`timeoutCallback`)、订阅的消息 ID 以及超时管理信息(如超时间隔和超时时间戳)。 ### 2.2 消息总线类 `MessageBus` `MessageBus` 类是整个消息总线的核心,提供了消息发布、订阅管理以及超时检查等功能。 ```cpp class MessageBus { public: static MessageBus& instance() { static MessageBus instance; return instance; } void publishMessage(std::int32_t messageId, const std::vector& messageContent, std::int32_t additionalData = 0); void checkAndHandleTimeouts(); bool subscribeToMessage(const SubscriptionItem& item); void clearAllSubscriptions(); void stop(); void start(); }; ``` `MessageBus` 提供了以下方法: - `publishMessage`:发布消息给所有订阅该消息 ID 的订阅者。 - `checkAndHandleTimeouts`:检查所有订阅项是否超时并执行相应的回调。 - `subscribeToMessage`:订阅指定消息 ID 的消息并注册相关的回调函数。 - `clearAllSubscriptions`:清空所有订阅项。 - `start` 和 `stop`:启动和停止定时任务调度器。 ### 2.3 定时任务调度器 `PeriodicTaskScheduler` 为了能够定期执行任务(如超时检查),我们实现了一个内嵌的定时任务调度器类。 ```cpp class PeriodicTaskScheduler { public: PeriodicTaskScheduler() : stopped_(true), tryToStop_(false) {} void startTask(std::int32_t intervalMs, const std::function& task); void stop(); bool isStopped() const; private: std::atomic stopped_; std::atomic tryToStop_; std::mutex mutex_; std::condition_variable stopCond_; }; ``` `PeriodicTaskScheduler` 使用 `std::atomic` 控制任务的启动与停止,通过 `startTask` 方法以指定的时间间隔启动一个新线程执行任务。 ## 3. 核心功能实现 ### 3.1 消息发布 ```cpp void MessageBus::publishMessage(std::int32_t messageId, const std::vector& messageContent, std::int32_t additionalData) { std::unique_lock callbackMapLock(callbackMapMutex_, std::defer_lock); std::unique_lock timeoutCallbackListLock(timeoutCallbackListMutex_, std::defer_lock); std::lock(callbackMapLock, timeoutCallbackListLock); // 清除已超时的订阅项 auto it = timeoutCallbackList_.begin(); while (it != timeoutCallbackList_.end()) { if (std::find((*it)->subscribedMessageIds.begin(), (*it)->subscribedMessageIds.end(), messageId) != (*it)->subscribedMessageIds.end()) { it = timeoutCallbackList_.erase(it); } else { ++it; } } // 调用回调函数 auto callbackIt = callbackMap_.find(messageId); if (callbackIt != callbackMap_.end()) { for (auto& item : callbackIt->second) { if (item->messageCallback) { item->messageCallback(messageContent, additionalData); } if (item->subscriptionType == SubscriptionType::ONCE_SUBSCRIBE) { unsubscribe(messageId, item); } } } } ``` ### 3.2 超时检查 ```cpp void MessageBus::checkAndHandleTimeouts() { std::unique_lock timeoutCallbackListlck(timeoutCallbackListMutex_); std::int64_t currentTime = getTimeStamp(); for (auto it = timeoutCallbackList_.begin(); it != timeoutCallbackList_.end();) { if ((*it)->timeoutTimestampMicroseconds <= currentTime) { if ((*it)->timeoutCallback) { (*it)->timeoutCallback(); } for (std::int32_t msgId : (*it)->subscribedMessageIds) { unsubscribe(msgId, *it); } it = timeoutCallbackList_.erase(it); } else { ++it; } } } ``` 以上代码定期检查所有订阅项是否超时,并触发超时回调。