From d6537bca7f15a027b41222709881d2080a587426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=BA=E8=83=BD=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Wed, 16 Apr 2025 08:09:42 +0800 Subject: [PATCH] =?UTF-8?q?RedisEventBus=E4=BA=8B=E4=BB=B6=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E4=BF=9D=E7=95=993=E5=A4=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs | 4 ++-- NewLife.Redis/Services/RedisCacheProvider.cs | 3 ++- NewLife.Redis/Services/RedisEventBus.cs | 9 ++++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs b/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs index 03f3160..f34b214 100644 --- a/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs +++ b/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs @@ -160,8 +160,8 @@ public class MultipleConsumerGroupsQueue : IDisposable //尝试创建消费组 try { - _Queue.Group = subscribeAppName; - _Queue.GroupCreate(subscribeAppName); + //_Queue.Group = subscribeAppName; + _Queue.SetGroup(subscribeAppName); } catch (Exception err) { diff --git a/NewLife.Redis/Services/RedisCacheProvider.cs b/NewLife.Redis/Services/RedisCacheProvider.cs index ed960aa..4c62575 100644 --- a/NewLife.Redis/Services/RedisCacheProvider.cs +++ b/NewLife.Redis/Services/RedisCacheProvider.cs @@ -118,7 +118,8 @@ public class RedisCacheProvider : CacheProvider else { var rs = _redisQueue.GetStream(topic); - rs.Group = group; + //rs.Group = group; + rs.SetGroup(group); queue = rs; XTrace.WriteLine("[{0}/{2}]队列消息数:{1}", topic, queue.Count, group); diff --git a/NewLife.Redis/Services/RedisEventBus.cs b/NewLife.Redis/Services/RedisEventBus.cs index 83283b3..85d2085 100644 --- a/NewLife.Redis/Services/RedisEventBus.cs +++ b/NewLife.Redis/Services/RedisEventBus.cs @@ -39,9 +39,12 @@ public class RedisEventBus(FullRedis cache, String topic, String group) if (_queue != null) return; // 创建Stream队列,指定消费组,从最后位置开始消费 - _queue = cache.GetStream(topic); - _queue.Group = group; - _queue.FromLastOffset = true; + var stream = cache.GetStream(topic); + stream.Group = group; + stream.FromLastOffset = true; + stream.Expire = TimeSpan.FromDays(3); + + _queue = stream; if (_source != null) _ = Task.Run(() => ConsumeMessage(_source));