112 lines
3.5 KiB
C#
112 lines
3.5 KiB
C#
using NewLife.Caching.Queues;
|
||
using NewLife.Data;
|
||
using NewLife.Security;
|
||
|
||
namespace NewLife.Caching.Services;
|
||
|
||
/// <summary>保存指定key数据的委托</summary>
|
||
/// <param name="key"></param>
|
||
/// <param name="data"></param>
|
||
public delegate void SaveDelegate(String key, IDictionary<String, Int32> data);
|
||
|
||
/// <summary>Redis流式统计</summary>
|
||
/// <remarks>
|
||
/// 借助Redis进行流式增量计算。
|
||
/// 1,HASH结构对Key下的统计标量进行累加
|
||
/// 2,有变化的Key进入延迟队列,延迟一定时间后进入普通队列
|
||
/// 3,消费队列,得到Key后进行Rename,然后把数据取回来写入数据库
|
||
/// </remarks>
|
||
public class RedisStat : DisposeBase
|
||
{
|
||
#region 属性
|
||
/// <summary>统计名称。如StationDayStat等</summary>
|
||
public String Name { get; }
|
||
|
||
/// <summary>取回统计数据后的委托。一般用于保存到数据库</summary>
|
||
public SaveDelegate OnSave { get; set; }
|
||
|
||
private readonly FullRedis _redis;
|
||
private readonly FullRedis _redis2;
|
||
private readonly RedisReliableQueue<String> _queue;
|
||
private readonly CancellationTokenSource _source;
|
||
#endregion
|
||
|
||
#region 构造
|
||
/// <summary>实例化</summary>
|
||
/// <param name="redis"></param>
|
||
/// <param name="name"></param>
|
||
public RedisStat(FullRedis redis, String name)
|
||
{
|
||
Name = name;
|
||
|
||
// 存放数据和去重key的库
|
||
_redis = redis;
|
||
|
||
// 存放消息队列的库,不能跟数据key放在一起,否则影响队列的scan
|
||
_redis2 = redis.CreateSub(redis.Db + 1) as FullRedis;
|
||
_redis2.Log = _redis.Log;
|
||
_queue = _redis2.GetReliableQueue<String>(name);
|
||
_source = new CancellationTokenSource();
|
||
Task.Factory.StartNew(() => _queue.ConsumeAsync<String>(OnProcess, _source.Token, null));
|
||
}
|
||
|
||
/// <summary>销毁</summary>
|
||
/// <param name="disposing"></param>
|
||
protected override void Dispose(Boolean disposing)
|
||
{
|
||
base.Dispose(disposing);
|
||
|
||
_source.Cancel();
|
||
}
|
||
#endregion
|
||
|
||
#region 方法
|
||
/// <summary>增加指定标量的值</summary>
|
||
/// <param name="key"></param>
|
||
/// <param name="field"></param>
|
||
/// <param name="value"></param>
|
||
public void Increment(String key, String field, Int32 value) => _redis.Execute(key, (c, k) => c.Execute<Int32>("HINCRBY", k, field, value), true);
|
||
|
||
/// <summary>放入延迟队列</summary>
|
||
/// <param name="key"></param>
|
||
/// <param name="delay"></param>
|
||
public void AddDelayQueue(String key, Int32 delay)
|
||
{
|
||
// 去重,避免重复加入队列
|
||
var rs = _redis.Add($"exists:{key}", DateTime.Now, 600);
|
||
if (rs) _queue.AddDelay(key, delay);
|
||
}
|
||
|
||
/// <summary>消费处理</summary>
|
||
/// <param name="key"></param>
|
||
protected virtual void OnProcess(String key)
|
||
{
|
||
/*
|
||
* 1,在HASH中重命名该Key
|
||
* 2,把HASH中该Key数据全部取回
|
||
*/
|
||
|
||
var newKey = $"{key}:{Rand.NextString(16)}";
|
||
if (!_redis.Rename(key, newKey, false)) return;
|
||
|
||
_redis.Remove($"exists:{key}");
|
||
var rs = _redis.Execute(newKey, (r, k) => r.Execute<IPacket[]>("HGETALL", k));
|
||
if (rs != null)
|
||
{
|
||
var dic = new Dictionary<String, Int32>();
|
||
for (var i = 0; i < rs.Length; i++)
|
||
{
|
||
var k = rs[i].ToStr();
|
||
var v = rs[++i].ToStr().ToInt();
|
||
dic[k] = v;
|
||
}
|
||
|
||
OnSave(key, dic);
|
||
}
|
||
|
||
rs.TryDispose();
|
||
|
||
_redis.Remove(newKey);
|
||
}
|
||
#endregion
|
||
} |