[feat] 新增Redis事件总线,用于分布式场景向多节点部署应用集中分发消息。例如星尘和IoT平台的指令下发,先把消息发布到RedisStream中,由所有应用实例消费,再从本地会话管理中找到长连接完成消息下发。命令消息的前面,一般增加设备编码并以#隔开

This commit is contained in:
智能大石头 2025-02-24 19:34:09 +08:00
parent feb20a472d
commit 2e786f1250
5 changed files with 124 additions and 5 deletions

View File

@ -4,11 +4,12 @@ using System.Text;
using NewLife.Caching.Clusters;
using NewLife.Caching.Models;
using NewLife.Caching.Queues;
using NewLife.Caching.Services;
using NewLife.Collections;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Model;
using NewLife.Serialization;
namespace NewLife.Caching;
@ -645,6 +646,9 @@ public class FullRedis : Redis
/// <param name="key"></param>
/// <returns></returns>
public virtual RedisSortedSet<T> GetSortedSet<T>(String key) => new(this, key);
/// <summary>获取事件总线</summary>
public override IEventBus<T> GetEventBus<T>(String topic, String clientId = "") => new RedisEventBus<T>(this, topic, clientId);
#endregion
#region

View File

@ -56,7 +56,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
<PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
</ItemGroup>
<ItemGroup>

View File

@ -0,0 +1,115 @@
using NewLife.Caching.Queues;
using NewLife.Log;
using NewLife.Messaging;
using System.Diagnostics.CodeAnalysis;
namespace NewLife.Caching.Services;
/// <summary>Redis事件上下文</summary>
public class RedisEventContext<TEvent>(IEventBus<TEvent> eventBus, Queues.Message message) : IEventContext<TEvent>
{
/// <summary>事件总线</summary>
public IEventBus<TEvent> EventBus { get; set; } = eventBus;
/// <summary>原始消息</summary>
public Queues.Message Message { get; set; } = message;
}
/// <summary>Redis事件总线</summary>
/// <typeparam name="TEvent"></typeparam>
/// <remarks>实例化消息队列事件总线</remarks>
public class RedisEventBus<TEvent>(FullRedis cache, String topic, String group) : EventBus<TEvent>
{
private RedisStream<TEvent>? _queue;
private CancellationTokenSource? _source;
/// <summary>销毁</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)
{
base.Dispose(disposing);
_source?.TryDispose();
}
/// <summary>初始化</summary>
[MemberNotNull(nameof(_queue))]
protected virtual void Init()
{
if (_queue != null) return;
// 创建Stream队列指定消费组从最后位置开始消费
_queue = cache.GetStream<TEvent>(topic);
_queue.Group = group;
_queue.FromLastOffset = true;
}
/// <summary>发布消息到消息队列</summary>
/// <param name="event">事件</param>
/// <param name="context">上下文</param>
public override Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null)
{
Init();
var rs = _queue.Add(@event);
return Task.FromResult(1);
}
/// <summary>订阅消息。启动大循环,从消息队列订阅消息,再分发到本地订阅者</summary>
/// <param name="handler">处理器</param>
/// <param name="clientId">客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅</param>
public override Boolean Subscribe(IEventHandler<TEvent> handler, String clientId = "")
{
if (_source == null)
{
var source = new CancellationTokenSource();
if (Interlocked.CompareExchange(ref _source, source, null) == null)
{
Init();
_ = Task.Run(() => ConsumeMessage(_source));
}
}
// 本进程订阅。从队列中消费到消息时,会发布到本进程的事件总线,这里订阅可以让目标处理器直接收到消息
return base.Subscribe(handler, clientId);
}
/// <summary>从队列中消费消息,经事件总线送给设备会话</summary>
/// <param name="source"></param>
/// <returns></returns>
protected virtual async Task ConsumeMessage(CancellationTokenSource source)
{
DefaultSpan.Current = null;
var cancellationToken = source.Token;
try
{
while (!cancellationToken.IsCancellationRequested)
{
var msg = await _queue!.TakeMessageAsync(15, cancellationToken).ConfigureAwait(false);
if (msg != null)
{
var msg2 = msg.GetBody<TEvent>();
if (msg2 != null)
{
// 发布到事件总线
await base.PublishAsync(msg2, new RedisEventContext<TEvent>(this, msg)).ConfigureAwait(false);
}
}
else
{
await Task.Delay(1_000, cancellationToken).ConfigureAwait(false);
}
}
}
catch (TaskCanceledException) { }
catch (OperationCanceledException) { }
catch (Exception ex)
{
XTrace.WriteException(ex);
}
finally
{
source.Cancel();
}
}
}

View File

@ -16,7 +16,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
<PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
</ItemGroup>
<ItemGroup>

View File

@ -10,10 +10,10 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
<PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
<PackageReference Include="NewLife.UnitTest" Version="1.0.2025.101" />
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.1">
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>