[fix] 修正RedisEventBus没有创建消费组导致报错的问题

This commit is contained in:
智能大石头 2025-04-16 01:20:05 +08:00
parent d533bc9bea
commit 7c4877dc21
1 changed files with 8 additions and 2 deletions

View File

@ -42,6 +42,9 @@ public class RedisEventBus<TEvent>(FullRedis cache, String topic, String group)
_queue = cache.GetStream<TEvent>(topic);
_queue.Group = group;
_queue.FromLastOffset = true;
if (_source != null)
_ = Task.Run(() => ConsumeMessage(_source));
}
/// <summary>发布消息到消息队列</summary>
@ -67,7 +70,6 @@ public class RedisEventBus<TEvent>(FullRedis cache, String topic, String group)
if (Interlocked.CompareExchange(ref _source, source, null) == null)
{
Init();
_ = Task.Run(() => ConsumeMessage(_source));
}
}
@ -82,11 +84,14 @@ public class RedisEventBus<TEvent>(FullRedis cache, String topic, String group)
{
DefaultSpan.Current = null;
var cancellationToken = source.Token;
var stream = _queue!;
try
{
if (!stream.Group.IsNullOrEmpty()) stream.SetGroup(stream.Group);
while (!cancellationToken.IsCancellationRequested)
{
var msg = await _queue!.TakeMessageAsync(15, cancellationToken).ConfigureAwait(false);
var msg = await stream!.TakeMessageAsync(15, cancellationToken).ConfigureAwait(false);
if (msg != null)
{
var msg2 = msg.GetBody<TEvent>();
@ -111,6 +116,7 @@ public class RedisEventBus<TEvent>(FullRedis cache, String topic, String group)
finally
{
source.Cancel();
_queue = null;
}
}
}