diff --git a/NewLife.Redis/FullRedis.cs b/NewLife.Redis/FullRedis.cs index 5333424..72ba205 100644 --- a/NewLife.Redis/FullRedis.cs +++ b/NewLife.Redis/FullRedis.cs @@ -4,11 +4,12 @@ using System.Text; using NewLife.Caching.Clusters; using NewLife.Caching.Models; using NewLife.Caching.Queues; +using NewLife.Caching.Services; using NewLife.Collections; using NewLife.Data; using NewLife.Log; +using NewLife.Messaging; using NewLife.Model; -using NewLife.Serialization; namespace NewLife.Caching; @@ -645,6 +646,9 @@ public class FullRedis : Redis /// /// public virtual RedisSortedSet GetSortedSet(String key) => new(this, key); + + /// 获取事件总线 + public override IEventBus GetEventBus(String topic, String clientId = "") => new RedisEventBus(this, topic, clientId); #endregion #region 字符串操作 diff --git a/NewLife.Redis/NewLife.Redis.csproj b/NewLife.Redis/NewLife.Redis.csproj index 0bfcd68..e6b370e 100644 --- a/NewLife.Redis/NewLife.Redis.csproj +++ b/NewLife.Redis/NewLife.Redis.csproj @@ -56,7 +56,7 @@ - + diff --git a/NewLife.Redis/Services/RedisEventBus.cs b/NewLife.Redis/Services/RedisEventBus.cs new file mode 100644 index 0000000..cf6cef1 --- /dev/null +++ b/NewLife.Redis/Services/RedisEventBus.cs @@ -0,0 +1,115 @@ +using NewLife.Caching.Queues; +using NewLife.Log; +using NewLife.Messaging; +using System.Diagnostics.CodeAnalysis; + +namespace NewLife.Caching.Services; + +/// Redis事件上下文 +public class RedisEventContext(IEventBus eventBus, Queues.Message message) : IEventContext +{ + /// 事件总线 + public IEventBus EventBus { get; set; } = eventBus; + + /// 原始消息 + public Queues.Message Message { get; set; } = message; +} + +/// Redis事件总线 +/// +/// 实例化消息队列事件总线 +public class RedisEventBus(FullRedis cache, String topic, String group) : EventBus +{ + private RedisStream? _queue; + private CancellationTokenSource? _source; + + /// 销毁 + /// + protected override void Dispose(Boolean disposing) + { + base.Dispose(disposing); + + _source?.TryDispose(); + } + + /// 初始化 + [MemberNotNull(nameof(_queue))] + protected virtual void Init() + { + if (_queue != null) return; + + // 创建Stream队列,指定消费组,从最后位置开始消费 + _queue = cache.GetStream(topic); + _queue.Group = group; + _queue.FromLastOffset = true; + } + + /// 发布消息到消息队列 + /// 事件 + /// 上下文 + public override Task PublishAsync(TEvent @event, IEventContext? context = null) + { + Init(); + var rs = _queue.Add(@event); + + return Task.FromResult(1); + } + + /// 订阅消息。启动大循环,从消息队列订阅消息,再分发到本地订阅者 + /// 处理器 + /// 客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅 + public override Boolean Subscribe(IEventHandler handler, String clientId = "") + { + if (_source == null) + { + var source = new CancellationTokenSource(); + if (Interlocked.CompareExchange(ref _source, source, null) == null) + { + Init(); + _ = Task.Run(() => ConsumeMessage(_source)); + } + } + + // 本进程订阅。从队列中消费到消息时,会发布到本进程的事件总线,这里订阅可以让目标处理器直接收到消息 + return base.Subscribe(handler, clientId); + } + + /// 从队列中消费消息,经事件总线送给设备会话 + /// + /// + protected virtual async Task ConsumeMessage(CancellationTokenSource source) + { + DefaultSpan.Current = null; + var cancellationToken = source.Token; + try + { + while (!cancellationToken.IsCancellationRequested) + { + var msg = await _queue!.TakeMessageAsync(15, cancellationToken).ConfigureAwait(false); + if (msg != null) + { + var msg2 = msg.GetBody(); + if (msg2 != null) + { + // 发布到事件总线 + await base.PublishAsync(msg2, new RedisEventContext(this, msg)).ConfigureAwait(false); + } + } + else + { + await Task.Delay(1_000, cancellationToken).ConfigureAwait(false); + } + } + } + catch (TaskCanceledException) { } + catch (OperationCanceledException) { } + catch (Exception ex) + { + XTrace.WriteException(ex); + } + finally + { + source.Cancel(); + } + } +} diff --git a/Test/Test.csproj b/Test/Test.csproj index d0c0cef..6d299fb 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -16,7 +16,7 @@ - + diff --git a/XUnitTest/XUnitTest.csproj b/XUnitTest/XUnitTest.csproj index 3b08b86..039c60b 100644 --- a/XUnitTest/XUnitTest.csproj +++ b/XUnitTest/XUnitTest.csproj @@ -10,10 +10,10 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive