diff --git a/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs b/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs index 03f3160..f34b214 100644 --- a/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs +++ b/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs @@ -160,8 +160,8 @@ public class MultipleConsumerGroupsQueue : IDisposable //尝试创建消费组 try { - _Queue.Group = subscribeAppName; - _Queue.GroupCreate(subscribeAppName); + //_Queue.Group = subscribeAppName; + _Queue.SetGroup(subscribeAppName); } catch (Exception err) { diff --git a/NewLife.Redis/Services/RedisCacheProvider.cs b/NewLife.Redis/Services/RedisCacheProvider.cs index ed960aa..4c62575 100644 --- a/NewLife.Redis/Services/RedisCacheProvider.cs +++ b/NewLife.Redis/Services/RedisCacheProvider.cs @@ -118,7 +118,8 @@ public class RedisCacheProvider : CacheProvider else { var rs = _redisQueue.GetStream(topic); - rs.Group = group; + //rs.Group = group; + rs.SetGroup(group); queue = rs; XTrace.WriteLine("[{0}/{2}]队列消息数:{1}", topic, queue.Count, group); diff --git a/NewLife.Redis/Services/RedisEventBus.cs b/NewLife.Redis/Services/RedisEventBus.cs index 83283b3..85d2085 100644 --- a/NewLife.Redis/Services/RedisEventBus.cs +++ b/NewLife.Redis/Services/RedisEventBus.cs @@ -39,9 +39,12 @@ public class RedisEventBus(FullRedis cache, String topic, String group) if (_queue != null) return; // 创建Stream队列,指定消费组,从最后位置开始消费 - _queue = cache.GetStream(topic); - _queue.Group = group; - _queue.FromLastOffset = true; + var stream = cache.GetStream(topic); + stream.Group = group; + stream.FromLastOffset = true; + stream.Expire = TimeSpan.FromDays(3); + + _queue = stream; if (_source != null) _ = Task.Run(() => ConsumeMessage(_source));