[feat]实现缓冲读取器BufferedReader,替代BufferedStream。因为BufferedStream内部从堆上分配8192字节内存数组,每一次Redis请求都需要分配一次,带来了大量内存分配和GC成本。

This commit is contained in:
智能大石头 2025-01-14 01:23:56 +08:00
parent 9c55d4e250
commit db867557be
3 changed files with 243 additions and 103 deletions

View File

@ -0,0 +1,146 @@
using NewLife.Data;
namespace NewLife.Caching.Buffers;
/// <summary>缓存的读取器</summary>
public ref struct BufferedReader
{
#region
private ReadOnlySpan<Byte> _span;
/// <summary>数据片段</summary>
public ReadOnlySpan<Byte> Span => _span;
private Int32 _index;
/// <summary>已读取字节数</summary>
public Int32 Position { get => _index; set => _index = value; }
/// <summary>总容量</summary>
public Int32 Capacity => _span.Length;
/// <summary>空闲容量</summary>
public Int32 FreeCapacity => _span.Length - _index;
private readonly Stream _stream;
private readonly Int32 _bufferSize;
private IPacket _data;
#endregion
#region
/// <summary>实例化Span读取器</summary>
/// <param name="stream"></param>
/// <param name="data"></param>
/// <param name="bufferSize"></param>
public BufferedReader(Stream stream, IPacket data, Int32 bufferSize = 8192)
{
_stream = stream;
_bufferSize = bufferSize;
_data = data;
_span = data.GetSpan();
}
#endregion
#region
/// <summary>告知有多少数据已从缓冲区读取</summary>
/// <param name="count"></param>
public void Advance(Int32 count)
{
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
if (_index + count > _span.Length) throw new ArgumentOutOfRangeException(nameof(count));
_index += count;
}
/// <summary>返回要写入到的Span其大小按 sizeHint 参数指定至少为所请求的大小</summary>
/// <param name="sizeHint"></param>
/// <returns></returns>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public ReadOnlySpan<Byte> GetSpan(Int32 sizeHint = 0)
{
if (sizeHint > FreeCapacity) throw new ArgumentOutOfRangeException(nameof(sizeHint));
return _span[_index..];
}
#endregion
#region
/// <summary>确保缓冲区中有足够的空间。</summary>
/// <param name="size">需要的字节数。</param>
/// <exception cref="InvalidOperationException"></exception>
public void EnsureSpace(Int32 size)
{
// 检查剩余空间大小不足时再从数据流中读取。此时需要注意创建新的OwnerPacket后需要先把之前剩余的一点数据拷贝过去然后再读取Stream
var remain = FreeCapacity;
if (remain < size)
{
var idx = 0;
var pk = new OwnerPacket(_bufferSize);
if (_data != null && remain > 0)
{
if (!_data.TryGetArray(out var arr)) throw new NotSupportedException();
arr.AsSpan(_index, remain).CopyTo(pk.Buffer);
idx += remain;
}
_data.TryDispose();
_data = pk;
_index = 0;
// 多次读取,直到满足需求
//var n = _stream.ReadExactly(pk.Buffer, pk.Offset + idx, pk.Length - idx);
while (idx < size)
{
var n = _stream.Read(pk.Buffer, pk.Offset + idx, pk.Length - idx);
if (n < 0) break;
idx += n;
}
if (idx < size)
throw new InvalidOperationException("Not enough data to read.");
pk.Resize(idx);
_span = pk.GetSpan();
}
if (_index + size > _span.Length)
throw new InvalidOperationException("Not enough data to read.");
}
/// <summary>读取单个字节</summary>
/// <returns></returns>
public Byte ReadByte()
{
var size = sizeof(Byte);
EnsureSpace(size);
var result = _span[_index];
_index += size;
return result;
}
/// <summary>读取字节数组</summary>
/// <param name="length"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public ReadOnlySpan<Byte> ReadBytes(Int32 length)
{
EnsureSpace(length);
var result = _span.Slice(_index, length);
_index += length;
return result;
}
/// <summary>读取数据包</summary>
/// <param name="length"></param>
/// <returns></returns>
public IPacket ReadPacket(Int32 length)
{
EnsureSpace(length);
var result = _data.Slice(_index, length);
_index += length;
return result;
}
#endregion
}

View File

@ -7,6 +7,7 @@ using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using System.Text; using System.Text;
using NewLife.Buffers; using NewLife.Buffers;
using NewLife.Caching.Buffers;
using NewLife.Collections; using NewLife.Collections;
using NewLife.Data; using NewLife.Data;
using NewLife.Log; using NewLife.Log;
@ -309,24 +310,19 @@ public class RedisClient : DisposeBase
/// <returns></returns> /// <returns></returns>
protected virtual IList<Object?> GetResponse(Stream ns, Int32 count) protected virtual IList<Object?> GetResponse(Stream ns, Int32 count)
{ {
var ms = new BufferedStream(ns); //var ms = new BufferedStream(ns);
var ms = ns;
Char header; using var pk = new OwnerPacket(MAX_POOL_SIZE);
var buf = Pool.Shared.Rent(1);
try
{
// 取巧进行异步操作,只要异步读取到第一个字节,后续同步读取
var n = ms.Read(buf, 0, 1);
if (n <= 0) return [];
header = (Char)buf[0]; var n = ms.Read(pk.Buffer, pk.Offset, pk.Length);
} if (n <= 0) return [];
finally
{
Pool.Shared.Return(buf);
}
return ParseResponse(ms, count, header); pk.Resize(n);
var reader = new BufferedReader(ms, pk, 8192);
return ParseResponse(ms, count, ref reader);
} }
/// <summary>异步接收响应</summary> /// <summary>异步接收响应</summary>
@ -336,27 +332,23 @@ public class RedisClient : DisposeBase
/// <returns></returns> /// <returns></returns>
protected virtual async Task<IList<Object?>> GetResponseAsync(Stream ns, Int32 count, CancellationToken cancellationToken = default) protected virtual async Task<IList<Object?>> GetResponseAsync(Stream ns, Int32 count, CancellationToken cancellationToken = default)
{ {
Char header; var ms = ns;
var buf = Pool.Shared.Rent(1); using var pk = new OwnerPacket(MAX_POOL_SIZE);
try
{
// 取巧进行异步操作,只要异步读取到第一个字节,后续同步读取
if (cancellationToken == CancellationToken.None)
cancellationToken = new CancellationTokenSource(Timeout > 0 ? Timeout : Host.Timeout).Token;
var n = await ns.ReadAsync(buf, 0, 1, cancellationToken).ConfigureAwait(false);
if (n <= 0) return [];
header = (Char)buf[0]; // 取巧进行异步操作,只要异步读取到第一个字节,后续同步读取
} if (cancellationToken == CancellationToken.None)
finally cancellationToken = new CancellationTokenSource(Timeout > 0 ? Timeout : Host.Timeout).Token;
{ var n = await ns.ReadAsync(pk.Buffer, pk.Offset, pk.Length, cancellationToken).ConfigureAwait(false);
Pool.Shared.Return(buf); if (n <= 0) return [];
}
return ParseResponse(ns, count, header); pk.Resize(n);
var reader = new BufferedReader(ms, pk, 8192);
return ParseResponse(ms, count, ref reader);
} }
private IList<Object?> ParseResponse(Stream ms, Int32 count, Char header) private IList<Object?> ParseResponse(Stream ms, Int32 count, ref BufferedReader reader)
{ {
/* /*
* *
@ -370,31 +362,28 @@ public class RedisClient : DisposeBase
var list = new List<Object?>(); var list = new List<Object?>();
var log = Log == null || Log == Logger.Null ? null : Pool.StringBuilder.Get(); var log = Log == null || Log == Logger.Null ? null : Pool.StringBuilder.Get();
//var reader = new SpanReader(pk.GetSpan());
var header = (Char)reader.ReadByte();
// 多行响应 // 多行响应
for (var i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
// 解析响应 // 解析响应
if (i > 0) if (i > 0) header = (Char)reader.ReadByte();
{
var b = ms.ReadByte();
if (b == -1) break;
header = (Char)b;
}
log?.Append(header); log?.Append(header);
if (header == '$') if (header == '$')
{ {
list.Add(ReadBlock(ms, log)); list.Add(ReadBlock(ref reader, log));
} }
else if (header == '*') else if (header == '*')
{ {
list.Add(ReadBlocks(ms, log)); list.Add(ReadBlocks(ref reader, log));
} }
else else
{ {
// 字符串以换行为结束符 // 字符串以换行为结束符
var str = ReadLine(ms); var str = ReadLine(ref reader);
log?.Append(str); log?.Append(str);
if (header is '+' or ':') if (header is '+' or ':')
@ -544,95 +533,68 @@ public class RedisClient : DisposeBase
} }
} }
private static IPacket? ReadBlock(Stream ms, StringBuilder? log) => ReadPacket(ms, log); private static IPacket? ReadBlock(ref BufferedReader reader, StringBuilder? log) => ReadPacket(ref reader, log);
private Object?[] ReadBlocks(Stream ms, StringBuilder? log) private Object?[] ReadBlocks(ref BufferedReader reader, StringBuilder? log)
{ {
// 结果集数量 // 结果集数量
var len = ReadLength(ms); var len = ReadLength(ref reader);
log?.Append(len); log?.Append(len);
if (len < 0) return []; if (len < 0) return [];
var arr = new Object?[len]; var arr = new Object?[len];
for (var i = 0; i < len; i++) for (var i = 0; i < len; i++)
{ {
var b = ms.ReadByte(); var header = (Char)reader.ReadByte();
if (b == -1) break;
var header = (Char)b;
log?.Append(' '); log?.Append(' ');
log?.Append(header); log?.Append(header);
if (header == '$') if (header == '$')
{ {
arr[i] = ReadPacket(ms, log); arr[i] = ReadPacket(ref reader, log);
} }
else if (header is '+' or ':') else if (header is '+' or ':')
{ {
arr[i] = ReadLine(ms); arr[i] = ReadLine(ref reader);
log?.Append(arr[i]); log?.Append(arr[i]);
} }
else if (header == '*') else if (header == '*')
{ {
arr[i] = ReadBlocks(ms, log); arr[i] = ReadBlocks(ref reader, log);
} }
} }
return arr; return arr;
} }
private static IPacket? ReadPacket(Stream ms, StringBuilder? log) private static IPacket? ReadPacket(ref BufferedReader reader, StringBuilder? log)
{ {
var len = ReadLength(ms); var len = ReadLength(ref reader);
log?.Append(len); log?.Append(len);
if (len == 0) if (len == 0)
{ {
// 某些字段即使长度是0还是要把换行符读走 // 某些字段即使长度是0还是要把换行符读走
ReadLine(ms); ReadLine(ref reader);
return null; return null;
} }
if (len <= 0) return null; if (len <= 0) return null;
len += 2; len += 2;
//// 很多时候数据长度为1特殊优化 // 读取数据包,并跳过换行符
//if (len == 3) var pk = reader.ReadPacket(len - 2);
//{ reader.Advance(2);
// var rs = ms.ReadByte(); return pk;
// // 再读取两个换行符网络流不支持Seek
// //ms.Seek(2, SeekOrigin.Current);
// ms.ReadByte();
// ms.ReadByte();
// return rs;
//}
// 从内存池借出包装到MemorySegment中一路向上传递用完后Dispose还到池里
var owner = new OwnerPacket(len);
var span = owner.GetSpan();
var p = 0;
while (p < len)
{
// 等待,直到读完需要的数据,避免大包丢数据
var count = ms.Read(span.Slice(p, len - p));
if (count <= 0) break;
p += count;
}
return owner.Slice(0, p - 2);
} }
private static String ReadLine(Stream ms) private static String ReadLine(ref BufferedReader reader)
{ {
var sb = Pool.StringBuilder.Get(); var sb = Pool.StringBuilder.Get();
while (true) var count = reader.FreeCapacity;
for (var i = 0; i < count; i++)
{ {
var b = ms.ReadByte(); var b = (Char)reader.ReadByte();
if (b < 0) break; if (b == '\r' && i + 1 < count)
if (b == '\r')
{ {
var b2 = ms.ReadByte(); var b2 = (Char)reader.ReadByte();
if (b2 < 0) break;
if (b2 == '\n') break; if (b2 == '\n') break;
sb.Append((Char)b); sb.Append((Char)b);
@ -645,19 +607,17 @@ public class RedisClient : DisposeBase
return sb.Return(true); return sb.Return(true);
} }
private static Int32 ReadLength(Stream ms) private static Int32 ReadLength(ref BufferedReader reader)
{ {
Span<Char> span = stackalloc Char[32]; Span<Char> span = stackalloc Char[32];
var k = 0; var k = 0;
while (true) var count = reader.FreeCapacity;
for (var i = 0; i < count; i++)
{ {
var b = ms.ReadByte(); var b = (Char)reader.ReadByte();
if (b < 0) break; if (b == '\r' && i + 1 < count)
if (b == '\r')
{ {
var b2 = ms.ReadByte(); var b2 = (Char)reader.ReadByte();
if (b2 < 0) break;
if (b2 == '\n') break; if (b2 == '\n') break;
span[k++] = (Char)b; span[k++] = (Char)b;
@ -728,7 +688,12 @@ public class RedisClient : DisposeBase
var rs = ExecuteCommand(cmd, args); var rs = ExecuteCommand(cmd, args);
if (rs == null) return default; if (rs == null) return default;
if (rs is TResult rs2) return rs2; if (rs is TResult rs2) return rs2;
if (TryChangeType(rs, typeof(TResult), out var target)) return (TResult?)target; if (TryChangeType(rs, typeof(TResult), out var target))
{
// 释放内部申请的OwnerPacket
rs.TryDispose();
return (TResult?)target;
}
return default; return default;
} }
@ -762,6 +727,8 @@ public class RedisClient : DisposeBase
if (rs == null) return false; if (rs == null) return false;
if (TryChangeType(rs, typeof(TResult), out var target)) if (TryChangeType(rs, typeof(TResult), out var target))
{ {
// 释放内部申请的OwnerPacket
rs.TryDispose();
value = (TResult?)target; value = (TResult?)target;
return true; return true;
} }
@ -819,7 +786,12 @@ public class RedisClient : DisposeBase
var rs = await ExecuteAsync(cmd, args, cancellationToken).ConfigureAwait(false); var rs = await ExecuteAsync(cmd, args, cancellationToken).ConfigureAwait(false);
if (rs == null) return default; if (rs == null) return default;
if (rs is TResult rs2) return rs2; if (rs is TResult rs2) return rs2;
if (TryChangeType(rs, typeof(TResult), out var target)) return (TResult?)target; if (TryChangeType(rs, typeof(TResult), out var target))
{
// 释放内部申请的OwnerPacket
rs.TryDispose();
return (TResult?)target;
}
return default; return default;
} }
@ -965,7 +937,12 @@ public class RedisClient : DisposeBase
for (var i = 0; i < list.Count; i++) for (var i = 0; i < list.Count; i++)
{ {
var rs = list[i]; var rs = list[i];
if (rs != null && TryChangeType(rs, ps[i].Type, out var target) && target != null) list[i] = target; if (rs != null && TryChangeType(rs, ps[i].Type, out var target) && target != null)
{
// 释放内部申请的OwnerPacket
rs.TryDispose();
list[i] = target;
}
} }
return list.ToArray(); return list.ToArray();

View File

@ -12,6 +12,7 @@ public class BasicBenchmark
{ {
public FullRedis Redis { get; set; } public FullRedis Redis { get; set; }
private String _key;
private String[] _keys; private String[] _keys;
[GlobalSetup] [GlobalSetup]
@ -26,12 +27,29 @@ public class BasicBenchmark
Redis = rds; Redis = rds;
_key = Rand.NextString(16);
var ks = new String[100_000]; var ks = new String[100_000];
for (var i = 0; i < ks.Length; i++) for (var i = 0; i < ks.Length; i++)
{ {
ks[i] = Rand.NextString(16); ks[i] = Rand.NextString(16);
} }
_keys = ks; _keys = ks;
rds.Set(_key, _key);
var v = rds.Get<String>(_key);
rds.Remove(_key);
}
[Benchmark]
public void SetTest()
{
var rds = Redis;
var value = Rand.NextString(16);
for (var i = 0; i < _keys.Length; i++)
{
rds.Set(_keys[i], value);
}
} }
[Benchmark] [Benchmark]
@ -46,14 +64,13 @@ public class BasicBenchmark
} }
[Benchmark] [Benchmark]
public void SetTest() public void RemoveTest()
{ {
var rds = Redis; var rds = Redis;
var value = Rand.NextString(16);
for (var i = 0; i < _keys.Length; i++) for (var i = 0; i < _keys.Length; i++)
{ {
rds.Set(_keys[i], value); rds.Remove(_keys[i]);
} }
} }
} }