From 2e786f12507c2678b73d8cb380495d4b240dba7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=BA=E8=83=BD=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Mon, 24 Feb 2025 19:34:09 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=20=E6=96=B0=E5=A2=9ERedis=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E6=80=BB=E7=BA=BF=EF=BC=8C=E7=94=A8=E4=BA=8E=E5=88=86?= =?UTF-8?q?=E5=B8=83=E5=BC=8F=E5=9C=BA=E6=99=AF=E5=90=91=E5=A4=9A=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E9=83=A8=E7=BD=B2=E5=BA=94=E7=94=A8=E9=9B=86=E4=B8=AD?= =?UTF-8?q?=E5=88=86=E5=8F=91=E6=B6=88=E6=81=AF=E3=80=82=E4=BE=8B=E5=A6=82?= =?UTF-8?q?=E6=98=9F=E5=B0=98=E5=92=8CIoT=E5=B9=B3=E5=8F=B0=E7=9A=84?= =?UTF-8?q?=E6=8C=87=E4=BB=A4=E4=B8=8B=E5=8F=91=EF=BC=8C=E5=85=88=E6=8A=8A?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=8F=91=E5=B8=83=E5=88=B0RedisStream?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E7=94=B1=E6=89=80=E6=9C=89=E5=BA=94=E7=94=A8?= =?UTF-8?q?=E5=AE=9E=E4=BE=8B=E6=B6=88=E8=B4=B9=EF=BC=8C=E5=86=8D=E4=BB=8E?= =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E4=BC=9A=E8=AF=9D=E7=AE=A1=E7=90=86=E4=B8=AD?= =?UTF-8?q?=E6=89=BE=E5=88=B0=E9=95=BF=E8=BF=9E=E6=8E=A5=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=8B=E5=8F=91=E3=80=82=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=9A=84=E5=89=8D=E9=9D=A2=EF=BC=8C=E4=B8=80?= =?UTF-8?q?=E8=88=AC=E5=A2=9E=E5=8A=A0=E8=AE=BE=E5=A4=87=E7=BC=96=E7=A0=81?= =?UTF-8?q?=E5=B9=B6=E4=BB=A5#=E9=9A=94=E5=BC=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- NewLife.Redis/FullRedis.cs | 6 +- NewLife.Redis/NewLife.Redis.csproj | 2 +- NewLife.Redis/Services/RedisEventBus.cs | 115 ++++++++++++++++++++++++ Test/Test.csproj | 2 +- XUnitTest/XUnitTest.csproj | 4 +- 5 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 NewLife.Redis/Services/RedisEventBus.cs diff --git a/NewLife.Redis/FullRedis.cs b/NewLife.Redis/FullRedis.cs index 5333424..72ba205 100644 --- a/NewLife.Redis/FullRedis.cs +++ b/NewLife.Redis/FullRedis.cs @@ -4,11 +4,12 @@ using System.Text; using NewLife.Caching.Clusters; using NewLife.Caching.Models; using NewLife.Caching.Queues; +using NewLife.Caching.Services; using NewLife.Collections; using NewLife.Data; using NewLife.Log; +using NewLife.Messaging; using NewLife.Model; -using NewLife.Serialization; namespace NewLife.Caching; @@ -645,6 +646,9 @@ public class FullRedis : Redis /// /// public virtual RedisSortedSet GetSortedSet(String key) => new(this, key); + + /// 获取事件总线 + public override IEventBus GetEventBus(String topic, String clientId = "") => new RedisEventBus(this, topic, clientId); #endregion #region 字符串操作 diff --git a/NewLife.Redis/NewLife.Redis.csproj b/NewLife.Redis/NewLife.Redis.csproj index 0bfcd68..e6b370e 100644 --- a/NewLife.Redis/NewLife.Redis.csproj +++ b/NewLife.Redis/NewLife.Redis.csproj @@ -56,7 +56,7 @@ - + diff --git a/NewLife.Redis/Services/RedisEventBus.cs b/NewLife.Redis/Services/RedisEventBus.cs new file mode 100644 index 0000000..cf6cef1 --- /dev/null +++ b/NewLife.Redis/Services/RedisEventBus.cs @@ -0,0 +1,115 @@ +using NewLife.Caching.Queues; +using NewLife.Log; +using NewLife.Messaging; +using System.Diagnostics.CodeAnalysis; + +namespace NewLife.Caching.Services; + +/// Redis事件上下文 +public class RedisEventContext(IEventBus eventBus, Queues.Message message) : IEventContext +{ + /// 事件总线 + public IEventBus EventBus { get; set; } = eventBus; + + /// 原始消息 + public Queues.Message Message { get; set; } = message; +} + +/// Redis事件总线 +/// +/// 实例化消息队列事件总线 +public class RedisEventBus(FullRedis cache, String topic, String group) : EventBus +{ + private RedisStream? _queue; + private CancellationTokenSource? _source; + + /// 销毁 + /// + protected override void Dispose(Boolean disposing) + { + base.Dispose(disposing); + + _source?.TryDispose(); + } + + /// 初始化 + [MemberNotNull(nameof(_queue))] + protected virtual void Init() + { + if (_queue != null) return; + + // 创建Stream队列,指定消费组,从最后位置开始消费 + _queue = cache.GetStream(topic); + _queue.Group = group; + _queue.FromLastOffset = true; + } + + /// 发布消息到消息队列 + /// 事件 + /// 上下文 + public override Task PublishAsync(TEvent @event, IEventContext? context = null) + { + Init(); + var rs = _queue.Add(@event); + + return Task.FromResult(1); + } + + /// 订阅消息。启动大循环,从消息队列订阅消息,再分发到本地订阅者 + /// 处理器 + /// 客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅 + public override Boolean Subscribe(IEventHandler handler, String clientId = "") + { + if (_source == null) + { + var source = new CancellationTokenSource(); + if (Interlocked.CompareExchange(ref _source, source, null) == null) + { + Init(); + _ = Task.Run(() => ConsumeMessage(_source)); + } + } + + // 本进程订阅。从队列中消费到消息时,会发布到本进程的事件总线,这里订阅可以让目标处理器直接收到消息 + return base.Subscribe(handler, clientId); + } + + /// 从队列中消费消息,经事件总线送给设备会话 + /// + /// + protected virtual async Task ConsumeMessage(CancellationTokenSource source) + { + DefaultSpan.Current = null; + var cancellationToken = source.Token; + try + { + while (!cancellationToken.IsCancellationRequested) + { + var msg = await _queue!.TakeMessageAsync(15, cancellationToken).ConfigureAwait(false); + if (msg != null) + { + var msg2 = msg.GetBody(); + if (msg2 != null) + { + // 发布到事件总线 + await base.PublishAsync(msg2, new RedisEventContext(this, msg)).ConfigureAwait(false); + } + } + else + { + await Task.Delay(1_000, cancellationToken).ConfigureAwait(false); + } + } + } + catch (TaskCanceledException) { } + catch (OperationCanceledException) { } + catch (Exception ex) + { + XTrace.WriteException(ex); + } + finally + { + source.Cancel(); + } + } +} diff --git a/Test/Test.csproj b/Test/Test.csproj index d0c0cef..6d299fb 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -16,7 +16,7 @@ - + diff --git a/XUnitTest/XUnitTest.csproj b/XUnitTest/XUnitTest.csproj index 3b08b86..039c60b 100644 --- a/XUnitTest/XUnitTest.csproj +++ b/XUnitTest/XUnitTest.csproj @@ -10,10 +10,10 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive