[refactor]重构TinyHttpClient的chunk拼接逻辑

This commit is contained in:
石头 2024-10-04 10:35:39 +08:00
parent 54578edf0a
commit 0ea85690cf
2 changed files with 76 additions and 55 deletions

BIN
Doc/chunk分析.eddx Normal file

Binary file not shown.

View File

@ -200,17 +200,24 @@ public class TinyHttpClient : DisposeBase
// 如果没有收完数据包
if (rs != null && res.ContentLength > 0 && rs.Length < res.ContentLength)
{
// 使用内存流拼接需要多次接收的数据包,降低逻辑复杂度
var ms = new MemoryStream(res.ContentLength);
await rs.CopyToAsync(ms);
var total = rs.Length;
var last = rs;
while (total < res.ContentLength)
{
// 这里的IPacket.Append可能有问题因为last不能是结构体
var pk = await SendDataAsync(null, null).ConfigureAwait(false);
last.Append(pk);
if (pk == null || pk.Length == 0) break;
pk.CopyTo(ms);
last = pk;
total += pk.Length;
}
// 从内存流获取缓冲区,打包为数据包返回,避免再次内存分配
ms.Position = 0;
rs = new ArrayPacket(ms);
}
// chunk编码
@ -237,98 +244,112 @@ public class TinyHttpClient : DisposeBase
/// <returns></returns>
protected virtual async Task<IPacket> ReadChunkAsync(IPacket body)
{
var rs = body;
var last = body;
// 使用内存流拼接需要多次接收的数据包,降低逻辑复杂度
var ms = new MemoryStream(BufferSize);
var pk = body;
while (true)
{
// 分析一个片段,如果该片段数据不足,则需要多次读取
var chunk = ParseChunk(pk, out var offset, out var len);
var data = pk.GetSpan();
if (!ParseChunk(data, out var offset, out var len)) break;
// 最后一个片段的长度为0
if (len <= 0) break;
// 更新pk可能还有粘包数据。每一帧数据后面有\r\n
var next = offset + len + 2;
if (next < pk.Length)
pk = pk.Slice(next);
// chunk是否完整
var memory = pk.GetMemory();
if (offset + len <= memory.Length)
{
// 完整数据,截取需要的部分
memory = memory.Slice(offset, len);
ms.Write(memory);
// 更新pk可能还有粘包数据。每一帧数据后面有\r\n
var next = offset + len + 2;
if (next < pk.Length)
pk = pk.Slice(next);
else
{
pk.TryDispose();
pk = null;
}
}
else
{
// 写入片段数据,数据不足
memory = memory[offset..];
ms.Write(memory);
pk.TryDispose();
pk = null;
// 第一个包需要替换,因为偏移量改变
if (last == body)
rs = chunk;
else
last.Append(chunk);
last = chunk;
// 如果该片段数据不足,则需要多次读取
var total = chunk.Length;
while (total < len)
{
var pk2 = await SendDataAsync(null, null).ConfigureAwait(false);
if (pk != null)
pk.Append(pk2);
else
pk = pk2;
// 结尾的间断符号如换行或00。这里有可能一个数据包里面同时返回多个分片
var count = len - total;
if (pk != null && count <= pk.Length)
// 如果该片段数据不足,则需要多次读取
var remain = len - memory.Length;
while (remain > 0)
{
var pk3 = pk.Slice(0, count);
var pk2 = await SendDataAsync(null, null).ConfigureAwait(false);
memory = pk2.GetMemory();
last.Append(pk3);
last = pk3;
total += pk3.Length;
// 结尾的间断符号如换行或00。这里有可能一个数据包里面同时返回多个分片
if (remain <= memory.Length)
{
ms.Write(memory[..remain]);
// 如果还有剩余作为下一个chunk
count += 2;
if (count < pk.Length)
pk = pk.Slice(count);
// 如果还有剩余作为下一个chunk
if (remain + 2 < memory.Length)
pk = pk2.Slice(remain + 2);
else
pk2.TryDispose();
remain = 0;
}
else
pk = null;
{
ms.Write(memory);
remain -= memory.Length;
pk2.TryDispose();
}
}
}
// 还有粘包数据,继续分析
if (pk == null || pk.Length == 0) break;
if (pk.Length > 0) continue;
pk.TryDispose();
if (pk != null && pk.Length > 0) continue;
// 读取新的数据片段,如果不存在则跳出
pk = await SendDataAsync(null, null).ConfigureAwait(false);
if (pk == null || pk.Length == 0) break;
}
return rs;
ms.Position = 0;
return new ArrayPacket(ms);
}
#endregion
#region
private static readonly Byte[] NewLine = [(Byte)'\r', (Byte)'\n'];
private IPacket ParseChunk(IPacket rs, out Int32 offset, out Int32 octets)
private Boolean ParseChunk(Span<Byte> data, out Int32 offset, out Int32 octets)
{
// chunk编码
// 1 ba \r\n xxxx \r\n 0 \r\n\r\n
offset = 0;
octets = 0;
var data = rs.GetSpan();
var p = data.IndexOf(NewLine);
if (p <= 0) return rs;
if (p <= 0) return false;
// 第一段长度
#if NET8_0_OR_GREATER
octets = Int32.Parse(data[..p], NumberStyles.HexNumber);
#else
var str = data[..p].ToStr();
//if (str.Length % 2 != 0) str = "0" + str;
//var len = (Int32)str.ToHex().ToUInt32(0, false);
//Int32.TryParse(str, NumberStyles.HexNumber, null, out var len);
octets = Int32.Parse(str, NumberStyles.HexNumber);
//if (ContentLength < 0) ContentLength = len;
#endif
offset = p + 2;
return rs.Slice(p + 2, octets);
return true;
}
#endregion