// Source: Redis/NewLife.Redis/Queues/RedisReliableQueue.cs (C#)
// Note: the repository viewer flagged this file as containing Unicode characters
// that might be confused with other characters.

using System.Diagnostics;
using NewLife.Log;
using NewLife.Security;
using NewLife.Serialization;
namespace NewLife.Caching.Queues;
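/// <summary>Reliable Redis queue: messages are pushed on the left and popped from the right.</summary>
/// <remarks>
/// In strict mode, consuming a message pops it from the queue and pushes it onto an Ack queue at the same time.
/// After the consumer has processed the message successfully it is removed from the Ack queue; if processing
/// fails, the message left in the Ack queue is consumed again later.
///
/// A reliable queue object is not thread-safe; every thread must own its own queue object.
/// To let strict mode support multi-threaded consumption, the confirmation queue AckKey is built as
/// Key:Ack:ukey, where ukey is a random string, so each consumer gets its own unique confirmation queue.
/// Every RetryInterval seconds (60 by default) a consumer sweeps its own confirmation queue and puts dead
/// letters (unacknowledged messages) back onto the main queue.
/// An application that exits abnormally may leave dead letters behind; they are cleaned up by the global
/// sweep over all Ack queues (RollbackAllAck) that runs on the first consume after startup.
/// Because a status key is kept per consumer, cleaning up inactive consumers does not affect healthy ones.
///
/// Design points:
/// 1. On consume, RPOPLPUSH pops from Key and backs the message up to AckKey; once processing is complete the message is removed from AckKey.
/// 2. AckKey is Key:Ack:ukey with a random ukey, so each instance has a dedicated Ack queue.
/// 3. While consuming, the status is written to Key:Status:ukey every RetryInterval seconds to show that ukey is still consuming.
/// 4. A global periodic scan of Key:Status:ukey rolls back the Ack messages of consumers that are no longer active.
///
/// Consumers must handle poison messages carefully: a message that always fails and is never acknowledged
/// will be sent back to the main queue again and again.
/// It is recommended that the application handles and acknowledges such messages itself, counting attempts
/// in the message body or in a Redis key.
///
/// Advanced queue technique:
/// 1. Write the message body as a key/value entry, push the key into the queue as the message id, and delete the entry after successful consumption.
/// 2. The message id is chosen by the application, so the body can be inspected or deleted at any time and duplicate production can be avoided.
/// 3. The Redis queue guarantees at-least-once consumption; with body and id separated this becomes exactly-once, because a second consumption can no longer fetch the body.
/// 4. When the same message is produced twice the queue holds two ids, but since the id is identical only one body exists, which prevents duplicate consumption.
///
/// Reliable Redis queue: 1 Redis operation per produce, 2 per consume.
/// Advanced Redis queue: 3 Redis operations per produce, 4 per consume.
/// </remarks>
/// <typeparam name="T">Message type</typeparam>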
public class RedisReliableQueue<T> : QueueBase, IProducerConsumer<T>, IDisposable
{
#region
/// <summary>Key of the Redis list used as this instance's acknowledgement (Ack) queue.</summary>
public String AckKey { get; set; }
/// <summary>Interval, in seconds, between sweeps that re-process dead letters in the Ack queue. Default 60.</summary>
public Int32 RetryInterval { get; set; } = 60;
/// <summary>Minimum pipeline threshold: batch operations of at least this size use a pipeline. Default 3.</summary>
public Int32 MinPipeline { get; set; } = 3;
/// <summary>Number of messages currently in the main queue (LLEN on Key).</summary>
public Int32 Count => Execute((r, k) => r.Execute<Int32>("LLEN", Key));
/// <summary>Whether the main queue is empty.</summary>
public Boolean IsEmpty => Count == 0;
/// <summary>Consumption status of this instance.</summary>
public RedisQueueStatus Status => _Status;
// Queue key as resolved in the constructor (through FullRedis.GetKey when the client is a FullRedis)
private readonly String _Key;
// Key under which this instance's status is stored: {_Key}:Status:{status key}
private readonly String _StatusKey;
// Status object of this instance; its Key is the random suffix shared by AckKey and _StatusKey
private readonly RedisQueueStatus _Status;
// Delay queue, created on demand by InitDelay
private RedisDelayQueue<T>? _delay;
// Cancellation source handed to the delay transfer task; cancelled on Dispose
private CancellationTokenSource? _source;
// Background task running the delay queue's TransferAsync
private Task? _delayTask;
#endregion
#region
/// <summary>Creates a reliable queue on top of a Redis client and a queue key.</summary>
/// <param name="redis">Redis client used for every command</param>
/// <param name="key">Queue key</param>
public RedisReliableQueue(Redis redis, String key) : base(redis, key)
{
    // Every instance owns a status object; its random Key suffixes both the Ack and the Status key
    var status = CreateStatus();

    // A FullRedis client may rewrite the key through GetKey; other clients use the key as given
    var resolvedKey = key;
    if (redis is FullRedis full) resolvedKey = full.GetKey(key);

    _Key = resolvedKey;
    _Status = status;
    _StatusKey = $"{resolvedKey}:Status:{status.Key}";
    AckKey = $"{resolvedKey}:Ack:{status.Key}";
}
/// <summary>Finalizer; runs the non-disposing cleanup path.</summary>
~RedisReliableQueue() => Dispose(false);
/// <summary>Releases the queue and stops the delay transfer task if one was started.</summary>
public void Dispose()
{
    Dispose(true);

    // Cleanup has already happened, so the finalizer does not need to run for this object
    GC.SuppressFinalize(this);
}
/// <summary>Releases resources held by the queue.</summary>
/// <param name="disposing">True when called from <see cref="Dispose()"/>, false when called from the finalizer</param>
protected virtual void Dispose(Boolean disposing)
{
    // Nothing to release unless the delay queue was initialized
    if (_delay == null) return;

    _delay = null;
    _delayTask = null;

    // Signal the delay transfer loop to stop
    _source?.Cancel();
}
#endregion
#region
/// <summary>Produces a batch of messages by pushing them onto the queue with LPUSH.</summary>
/// <param name="values">Messages to enqueue</param>
/// <returns>Length of the list after insertion; 0 when no message was supplied</returns>
public Int32 Add(params T[] values)
{
    if (values is null || values.Length == 0) return 0;

    using var span = Redis.Tracer?.NewSpan($"redismq:{TraceName}:Add", values);
    try
    {
        // LPUSH arguments: the queue key followed by every message
        var pushArgs = new List<Object> { Key };
        foreach (var message in values)
        {
            if (!AttachTraceId)
                pushArgs.Add(message);
            else
                pushArgs.Add(Redis.AttachTraceId(message));
        }

        var length = 0;
        var attempt = 0;
        while (attempt <= RetryTimesWhenSendFailed)
        {
            // LPUSH returns the list length after insertion. A successful insert never returns 0,
            // so 0 or empty suggests a problem in an intermediate proxy and is worth retrying.
            length = Execute((rc, k) => rc.Execute<Int32>("LPUSH", pushArgs.ToArray()), true);
            if (length > 0) return length;

            span?.SetError(new InvalidOperationException($"发布到队列[{Topic}]失败!"), null);

            // Pause between attempts, but not after the last one
            if (attempt < RetryTimesWhenSendFailed) Thread.Sleep(RetryIntervalWhenSendFailed);
            attempt++;
        }

        ValidWhenSendFailed(span);
        return length;
    }
    catch (Exception ex)
    {
        span?.SetError(ex, null);
        throw;
    }
}
/// <summary>Consumes one message: pops it from Key and backs it up to AckKey. Supports blocking.</summary>
/// <remarks>Assumes previously taken messages have been acknowledged, because this method may roll back the Ack queue.</remarks>
/// <param name="timeout">Timeout in seconds. 0 (the default) blocks indefinitely; a negative value returns immediately without blocking.</param>
/// <returns>The message, or the default value when none was obtained</returns>
public T? TakeOne(Int32 timeout = 0)
{
    RetryAck();

    // Make sure the client timeout is longer than the blocking wait requested from the server
    if (timeout > 0)
    {
        var requiredMs = (timeout + 1) * 1000;
        if (Redis.Timeout < requiredMs) Redis.Timeout = requiredMs;
    }

    // Negative timeout: non-blocking RPOPLPUSH; otherwise the blocking variant
    var message = timeout < 0 ?
        Execute((rc, k) => rc.Execute<T>("RPOPLPUSH", Key, AckKey), true) :
        Execute((rc, k) => rc.Execute<T>("BRPOPLPUSH", Key, AckKey, timeout), true);

    if (message != null) _Status.Consumes++;

    return message;
}
/// <summary>Asynchronously consumes one message: pops it from Key and backs it up to AckKey.</summary>
/// <param name="timeout">Timeout in seconds. 0 (the default) blocks indefinitely; a negative value returns immediately without blocking.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The message, or the default value when none was obtained</returns>
public async Task<T?> TakeOneAsync(Int32 timeout = 0, CancellationToken cancellationToken = default)
{
    RetryAck();

    // Make sure the client timeout is longer than the blocking wait requested from the server
    if (timeout > 0)
    {
        var requiredMs = (timeout + 1) * 1000;
        if (Redis.Timeout < requiredMs) Redis.Timeout = requiredMs;
    }

    T? message;
    if (timeout >= 0)
        message = await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("BRPOPLPUSH", [Key, AckKey, timeout], cancellationToken), true).ConfigureAwait(false);
    else
        message = await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("RPOPLPUSH", [Key, AckKey], cancellationToken), true).ConfigureAwait(false);

    if (message != null) _Status.Consumes++;

    return message;
}
/// <summary>Asynchronously consumes one message (interface implementation without a cancellation token).</summary>
/// <param name="timeout">Timeout in seconds. 0 (the default) blocks indefinitely; a negative value returns immediately without blocking.</param>
/// <returns>The message, or the default value when none was obtained</returns>
Task<T?> IProducerConsumer<T>.TakeOneAsync(Int32 timeout) => TakeOneAsync(timeout, CancellationToken.None);
/// <summary>Consumes a batch of messages: each is popped from Key and backed up to AckKey.</summary>
/// <remarks>Assumes previously taken messages have been acknowledged, because this method may roll back the Ack queue.</remarks>
/// <param name="count">Number of messages to consume</param>
/// <returns>Lazily evaluated sequence of messages; it ends at the first null/default result</returns>
public IEnumerable<T> Take(Int32 count = 1)
{
    if (count <= 0) yield break;

    RetryAck();

    // Small batches: one round trip per message
    if (count < MinPipeline)
    {
        for (var n = 0; n < count; n++)
        {
            var message = Execute((rc, k) => rc.Execute<T>("RPOPLPUSH", Key, AckKey), true);
            if (message is null || Equals(message, default(T))) yield break;

            _Status.Consumes++;
            yield return message;
        }

        yield break;
    }

    // Larger batches: queue all commands into a pipeline and read the replies together
    var redis = Redis;
    redis.StartPipeline();
    for (var n = 0; n < count; n++)
    {
        Execute((rc, k) => rc.Execute<T>("RPOPLPUSH", Key, AckKey), true);
    }

    var replies = redis.StopPipeline(true);
    foreach (var reply in replies)
    {
        if (reply is null || Equals(reply, default(T))) yield break;

        _Status.Consumes++;
        yield return (T)reply;
    }
}
/// <summary>Acknowledges consumption by removing the given messages from AckKey (LREM).</summary>
/// <param name="keys">Messages to acknowledge, exactly as they were returned when consumed</param>
/// <returns>Number of entries actually removed from the Ack queue; 0 when nothing was supplied</returns>
public Int32 Acknowledge(params String[] keys)
{
    // Nothing to acknowledge; also avoids a NullReferenceException on a null array
    if (keys == null || keys.Length == 0) return 0;

    _Status.Acks += keys.Length;

    var removed = 0;

    // Small batches: one round trip per message
    if (keys.Length < MinPipeline)
    {
        foreach (var item in keys)
        {
            removed += Execute((r, k) => r.Execute<Int32>("LREM", AckKey, 1, item), true);
        }

        return removed;
    }

    // Larger batches: queue all LREM commands into a pipeline and sum the replies
    var rds = Redis;
    rds.StartPipeline();
    foreach (var item in keys)
    {
        Execute((r, k) => r.Execute<Int32>("LREM", AckKey, 1, item), true);
    }

    var replies = rds.StopPipeline(true);
    foreach (var reply in replies)
    {
        removed += (Int32)reply;
    }

    return removed;
}
#endregion
#region
// Private gate for delay initialization; avoids locking on the publicly reachable instance
private readonly Object _delayLock = new();
/// <summary>Initializes the delay queue feature. Producers initialize it automatically; consumers should initialize it once per queue.</summary>
/// <remarks>
/// This is an add-on feature that consumers must call explicitly; one per queue is enough even with many consumers.
/// Its core job is to start the delay queue's TransferAsync loop. One loop per queue per process is ideal;
/// more loops bring no benefit and only compete with each other.
/// </remarks>
/// <returns>The delay queue attached to this queue</returns>
public RedisDelayQueue<T> InitDelay()
{
    // Fast path: delay queue exists and its transfer task is still running
    var delay = _delay;
    var task = _delayTask;
    if (delay != null && task != null && !task.IsCompleted) return delay;

    lock (_delayLock)
    {
        delay = _delay ??= new RedisDelayQueue<T>(Redis, $"{Key}:Delay");

        task = _delayTask;
        if (task == null || task.IsCompleted)
        {
            var source = new CancellationTokenSource();
            _source = source;

            // Capture locals rather than fields: the lambda runs later on a pool thread, and by then
            // Dispose may have cleared _delay or a later call may have replaced _source.
            _delayTask = Task.Run(() => delay.TransferAsync(this, null, source.Token));
        }

        return delay;
    }
}
/// <summary>Adds a delayed message through the delay queue, initializing the delay queue if necessary.</summary>
/// <param name="value">Message to enqueue</param>
/// <param name="delay">Delay value forwarded unchanged to the delay queue's Add</param>
/// <returns>Result returned by the delay queue's Add</returns>
public Int32 AddDelay(T value, Int32 delay)
{
    // Use the instance returned by InitDelay instead of re-reading the nullable field,
    // which a concurrent Dispose could have cleared in the meantime
    return InitDelay().Add(value, delay);
}
/// <summary>Advanced produce. Message body and message id are stored separately; the application chooses the id, so the body can be inspected or deleted at any time and duplicate production is avoided.</summary>
/// <remarks>
/// Publish must be paired with ConsumeAsync.
/// </remarks>
/// <param name="messages">Message dictionary: id as key, message body as value</param>
/// <param name="expire">Expiration of the message bodies, in seconds</param>
/// <returns>Length of the queue after the ids were pushed; 0 when no message was supplied</returns>
public Int32 Publish(IDictionary<String, T> messages, Int32 expire)
{
    // LPUSH needs at least one element; an empty batch would be rejected by the server
    if (messages == null || messages.Count == 0) return 0;

    // Store the message bodies as key/value entries
    Redis.SetAll(messages, expire);

    // Push the message ids onto the queue: LPUSH key id1 id2 ...
    var args = new List<Object>(messages.Count + 1) { Key };
    foreach (var id in messages.Keys)
    {
        args.Add(id);
    }

    return Execute((rc, k) => rc.Execute<Int32>("LPUSH", args.ToArray()), true);
}
/// <summary>Advanced consume. After the handler succeeds, the message is acknowledged and its body deleted automatically.</summary>
/// <remarks>
/// Publish must be paired with ConsumeAsync.
/// If the handler throws, the message id stays in the Ack queue and is not acknowledged.
/// </remarks>
/// <typeparam name="TResult">Result type of the handler</typeparam>
/// <param name="func">Handler invoked with the message body</param>
/// <param name="timeout">Timeout in seconds. 0 (the default) blocks indefinitely; a negative value returns immediately without blocking.</param>
/// <returns>The handler's result, or the default value when no message id or no message body was obtained</returns>
public async Task<TResult> ConsumeAsync<TResult>(Func<T, Task<TResult>> func, Int32 timeout = 0)
{
    RetryAck();

    // Same as TakeOne/TakeOneAsync: the client timeout must be longer than the blocking wait
    if (timeout > 0 && Redis.Timeout < (timeout + 1) * 1000) Redis.Timeout = (timeout + 1) * 1000;

    // Pop the message id and back it up to the Ack queue
    var msgId = timeout < 0 ?
        await ExecuteAsync((rc, k) => rc.ExecuteAsync<String>("RPOPLPUSH", Key, AckKey), true).ConfigureAwait(false) :
        await ExecuteAsync((rc, k) => rc.ExecuteAsync<String>("BRPOPLPUSH", Key, AckKey, timeout), true).ConfigureAwait(false);
    if (msgId.IsNullOrEmpty()) return default!;

    _Status.Consumes++;

    // Fetch the body. It is gone when the message was already consumed or the application deleted it.
    if (!Redis.TryGetValue(msgId, out T message))
    {
        // No body available: acknowledge the id directly so it is not redelivered
        Acknowledge(msgId);
        return default!;
    }

    // Process the message; an exception here skips the acknowledgement below
    var rs = await func(message).ConfigureAwait(false);

    // Delete the body and acknowledge the id
    Redis.Remove(msgId);
    Acknowledge(msgId);

    return rs;
}
#endregion
#region
/// <summary>Pops messages from the Ack queue; used to recover the working set after consumption was interrupted.</summary>
/// <remarks>In theory the Ack queue only holds very few entries.</remarks>
/// <param name="count">Maximum number of messages to pop</param>
/// <returns>Lazily evaluated sequence of messages; it ends as soon as the Ack queue is empty</returns>
public IEnumerable<String> TakeAck(Int32 count = 1)
{
    if (count <= 0) yield break;

    var remaining = count;
    while (remaining-- > 0)
    {
        var entry = Execute((rc, k) => rc.Execute<String>("RPOP", AckKey), true);
        if (entry is null) yield break;

        yield return entry;
    }
}
/// <summary>Deletes every Ack queue of this queue. Dangerous operation.</summary>
/// <returns>Value returned by the client's Remove for the deleted keys; 0 when no Ack queue was found</returns>
/// <exception cref="NotSupportedException">The Redis client is not a FullRedis, so keys cannot be searched</exception>
public Int32 ClearAllAck()
{
    // Key search is only used through FullRedis here; fail clearly instead of dereferencing null
    if (Redis is not FullRedis rds)
        throw new NotSupportedException($"{nameof(ClearAllAck)} requires a FullRedis client.");

    // Find all Ack keys first (the search is limited to 1000 results)
    var keys = rds.Search($"{_Key}:Ack:*", 1000).ToArray();

    return keys.Length > 0 ? rds.Remove(keys) : 0;
}
/// <summary>Moves every message of the given Ack queue back onto the given queue.</summary>
/// <param name="key">Destination queue key</param>
/// <param name="ackKey">Ack queue key to drain</param>
/// <returns>The messages that were moved, in the order they were moved</returns>
private List<String> RollbackAck(String key, String ackKey)
{
    var moved = new List<String>();

    // RPOPLPUSH one entry at a time until the Ack queue yields nothing
    String? entry;
    while ((entry = Execute((rc, k) => rc.Execute<String>("RPOPLPUSH", ackKey, key), true)) != null)
    {
        moved.Add(entry);
    }

    return moved;
}
/// <summary>Globally rolls back dead letters. Normally run by a single thread so that in-flight data is not disturbed.</summary>
/// <remarks>
/// Scans every status key of this queue. A consumer whose last activity is older than 10 × RetryInterval seconds
/// has its Ack queue rolled back onto the main queue and its status key deleted. Ack queues that have no
/// status key at all are logged and deleted. Does nothing when the client is not a FullRedis.
/// </remarks>
/// <returns>Number of messages rolled back onto the main queue</returns>
public Int32 RollbackAllAck()
{
    // Key search is only used through FullRedis here. This method is reached from every consume call
    // via RetryAck, so skip the sweep instead of failing with a NullReferenceException.
    if (Redis is not FullRedis rds) return 0;

    var count = 0;
    var statusPrefix = $"{_Key}:Status:";

    // Ack keys that still have a status key; a set keeps the membership test below cheap
    var acks = new HashSet<String>();
    foreach (var key in rds.Search($"{_Key}:Status:*", 1000))
    {
        var ackKey = $"{_Key}:Ack:{key.TrimStart(statusPrefix)}";
        acks.Add(ackKey);

        var st = rds.Get<RedisQueueStatus>(key);
        if (st == null || st.LastActive.AddSeconds(RetryInterval * 10) >= DateTime.Now) continue;

        // Consumer is inactive: move its unacknowledged messages back to the main queue
        if (rds.ContainsKey(ackKey))
        {
            XTrace.WriteLine("发现死信队列:{0}", ackKey);

            var list = RollbackAck(_Key, ackKey);
            foreach (var item in list)
            {
                XTrace.WriteLine("全局回滚死信:{0}", item);
            }

            count += list.Count;
        }

        // Delete the status key
        rds.Remove(key);
        XTrace.WriteLine("删除队列状态:{0} {1}", key, st.ToJson());
    }

    // Clean up Ack queues that no longer have a status key
    foreach (var key in rds.Search($"{_Key}:Ack:*", 1000))
    {
        if (acks.Contains(key)) continue;

        // Log the content before deleting; tolerate a list implementation other than RedisList
        var msgs = (rds.GetList<String>(key) as RedisList<String>)?.GetAll();
        XTrace.WriteLine("全局清理死信:{0} {1}", key, msgs?.ToJson());

        rds.Remove(key);
    }

    return count;
}
// Earliest time at which the next dead-letter sweep may run
private DateTime _nextRetry;
/// <summary>Puts unacknowledged dead letters back onto the queue, at most once per RetryInterval.</summary>
private void RetryAck()
{
    var now = DateTime.Now;

    // Not due yet
    if (_nextRetry >= now) return;

    _nextRetry = now.AddSeconds(RetryInterval);

    // Move this instance's dead letters back onto the main queue
    var recovered = RollbackAck(_Key, AckKey);
    foreach (var message in recovered)
    {
        XTrace.WriteLine("定时回滚死信:{0}", message);
    }

    // Refresh this instance's status
    UpdateStatus();

    // Dead letters left by other consumers: only the instance that manages to add the
    // AllStatus key performs the global scan, which reduces the number of scans
    var ownsGlobalSweep = Redis.Add($"{_Key}:AllStatus", _Status, RetryInterval);
    if (ownsGlobalSweep) RollbackAllAck();
}
#endregion
#region
// Environment information captured once in a static initializer; CreateStatus copies it into each new status.
// NOTE(review): the `+ ""` presumably turns a null address from NetHelper.MyIP() into an empty string — confirm.
private static readonly RedisQueueStatus _def = new()
{
MachineName = Environment.MachineName,
UserName = Environment.UserName,
ProcessId = Process.GetCurrentProcess().Id,
Ip = NetHelper.MyIP() + "",
};
// Builds the status for a new queue instance: a random 8-character key plus the shared
// environment information, with creation and last-active times set to the current time.
private RedisQueueStatus CreateStatus() => new()
{
    Key = Rand.NextString(8),
    Ip = _def.Ip,
    ProcessId = _def.ProcessId,
    UserName = _def.UserName,
    MachineName = _def.MachineName,
    CreateTime = DateTime.Now,
    LastActive = DateTime.Now,
};
// Marks this instance as active and writes its status to Redis with a 7-day expiration.
private void UpdateStatus()
{
    const Int32 expireSeconds = 7 * 24 * 3600;

    var status = _Status;
    status.LastActive = DateTime.Now;

    Redis.Set(_StatusKey, status, expireSeconds);
}
#endregion
}