# python-redis-stream-queue **Repository Path**: sealphp/python-redis-stream-queue ## Basic Information - **Project Name**: python-redis-stream-queue - **Description**: PythonRedisStreamQueue 是一个基于 Redis 5.0+ 使用Stream 数据结构实现的 Python 消息队列库,相比传统 Redis List 实现的消息队列具有更强的可靠性和功能完整性。 ✅ ​​消息持久化​​ ✅ ​​多消费者组 ✅ ​​消息确认机制​ - 提供ACK确认机制,确保消息至少被消费一次 ✅ ​​阻塞式消费​​并发不重复消费 ✅ ​​失败重试​​ - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-05-13 - **Last Updated**: 2025-05-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # python-redis-stream-queue ### 介绍 PythonRedisStreamQueue 是一个基于 Redis 5.0+ 使用Stream 数据结构实现的 Python 轻量级消息队列库,相比传统 Redis List 实现的消息队列具有更强的可靠性和功能完整性。 ### 优势 ✅ ​​消息持久化​​ - 消息默认持久化到内存,通过 RDB/AOF 实现磁盘持久化(类似 Kafka 的 commit log) ✅ ​​​​消息回溯​​ - 支持通过消息 ID(时间戳-序列号)回溯历史消息(类似 Kafka 的 offset) ✅ ​​多消费者组 - 支持多消费者协同消费,类似 Kafka 的 Consumer Group,组内消费者负载均衡【竞争消费模式】 ✅ ​​消息确认机制​ - 提供ACK确认机制,确保消息至少被消费一次,未确认的消息会保留在 pending list ✅ ​​阻塞式消费​​ - 支持非轮询的阻塞等待新消息(XREADGROUP BLOCK),并发多线程不会重复消费同一个消息 ✅ ​​失败重试​​ ### 适用场景 电商订单处理系统 实时日志收集与分析 分布式任务调度 微服务间异步通信 秒杀系统请求缓冲 配合三方框架(如 FastAPI),但又不想使用重量级的celery框架或kafka消息队列等场景(支持多线程并发提高处理效率) ### 用法 ##### 发送任务消息 ``` from python_redis_stream_queue import PythonRedisStreamQueue,queue import datetime import time queue = PythonRedisStreamQueue() queue_name = 'redis_stream_queue:test' queue.add_job(queue_name, {'id': 1, 'data': 'task data'}) ``` ##### 监听消息队列worker(建议python程序启动时就调用worker()方法启动监听) ``` from python_redis_stream_queue import PythonRedisStreamQueue import datetime import time queue = PythonRedisStreamQueue() queue_name = 'redis_stream_queue:test' # 启动worker主进程 def worker(): try: # 启动消费者主进程前清理上次pending队列未确认的任务(注意:消费者执行任务时结束消费者进程,可能会丢失部分未确认的任务) # queue.remove_pending_messages(queue_name, 'group_1') # worker消费者主进程启动(监听到新消息后调用回调函数执行处理任务)(execute可自行替换实际执行的函数名) queue.worker(queue_name, callback=execute, group_name='group_1', thread_count=1) except Exception as e: import traceback traceback.print_exc() print('[%s] worker异常: %s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),str(e))) # 任务执行入口函数 def execute(task: dict): try: result = False # todo something... # 执行任务结果回调函数处理后续(如果无需回调,请抛出异常) callback(task, result) except Exception as e: import traceback traceback.print_exc() print('[%s] [%s] 执行任务异常: %s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), execute_name, str(e))) # 异常处理开始 # todo something... (需要重试的异常无需进行回调) # 异常处理结束 # 抛出异常将进行重试(默认重试3次,可自行调整重试次数) raise Exception(str(e)) # 任务结果回调函数 def callback(task: dict, result: dict): try: task_data = task.get('data', {}) task_id = task.get('id') or 0 if task_id: if result is True: print('[INFO] 任务完成 id: %s' % str(task_id)) pass elif result is False: print('[ERROR] 任务失败 id: %s' % str(task_id)) pass return True except Exception as e: import traceback traceback.print_exc() print('[%s] [%s] 任务状态异常: %s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), execute_name, str(e))) pass ```