diff --git a/NewLife.Redis/Clusters/RedisReplication.cs b/NewLife.Redis/Clusters/RedisReplication.cs index b7e102b..4efbf49 100644 --- a/NewLife.Redis/Clusters/RedisReplication.cs +++ b/NewLife.Redis/Clusters/RedisReplication.cs @@ -1,5 +1,4 @@ -using System.Net.Sockets; -using NewLife.Log; +using NewLife.Log; using NewLife.Net; using NewLife.Threading; @@ -13,7 +12,7 @@ public class RedisReplication : RedisBase, IRedisCluster, IDisposable IList IRedisCluster.Nodes => Nodes.Select(x => (IRedisNode)x).ToList(); /// 节点改变事件 - public event EventHandler NodeChanged; + public event EventHandler? NodeChanged; /// 集群节点 public RedisNode[]? Nodes { get; protected set; } diff --git a/NewLife.Redis/Queues/RedisQueue.cs b/NewLife.Redis/Queues/RedisQueue.cs index 2725dd1..4735a37 100644 --- a/NewLife.Redis/Queues/RedisQueue.cs +++ b/NewLife.Redis/Queues/RedisQueue.cs @@ -112,7 +112,13 @@ public class RedisQueue : QueueBase, IProducerConsumer if (timeout > 0 && Redis.Timeout < (timeout + 1) * 1000) Redis.Timeout = (timeout + 1) * 1000; var rs = Execute((rc, k) => rc.Execute("BRPOP", Key, timeout), true); - return rs == null || rs.Length < 2 ? default : (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + if (rs == null || rs.Length < 2) return default; + + var msg = (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + + if (typeof(T) != typeof(IPacket)) rs.TryDispose(); + + return msg; } /// 异步消费获取 @@ -126,7 +132,13 @@ public class RedisQueue : QueueBase, IProducerConsumer if (timeout > 0 && Redis.Timeout < (timeout + 1) * 1000) Redis.Timeout = (timeout + 1) * 1000; var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOP", [Key, timeout], cancellationToken), true).ConfigureAwait(false); - return rs == null || rs.Length < 2 ? default : (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + if (rs == null || rs.Length < 2) return default; + + var msg = (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + + if (typeof(T) != typeof(IPacket)) rs.TryDispose(); + + return msg; } /// 异步消费获取 diff --git a/NewLife.Redis/Queues/RedisReliableQueue.cs b/NewLife.Redis/Queues/RedisReliableQueue.cs index 3f7ae51..460f635 100644 --- a/NewLife.Redis/Queues/RedisReliableQueue.cs +++ b/NewLife.Redis/Queues/RedisReliableQueue.cs @@ -442,6 +442,7 @@ public class RedisReliableQueue : QueueBase, IProducerConsumer, IDisposabl // 清理已经失去Status的Ack foreach (var key in rds.Search($"{_Key}:Ack:*", 1000)) + { if (!acks.Contains(key)) { var queue = rds.GetList(key) as RedisList; @@ -449,6 +450,7 @@ public class RedisReliableQueue : QueueBase, IProducerConsumer, IDisposabl XTrace.WriteLine("全局清理死信:{0} {1}", key, msgs.ToJson()); rds.Remove(key); } + } return count; } diff --git a/NewLife.Redis/RedisClient.cs b/NewLife.Redis/RedisClient.cs index e7c7b37..396dd2e 100644 --- a/NewLife.Redis/RedisClient.cs +++ b/NewLife.Redis/RedisClient.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Net.Security; using System.Net.Sockets; using System.Security.Authentication; @@ -679,19 +678,22 @@ public class RedisClient : DisposeBase try { // 管道模式 + var type = typeof(TResult); if (_ps != null) { - _ps.Add(new Command(cmd, args, typeof(TResult))); + _ps.Add(new Command(cmd, args, type)); return default; } var rs = ExecuteCommand(cmd, args); if (rs == null) return default; if (rs is TResult rs2) return rs2; - if (TryChangeType(rs, typeof(TResult), out var target)) + + if (TryChangeType(rs, type, out var target)) { + //!!! 外部调用者可能需要直接使用内部申请的OwnerPacket,所以这里不释放 // 释放内部申请的OwnerPacket - rs.TryDispose(); + if (type != typeof(IPacket) && type != typeof(IPacket[])) rs.TryDispose(); return (TResult?)target; } @@ -725,10 +727,13 @@ public class RedisClient : DisposeBase value = default; if (rs == null) return false; - if (TryChangeType(rs, typeof(TResult), out var target)) + + var type = typeof(TResult); + if (TryChangeType(rs, type, out var target)) { + //!!! 外部调用者可能需要直接使用内部申请的OwnerPacket,所以这里不释放 // 释放内部申请的OwnerPacket - rs.TryDispose(); + if (type != typeof(IPacket) && type != typeof(IPacket[])) rs.TryDispose(); value = (TResult?)target; return true; } @@ -777,19 +782,22 @@ public class RedisClient : DisposeBase public virtual async Task ExecuteAsync(String cmd, Object?[] args, CancellationToken cancellationToken) { // 管道模式 + var type = typeof(TResult); if (_ps != null) { - _ps.Add(new Command(cmd, args, typeof(TResult))); + _ps.Add(new Command(cmd, args, type)); return default; } var rs = await ExecuteAsync(cmd, args, cancellationToken).ConfigureAwait(false); if (rs == null) return default; if (rs is TResult rs2) return rs2; - if (TryChangeType(rs, typeof(TResult), out var target)) + + if (TryChangeType(rs, type, out var target)) { + //!!! 外部调用者可能需要直接使用内部申请的OwnerPacket,所以这里不释放 // 释放内部申请的OwnerPacket - rs.TryDispose(); + if (type != typeof(IPacket) && type != typeof(IPacket[])) rs.TryDispose(); return (TResult?)target; } @@ -810,7 +818,14 @@ public class RedisClient : DisposeBase //var rs = ExecuteCommand(null, null, null); if (rs == null) return default; if (rs is TResult rs2) return rs2; - if (TryChangeType(rs, typeof(TResult), out var target)) return (TResult?)target; + + var type = typeof(TResult); + if (TryChangeType(rs, type, out var target)) + { + // 释放内部申请的OwnerPacket + if (type != typeof(IPacket) && type != typeof(IPacket[])) rs.TryDispose(); + return (TResult?)target; + } return default; } @@ -939,8 +954,9 @@ public class RedisClient : DisposeBase var rs = list[i]; if (rs != null && TryChangeType(rs, ps[i].Type, out var target) && target != null) { + //!!! 外部调用者可能需要直接使用内部申请的OwnerPacket,所以这里不释放 // 释放内部申请的OwnerPacket - rs.TryDispose(); + if (ps[i].Type != typeof(IPacket) && ps[i].Type != typeof(IPacket[])) rs.TryDispose(); list[i] = target; } } diff --git a/NewLife.Redis/RedisHash.cs b/NewLife.Redis/RedisHash.cs index 0ff6b8e..14f531e 100644 --- a/NewLife.Redis/RedisHash.cs +++ b/NewLife.Redis/RedisHash.cs @@ -68,6 +68,8 @@ public class RedisHash : RedisBase, IDictionary value = Redis.Encoder.Decode(pk)!; //value = (TValue?)Redis.Encoder.Decode(pk, typeof(TValue))!; + if (typeof(TValue) != typeof(IPacket)) pk.TryDispose(); + return true; } @@ -164,6 +166,8 @@ public class RedisHash : RedisBase, IDictionary dic[key] = value; } + if (typeof(TKey) != typeof(IPacket) && typeof(TValue) != typeof(IPacket)) rs.TryDispose(); + return dic; } diff --git a/NewLife.Redis/RedisList.cs b/NewLife.Redis/RedisList.cs index 1d472a6..bd8cbf3 100644 --- a/NewLife.Redis/RedisList.cs +++ b/NewLife.Redis/RedisList.cs @@ -47,14 +47,7 @@ public class RedisList : RedisBase, IList /// 是否包含指定元素 /// /// - public Boolean Contains(T item) - { - var count = Count; - if (count > 1000) throw new NotSupportedException($"[{Key}]的元素个数过多,不支持!"); - - var list = GetAll(); - return list.Contains(item); - } + public Boolean Contains(T item) => IndexOf(item) >= 0; /// 复制到目标数组 /// @@ -72,11 +65,31 @@ public class RedisList : RedisBase, IList /// public Int32 IndexOf(T item) { - var count = Count; - if (count > 1000) throw new NotSupportedException($"[{Key}]的元素个数过多,不支持!"); + // Redis7支持LPOS + if (Redis.Version.Major >= 7) return LPOS(item); - var arr = GetAll(); - return Array.IndexOf(arr, item); + var p = 0; + var batch = 100; + while (true) + { + var arr = LRange(p, p + batch - 1); + if (arr == null || arr.Length == 0) break; + + var idx = Array.IndexOf(arr, item); + if (idx >= 0) return p + idx; + + if (p >= 1_000_000) throw new NotSupportedException($"[{Key}]的元素个数过多,不支持遍历!"); + + p += batch; + } + + return -1; + + //var count = Count; + //if (count > 1000) + + //var arr = GetAll(); + //return Array.IndexOf(arr, item); } /// 在指定位置插入 @@ -217,5 +230,9 @@ public class RedisList : RedisBase, IList /// /// public Int32 LRem(Int32 count, T value) => Execute((r, k) => r.Execute("LREM", Key, count, value), true); + + /// 获取元素位置 + /// + public Int32 LPOS(T item) => Execute((rc, k) => rc.Execute("LPOS", Key, item), false); #endregion } \ No newline at end of file diff --git a/NewLife.Redis/RedisStack.cs b/NewLife.Redis/RedisStack.cs index d69fda2..26de795 100644 --- a/NewLife.Redis/RedisStack.cs +++ b/NewLife.Redis/RedisStack.cs @@ -83,7 +83,13 @@ public class RedisStack : RedisBase, IProducerConsumer if (timeout < 0) return Execute((rc, k) => rc.Execute("RPOP", Key), true); var rs = Execute((rc, k) => rc.Execute("BRPOP", Key, timeout), true); - return rs == null || rs.Length < 2 ? default : (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + if (rs == null || rs.Length < 2) return default; + + var msg = (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + + if (typeof(T) != typeof(IPacket)) rs.TryDispose(); + + return msg; } /// 异步消费获取 @@ -95,7 +101,13 @@ public class RedisStack : RedisBase, IProducerConsumer if (timeout < 0) return await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOP", Key), true).ConfigureAwait(false); var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOP", [Key, timeout], cancellationToken), true).ConfigureAwait(false); - return rs == null || rs.Length < 2 ? default : (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + if (rs == null || rs.Length < 2) return default; + + var msg = (T?)Redis.Encoder.Decode(rs[1], typeof(T)); + + if (typeof(T) != typeof(IPacket)) rs.TryDispose(); + + return msg; } /// 异步消费获取 diff --git a/NewLife.Redis/Services/RedisStat.cs b/NewLife.Redis/Services/RedisStat.cs index e575566..0812a77 100644 --- a/NewLife.Redis/Services/RedisStat.cs +++ b/NewLife.Redis/Services/RedisStat.cs @@ -90,7 +90,7 @@ public class RedisStat : DisposeBase if (!_redis.Rename(key, newKey, false)) return; _redis.Remove($"exists:{key}"); - var rs = _redis.Execute(newKey, (r,k) => r.Execute("HGETALL", k)); + var rs = _redis.Execute(newKey, (r, k) => r.Execute("HGETALL", k)); if (rs != null) { var dic = new Dictionary(); @@ -104,6 +104,8 @@ public class RedisStat : DisposeBase OnSave(key, dic); } + rs.TryDispose(); + _redis.Remove(newKey); } #endregion diff --git a/XUnitTest/HashTest.cs b/XUnitTest/HashTest.cs index b0db7e1..bd3e379 100644 --- a/XUnitTest/HashTest.cs +++ b/XUnitTest/HashTest.cs @@ -1,9 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Reflection; -using System.Runtime.Intrinsics.Arm; - using NewLife.Caching; using NewLife.Log; using Xunit; @@ -105,14 +102,43 @@ public class HashTest var hash = _redis.GetDictionary(key); Assert.NotNull(hash); - var l = hash as RedisHash; + var rh = hash as RedisHash; - foreach(var item in l.GetAll()) + foreach (var item in rh.GetAll()) { XTrace.WriteLine(item.Key); } - l["0"] = "0"; + rh["0"] = new EventInfo { EventId = "1234", EventName = "Stone" }; + } + + [Fact] + public void RemoveTest() + { + var key = $"NewLife:eventinfo:adsfasdfasdfdsaf"; + + var hash = _redis.GetDictionary(key); + Assert.NotNull(hash); + + var rh = hash as RedisHash; + + foreach (var item in rh.GetAll()) + { + XTrace.WriteLine(item.Key); + } + + rh["0"] = new EventInfo { EventId = "1234", EventName = "Stone" }; + rh["1"] = new EventInfo { EventId = "12345", EventName = "Stone" }; + rh["2"] = new EventInfo { EventId = "123456", EventName = "Stone" }; + + rh.Remove("0"); + Assert.Equal(2, rh.Count); + } + + class EventInfo + { + public String EventId { get; set; } + public String EventName { get; set; } } } diff --git a/XUnitTest/ListTests.cs b/XUnitTest/ListTests.cs index f1c45f3..b046560 100644 --- a/XUnitTest/ListTests.cs +++ b/XUnitTest/ListTests.cs @@ -1,5 +1,6 @@ using NewLife.Caching; using NewLife.Log; +using NewLife.Security; using System; using System.Diagnostics; using System.Linq; @@ -164,6 +165,38 @@ public class ListTests Assert.Equal(vs3[1], item2); } + [Fact] + public void List_IndexOf() + { + var key = "lkey_indexof"; + + // 删除已有 + _redis.Remove(key); + + var rlist = _redis.GetList(key) as RedisList; + Assert.NotNull(rlist); + + // 添加 + var vs = Enumerable.Range(0, 1000).Select(e => Rand.NextString(8)).ToArray(); + rlist.AddRange(vs); + _redis.SetExpire(key, TimeSpan.FromSeconds(60)); + + // 索引 + var idx = rlist.IndexOf(vs[1]); + Assert.Equal(1, idx); + + idx = rlist.IndexOf("abcd2"); + Assert.Equal(-1, idx); + + idx = rlist.IndexOf(vs[321]); + Assert.Equal(321, idx); + + var rs = rlist.Contains(vs[456]); + Assert.True(rs); + + _redis.Remove(key); + } + [Fact] public void RPOPLPUSH_Test() {