diff --git a/NewLife.Redis/Buffers/BufferedReader.cs b/NewLife.Redis/Buffers/BufferedReader.cs new file mode 100644 index 0000000..87475f0 --- /dev/null +++ b/NewLife.Redis/Buffers/BufferedReader.cs @@ -0,0 +1,146 @@ +using NewLife.Data; + +namespace NewLife.Caching.Buffers; + +/// 缓存的读取器 +public ref struct BufferedReader +{ + #region 属性 + private ReadOnlySpan _span; + /// 数据片段 + public ReadOnlySpan Span => _span; + + private Int32 _index; + /// 已读取字节数 + public Int32 Position { get => _index; set => _index = value; } + + /// 总容量 + public Int32 Capacity => _span.Length; + + /// 空闲容量 + public Int32 FreeCapacity => _span.Length - _index; + + private readonly Stream _stream; + private readonly Int32 _bufferSize; + private IPacket _data; + #endregion + + #region 构造 + /// 实例化Span读取器 + /// + /// + /// + public BufferedReader(Stream stream, IPacket data, Int32 bufferSize = 8192) + { + _stream = stream; + _bufferSize = bufferSize; + _data = data; + _span = data.GetSpan(); + } + #endregion + + #region 基础方法 + /// 告知有多少数据已从缓冲区读取 + /// + public void Advance(Int32 count) + { + if (count < 0) throw new ArgumentOutOfRangeException(nameof(count)); + if (_index + count > _span.Length) throw new ArgumentOutOfRangeException(nameof(count)); + + _index += count; + } + + /// 返回要写入到的Span,其大小按 sizeHint 参数指定至少为所请求的大小 + /// + /// + /// + public ReadOnlySpan GetSpan(Int32 sizeHint = 0) + { + if (sizeHint > FreeCapacity) throw new ArgumentOutOfRangeException(nameof(sizeHint)); + + return _span[_index..]; + } + #endregion + + #region 读取方法 + /// 确保缓冲区中有足够的空间。 + /// 需要的字节数。 + /// + public void EnsureSpace(Int32 size) + { + // 检查剩余空间大小,不足时,再从数据流中读取。此时需要注意,创建新的OwnerPacket后,需要先把之前剩余的一点数据拷贝过去,然后再读取Stream + var remain = FreeCapacity; + if (remain < size) + { + var idx = 0; + var pk = new OwnerPacket(_bufferSize); + if (_data != null && remain > 0) + { + if (!_data.TryGetArray(out var arr)) throw new NotSupportedException(); + + arr.AsSpan(_index, remain).CopyTo(pk.Buffer); + idx += remain; + } + + _data.TryDispose(); + _data = pk; + _index = 0; + + // 多次读取,直到满足需求 + //var n = _stream.ReadExactly(pk.Buffer, pk.Offset + idx, pk.Length - idx); + while (idx < size) + { + var n = _stream.Read(pk.Buffer, pk.Offset + idx, pk.Length - idx); + if (n < 0) break; + + idx += n; + } + if (idx < size) + throw new InvalidOperationException("Not enough data to read."); + pk.Resize(idx); + + _span = pk.GetSpan(); + } + + if (_index + size > _span.Length) + throw new InvalidOperationException("Not enough data to read."); + } + + /// 读取单个字节 + /// + public Byte ReadByte() + { + var size = sizeof(Byte); + EnsureSpace(size); + + var result = _span[_index]; + _index += size; + return result; + } + + /// 读取字节数组 + /// + /// + /// + public ReadOnlySpan ReadBytes(Int32 length) + { + EnsureSpace(length); + + var result = _span.Slice(_index, length); + _index += length; + return result; + } + + /// 读取数据包 + /// + /// + public IPacket ReadPacket(Int32 length) + { + EnsureSpace(length); + + var result = _data.Slice(_index, length); + _index += length; + return result; + } + #endregion +} diff --git a/NewLife.Redis/RedisClient.cs b/NewLife.Redis/RedisClient.cs index 21746ab..3362b5e 100644 --- a/NewLife.Redis/RedisClient.cs +++ b/NewLife.Redis/RedisClient.cs @@ -7,6 +7,7 @@ using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using System.Text; using NewLife.Buffers; +using NewLife.Caching.Buffers; using NewLife.Collections; using NewLife.Data; using NewLife.Log; @@ -309,24 +310,19 @@ public class RedisClient : DisposeBase /// protected virtual IList GetResponse(Stream ns, Int32 count) { - var ms = new BufferedStream(ns); + //var ms = new BufferedStream(ns); + var ms = ns; - Char header; - var buf = Pool.Shared.Rent(1); - try - { - // 取巧进行异步操作,只要异步读取到第一个字节,后续同步读取 - var n = ms.Read(buf, 0, 1); - if (n <= 0) return []; + using var pk = new OwnerPacket(MAX_POOL_SIZE); - header = (Char)buf[0]; - } - finally - { - Pool.Shared.Return(buf); - } + var n = ms.Read(pk.Buffer, pk.Offset, pk.Length); + if (n <= 0) return []; - return ParseResponse(ms, count, header); + pk.Resize(n); + + var reader = new BufferedReader(ms, pk, 8192); + + return ParseResponse(ms, count, ref reader); } /// 异步接收响应 @@ -336,27 +332,23 @@ public class RedisClient : DisposeBase /// protected virtual async Task> GetResponseAsync(Stream ns, Int32 count, CancellationToken cancellationToken = default) { - Char header; - var buf = Pool.Shared.Rent(1); - try - { - // 取巧进行异步操作,只要异步读取到第一个字节,后续同步读取 - if (cancellationToken == CancellationToken.None) - cancellationToken = new CancellationTokenSource(Timeout > 0 ? Timeout : Host.Timeout).Token; - var n = await ns.ReadAsync(buf, 0, 1, cancellationToken).ConfigureAwait(false); - if (n <= 0) return []; + var ms = ns; + using var pk = new OwnerPacket(MAX_POOL_SIZE); - header = (Char)buf[0]; - } - finally - { - Pool.Shared.Return(buf); - } + // 取巧进行异步操作,只要异步读取到第一个字节,后续同步读取 + if (cancellationToken == CancellationToken.None) + cancellationToken = new CancellationTokenSource(Timeout > 0 ? Timeout : Host.Timeout).Token; + var n = await ns.ReadAsync(pk.Buffer, pk.Offset, pk.Length, cancellationToken).ConfigureAwait(false); + if (n <= 0) return []; - return ParseResponse(ns, count, header); + pk.Resize(n); + + var reader = new BufferedReader(ms, pk, 8192); + + return ParseResponse(ms, count, ref reader); } - private IList ParseResponse(Stream ms, Int32 count, Char header) + private IList ParseResponse(Stream ms, Int32 count, ref BufferedReader reader) { /* * 响应格式 @@ -370,31 +362,28 @@ public class RedisClient : DisposeBase var list = new List(); var log = Log == null || Log == Logger.Null ? null : Pool.StringBuilder.Get(); + //var reader = new SpanReader(pk.GetSpan()); + var header = (Char)reader.ReadByte(); + // 多行响应 for (var i = 0; i < count; i++) { // 解析响应 - if (i > 0) - { - var b = ms.ReadByte(); - if (b == -1) break; - - header = (Char)b; - } + if (i > 0) header = (Char)reader.ReadByte(); log?.Append(header); if (header == '$') { - list.Add(ReadBlock(ms, log)); + list.Add(ReadBlock(ref reader, log)); } else if (header == '*') { - list.Add(ReadBlocks(ms, log)); + list.Add(ReadBlocks(ref reader, log)); } else { // 字符串以换行为结束符 - var str = ReadLine(ms); + var str = ReadLine(ref reader); log?.Append(str); if (header is '+' or ':') @@ -544,95 +533,68 @@ public class RedisClient : DisposeBase } } - private static IPacket? ReadBlock(Stream ms, StringBuilder? log) => ReadPacket(ms, log); + private static IPacket? ReadBlock(ref BufferedReader reader, StringBuilder? log) => ReadPacket(ref reader, log); - private Object?[] ReadBlocks(Stream ms, StringBuilder? log) + private Object?[] ReadBlocks(ref BufferedReader reader, StringBuilder? log) { // 结果集数量 - var len = ReadLength(ms); + var len = ReadLength(ref reader); log?.Append(len); if (len < 0) return []; var arr = new Object?[len]; for (var i = 0; i < len; i++) { - var b = ms.ReadByte(); - if (b == -1) break; - - var header = (Char)b; + var header = (Char)reader.ReadByte(); log?.Append(' '); log?.Append(header); if (header == '$') { - arr[i] = ReadPacket(ms, log); + arr[i] = ReadPacket(ref reader, log); } else if (header is '+' or ':') { - arr[i] = ReadLine(ms); + arr[i] = ReadLine(ref reader); log?.Append(arr[i]); } else if (header == '*') { - arr[i] = ReadBlocks(ms, log); + arr[i] = ReadBlocks(ref reader, log); } } return arr; } - private static IPacket? ReadPacket(Stream ms, StringBuilder? log) + private static IPacket? ReadPacket(ref BufferedReader reader, StringBuilder? log) { - var len = ReadLength(ms); + var len = ReadLength(ref reader); log?.Append(len); if (len == 0) { // 某些字段即使长度是0,还是要把换行符读走 - ReadLine(ms); + ReadLine(ref reader); return null; } if (len <= 0) return null; len += 2; - //// 很多时候,数据长度为1,特殊优化 - //if (len == 3) - //{ - // var rs = ms.ReadByte(); - // // 再读取两个换行符,网络流不支持Seek - // //ms.Seek(2, SeekOrigin.Current); - // ms.ReadByte(); - // ms.ReadByte(); - // return rs; - //} - - // 从内存池借出,包装到MemorySegment中,一路向上传递,用完后Dispose还到池里 - var owner = new OwnerPacket(len); - var span = owner.GetSpan(); - - var p = 0; - while (p < len) - { - // 等待,直到读完需要的数据,避免大包丢数据 - var count = ms.Read(span.Slice(p, len - p)); - if (count <= 0) break; - - p += count; - } - - return owner.Slice(0, p - 2); + // 读取数据包,并跳过换行符 + var pk = reader.ReadPacket(len - 2); + reader.Advance(2); + return pk; } - private static String ReadLine(Stream ms) + private static String ReadLine(ref BufferedReader reader) { var sb = Pool.StringBuilder.Get(); - while (true) + var count = reader.FreeCapacity; + for (var i = 0; i < count; i++) { - var b = ms.ReadByte(); - if (b < 0) break; - - if (b == '\r') + var b = (Char)reader.ReadByte(); + if (b == '\r' && i + 1 < count) { - var b2 = ms.ReadByte(); - if (b2 < 0) break; + var b2 = (Char)reader.ReadByte(); if (b2 == '\n') break; sb.Append((Char)b); @@ -645,19 +607,17 @@ public class RedisClient : DisposeBase return sb.Return(true); } - private static Int32 ReadLength(Stream ms) + private static Int32 ReadLength(ref BufferedReader reader) { Span span = stackalloc Char[32]; var k = 0; - while (true) + var count = reader.FreeCapacity; + for (var i = 0; i < count; i++) { - var b = ms.ReadByte(); - if (b < 0) break; - - if (b == '\r') + var b = (Char)reader.ReadByte(); + if (b == '\r' && i + 1 < count) { - var b2 = ms.ReadByte(); - if (b2 < 0) break; + var b2 = (Char)reader.ReadByte(); if (b2 == '\n') break; span[k++] = (Char)b; @@ -728,7 +688,12 @@ public class RedisClient : DisposeBase var rs = ExecuteCommand(cmd, args); if (rs == null) return default; if (rs is TResult rs2) return rs2; - if (TryChangeType(rs, typeof(TResult), out var target)) return (TResult?)target; + if (TryChangeType(rs, typeof(TResult), out var target)) + { + // 释放内部申请的OwnerPacket + rs.TryDispose(); + return (TResult?)target; + } return default; } @@ -762,6 +727,8 @@ public class RedisClient : DisposeBase if (rs == null) return false; if (TryChangeType(rs, typeof(TResult), out var target)) { + // 释放内部申请的OwnerPacket + rs.TryDispose(); value = (TResult?)target; return true; } @@ -819,7 +786,12 @@ public class RedisClient : DisposeBase var rs = await ExecuteAsync(cmd, args, cancellationToken).ConfigureAwait(false); if (rs == null) return default; if (rs is TResult rs2) return rs2; - if (TryChangeType(rs, typeof(TResult), out var target)) return (TResult?)target; + if (TryChangeType(rs, typeof(TResult), out var target)) + { + // 释放内部申请的OwnerPacket + rs.TryDispose(); + return (TResult?)target; + } return default; } @@ -965,7 +937,12 @@ public class RedisClient : DisposeBase for (var i = 0; i < list.Count; i++) { var rs = list[i]; - if (rs != null && TryChangeType(rs, ps[i].Type, out var target) && target != null) list[i] = target; + if (rs != null && TryChangeType(rs, ps[i].Type, out var target) && target != null) + { + // 释放内部申请的OwnerPacket + rs.TryDispose(); + list[i] = target; + } } return list.ToArray(); diff --git a/Samples/Benchmark/BasicBenchmark.cs b/Samples/Benchmark/BasicBenchmark.cs index b84be6c..b8a46a2 100644 --- a/Samples/Benchmark/BasicBenchmark.cs +++ b/Samples/Benchmark/BasicBenchmark.cs @@ -12,6 +12,7 @@ public class BasicBenchmark { public FullRedis Redis { get; set; } + private String _key; private String[] _keys; [GlobalSetup] @@ -26,12 +27,29 @@ public class BasicBenchmark Redis = rds; + _key = Rand.NextString(16); var ks = new String[100_000]; for (var i = 0; i < ks.Length; i++) { ks[i] = Rand.NextString(16); } _keys = ks; + + rds.Set(_key, _key); + var v = rds.Get(_key); + rds.Remove(_key); + } + + [Benchmark] + public void SetTest() + { + var rds = Redis; + var value = Rand.NextString(16); + + for (var i = 0; i < _keys.Length; i++) + { + rds.Set(_keys[i], value); + } } [Benchmark] @@ -46,14 +64,13 @@ public class BasicBenchmark } [Benchmark] - public void SetTest() + public void RemoveTest() { var rds = Redis; - var value = Rand.NextString(16); for (var i = 0; i < _keys.Length; i++) { - rds.Set(_keys[i], value); + rds.Remove(_keys[i]); } } }